AWS Security Lake
Amazon Security Lake is a purpose-built security data lake managed by AWS. It collects data from supported AWS services automatically and accepts data from SaaS providers, on-premises systems, and custom sources. Everything is stored in Amazon S3 buckets in your own account and normalized to the Open Cybersecurity Schema Framework (OCSF), so you can run security analytics across your organization against a single schema.
AWS Security Lake Integration
Enrich security events with compliance control mappings before they land in AWS Security Lake. This guide uses a Kinesis Data Firehose (now named Amazon Data Firehose) delivery stream with a Lambda transformation function that calls the Secberus /v1/map endpoint once for each batch of records in flight.
Architecture
```
Security Events
(GuardDuty, CloudTrail,
 custom sources, etc.)
        │
        ▼
Kinesis Data Firehose
        │
        │  invoke (batch)
        ▼
Lambda Transform Function
   ├─ calls POST /v1/map
   └─ adds secberus_compliance field
        │
        ▼
AWS Security Lake (S3)
  enriched records with
  compliance control mappings
```
Step 1 — Discover Available Framework IDs
Before writing any code, you need to know which framework IDs to use. Retrieve the full list:
```bash
curl -s -H "authorization: $SECBERUS_API_KEY" \
  https://compliance.secberus.ai/v1/frameworks \
  | jq -c '.[] | {id, name, region}'
```
Sample output:
```json
{"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"}
{"id": "nist_csf_v2", "name": "NIST Cybersecurity Framework v2.0", "region": "US"}
{"id": "iso_27001", "name": "ISO/IEC 27001:2022", "region": "Global"}
{"id": "soc2", "name": "SOC 2 Type II", "region": "US"}
{"id": "aicpa_tsc", "name": "AICPA Trust Services Criteria", "region": "US"}
```
Note the id values you want to map against — you will set these in the Lambda environment variables in Step 3.
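If you prefer to script this step, the sketch below turns the /v1/frameworks response into a value for SECBERUS_FRAMEWORKS. It assumes the endpoint returns a JSON array of objects with `id`, `name`, and `region` keys, as the jq filter above implies; `fetch_frameworks` and `frameworks_env_value` are hypothetical helper names, not part of any Secberus SDK.

```python
import json
import urllib.request

FRAMEWORKS_URL = "https://compliance.secberus.ai/v1/frameworks"


def fetch_frameworks(api_key):
    """Fetch the framework list (assumed to be a JSON array of objects)."""
    req = urllib.request.Request(FRAMEWORKS_URL, headers={"authorization": api_key})
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())


def frameworks_env_value(frameworks, wanted_ids=None, region=None):
    """Build a comma-separated SECBERUS_FRAMEWORKS value.

    Keeps frameworks whose id is in wanted_ids (if given), in that order, and
    whose region equals region (if given). Unknown IDs raise ValueError so a
    typo is caught before it reaches the Lambda configuration.
    """
    available = {fw["id"]: fw for fw in frameworks}
    if wanted_ids is not None:
        missing = [fid for fid in wanted_ids if fid not in available]
        if missing:
            raise ValueError(f"Unknown framework IDs: {', '.join(missing)}")
        selected = [available[fid] for fid in wanted_ids]
    else:
        selected = list(frameworks)
    if region is not None:
        selected = [fw for fw in selected if fw.get("region") == region]
    return ",".join(fw["id"] for fw in selected)
```

For example, `frameworks_env_value(fetch_frameworks(key), wanted_ids=["pci_dss_v4", "nist_800_53_r5"])` returns the string to paste into the Lambda configuration, and fails loudly if either ID is not offered.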
Step 2 — Store the API Key in AWS Secrets Manager
Store your Secberus API key as a plaintext secret (not JSON):
```bash
aws secretsmanager create-secret \
  --name "secberus/api-key" \
  --secret-string "YOUR_API_KEY_HERE"
```
Note the returned ARN — you will set it as a Lambda environment variable in Step 3.
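Because the Lambda in Step 3 sends the secret's SecretString verbatim as the authorization header, a secret accidentally stored as JSON (for example through the console's key/value editor) would cause authentication failures. A minimal guard you could apply to the value read in get_api_key is sketched below; `validate_plaintext_secret` is a hypothetical name, not an AWS or Secberus API.

```python
import json


def validate_plaintext_secret(secret_string):
    """Return the API key if the secret is plaintext; raise if it looks like JSON.

    Step 2 stores the key as a plaintext secret, so a JSON object or array here
    usually means the secret was created as key/value pairs by mistake.
    """
    value = secret_string.strip()
    if not value:
        raise ValueError("Secret is empty")
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        return value  # Not JSON at all: treat it as the raw API key
    if isinstance(parsed, (dict, list)):
        raise ValueError("Secret looks like JSON; store the API key as plaintext")
    return value  # e.g. a purely numeric key, which is still plaintext
```

In get_api_key, you would wrap the lookup as `validate_plaintext_secret(resp["SecretString"])` so a misconfigured secret fails at cold start with a clear message.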
Step 3 — Create the Lambda Transformation Function
Create a Python 3.12 Lambda function with the following code. Firehose passes records in batches; the function calls /v1/map once per batch (not once per record) to minimize API calls, then attaches results to each event before returning them to Firehose.
```python
import base64
import json
import os
import urllib.error
import urllib.request

import boto3

SECBERUS_MAP_URL = "https://compliance.secberus.ai/v1/map"

# Module-level cache: avoids a Secrets Manager call on every warm invocation
_api_key = None


def get_api_key():
    global _api_key
    if _api_key:
        return _api_key
    client = boto3.client("secretsmanager")
    resp = client.get_secret_value(SecretId=os.environ["SECBERUS_API_KEY_ARN"])
    _api_key = resp["SecretString"].strip()
    return _api_key


def call_map_api(api_key, documents):
    # Tolerate spaces in the comma-separated list, e.g. "pci_dss_v4, soc2"
    frameworks = [
        f.strip()
        for f in os.environ.get("SECBERUS_FRAMEWORKS", "pci_dss_v4").split(",")
        if f.strip()
    ]
    min_sim = float(os.environ.get("SECBERUS_MIN_SIMILARITY", "0.3"))
    topk = int(os.environ.get("SECBERUS_TOPK", "3"))

    payload = json.dumps({
        "frameworks": frameworks,
        "min_similarity": min_sim,
        "topk": topk,
        "documents": documents,
    }).encode("utf-8")

    req = urllib.request.Request(
        SECBERUS_MAP_URL,
        data=payload,
        headers={
            "authorization": api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())


def lambda_handler(event, context):
    api_key = get_api_key()

    # Decode all Firehose records; wrap anything that is not a JSON object
    # so the enrichment field can always be attached
    decoded = []
    for record in event["records"]:
        raw = base64.b64decode(record["data"]).decode("utf-8")
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = None
        if not isinstance(data, dict):
            data = {"_raw": raw}
        decoded.append({"recordId": record["recordId"], "data": data})

    # Build one document per record from whichever field carries meaningful text;
    # records whose field is missing, empty, or not a string are skipped
    source_field = os.environ.get("SECBERUS_SOURCE_FIELD", "description")
    documents = []
    for item in decoded:
        text = item["data"].get(source_field)
        if isinstance(text, str) and text.strip():
            documents.append({"id": item["recordId"], "document": text.strip()})

    # Single batched API call for all records in this Firehose batch
    compliance_by_record = {}
    if documents:
        try:
            result = call_map_api(api_key, documents)
            parsed = {}
            for fw in result.get("frameworks", []):
                for control in fw.get("controls", []):
                    rid = control["document_id"]
                    parsed.setdefault(rid, []).append({
                        "framework_id": fw["framework_id"],
                        "control_id": control["control"]["id"],
                        "family": control["control"].get("family"),
                        "similarity": control["similarity"],
                        "confidence": control["confidence"],
                    })
            # Attach results only once the whole response parsed cleanly
            compliance_by_record = parsed
        except urllib.error.HTTPError as e:
            print(f"Secberus API HTTP error {e.code}: {e.read().decode('utf-8', 'replace')}")
        except Exception as e:
            # Network errors, timeouts, bad config values, or an unexpected response shape
            print(f"Secberus API error: {e}")

    # Re-encode records with enrichment attached
    output = []
    for item in decoded:
        rid = item["recordId"]
        enriched = item["data"]
        enriched["secberus_compliance"] = compliance_by_record.get(rid, [])
        encoded = base64.b64encode(
            (json.dumps(enriched) + "\n").encode("utf-8")
        ).decode("utf-8")
        output.append({"recordId": rid, "result": "Ok", "data": encoded})

    return {"records": output}
```
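To exercise the transformation locally before wiring it into Firehose, you can build a synthetic event in the shape Firehose sends (a records list whose items carry a recordId and base64-encoded data) and decode the function's response. The helpers below are a hypothetical sketch; real Firehose events also include fields such as deliveryStreamArn and region, which the handler above does not read.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dicts (or raw strings) as a Firehose transformation event."""
    records = []
    for i, payload in enumerate(payloads):
        raw = payload if isinstance(payload, str) else json.dumps(payload)
        records.append({
            "recordId": f"record-{i}",
            "data": base64.b64encode(raw.encode("utf-8")).decode("utf-8"),
        })
    return {"invocationId": "local-test-invocation", "records": records}


def decode_firehose_response(response):
    """Decode a transformation response into {recordId: (result, parsed JSON)}."""
    decoded = {}
    for record in response["records"]:
        body = base64.b64decode(record["data"]).decode("utf-8")
        decoded[record["recordId"]] = (record["result"], json.loads(body))
    return decoded
```

With get_api_key and call_map_api patched (for example with unittest.mock.patch), `decode_firehose_response(lambda_handler(make_firehose_event([...]), None))` shows exactly what Firehose would receive, without touching Secrets Manager or the Secberus API.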
Lambda environment variables
| Variable | Required | Description |
|---|---|---|
| SECBERUS_API_KEY_ARN | Yes | Secrets Manager ARN from Step 2 |
| SECBERUS_FRAMEWORKS | Yes | Comma-separated framework IDs from Step 1 (e.g., pci_dss_v4,nist_800_53_r5). If unset, the code falls back to pci_dss_v4. |
| SECBERUS_SOURCE_FIELD | Yes | JSON field in your event that contains the text to map (e.g., description, finding_description, message). If unset, the code falls back to description. |
| SECBERUS_MIN_SIMILARITY | No | Minimum similarity threshold, 0.01–1.0 (default: 0.3) |
| SECBERUS_TOPK | No | Max controls returned per framework/document pair (default: 3) |
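The handler reads these variables without validation, so a typo such as SECBERUS_MIN_SIMILARITY=3 would only surface when the API call fails (assuming the API rejects out-of-range values). A stricter loader that fails fast at cold start is sketched below; `load_config` is a hypothetical helper that, unlike the handler, treats the two "Yes" variables as truly required instead of applying fallbacks.

```python
import os


def load_config(env=None):
    """Read and validate the Secberus settings from environment variables."""
    env = os.environ if env is None else env

    frameworks = [
        f.strip() for f in env.get("SECBERUS_FRAMEWORKS", "").split(",") if f.strip()
    ]
    if not frameworks:
        raise ValueError("SECBERUS_FRAMEWORKS must list at least one framework ID")

    source_field = env.get("SECBERUS_SOURCE_FIELD", "").strip()
    if not source_field:
        raise ValueError("SECBERUS_SOURCE_FIELD is required")

    # Range taken from the table above: 0.01 to 1.0 inclusive
    min_similarity = float(env.get("SECBERUS_MIN_SIMILARITY", "0.3"))
    if not 0.01 <= min_similarity <= 1.0:
        raise ValueError("SECBERUS_MIN_SIMILARITY must be between 0.01 and 1.0")

    topk = int(env.get("SECBERUS_TOPK", "3"))
    if topk < 1:
        raise ValueError("SECBERUS_TOPK must be a positive integer")

    return {
        "frameworks": frameworks,
        "source_field": source_field,
        "min_similarity": min_similarity,
        "topk": topk,
    }
```

Calling load_config() at module level (next to the _api_key cache) would make a bad setting fail the first invocation with a readable error instead of silently producing empty enrichment.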
Recommended Lambda settings
| Setting | Recommended value |
|---|---|
| Runtime | Python 3.12 |
| Timeout | 60 seconds |
| Memory | 256 MB |
| Concurrency | Leave unreserved, or reserve enough for the concurrent invocations Firehose makes at your throughput (throttled invocations delay delivery) |
Step 4 — Grant IAM Permissions
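Lambda execution role: add an inline policy allowing the function to read its secret:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:secberus/api-key-*"
    }
  ]
}
```
If the secret is encrypted with a customer managed KMS key instead of the default aws/secretsmanager key, also allow kms:Decrypt on that key. The role also needs the standard CloudWatch Logs permissions (for example, the AWSLambdaBasicExecutionRole managed policy) so that enrichment errors are logged.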
Firehose service role: allow the delivery stream to invoke the function:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:<region>:<account-id>:function:<your-function-name>:$LATEST"
    }
  ]
}
```
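If you manage these policies from code, a small generator keeps the ARNs consistent between the two roles. The sketch assumes the secret name from Step 2 and the $LATEST function ARN used above; `build_policies` is a hypothetical helper, and its output still has to be attached to the right roles (for example with aws iam put-role-policy).

```python
def build_policies(region, account_id, function_name, secret_name="secberus/api-key"):
    """Return (lambda_role_policy, firehose_role_policy) as IAM policy dicts."""
    # Secrets Manager appends a random suffix to secret ARNs, hence the "-*" wildcard
    secret_arn = f"arn:aws:secretsmanager:{region}:{account_id}:secret:{secret_name}-*"
    function_arn = f"arn:aws:lambda:{region}:{account_id}:function:{function_name}:$LATEST"

    lambda_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": secret_arn,
        }],
    }
    firehose_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
            "Resource": function_arn,
        }],
    }
    return lambda_policy, firehose_policy
```

Serializing each dict with json.dumps gives the --policy-document value for the CLI.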
Step 5 — Configure Kinesis Data Firehose
When creating or updating your Firehose delivery stream:
- Transform source records with AWS Lambda: enable this and select your function from Step 3
- Buffer size: set to 1 MiB (smaller batches reduce per-event latency)
- Buffer interval: set to 60 seconds
- Destination: select Amazon S3 and point to your Security Lake custom source bucket
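Security Lake custom source: If you haven't already registered a custom source in Security Lake, do so in the console (Security Lake → Custom sources → Add custom source) before pointing Firehose at the generated S3 path. Security Lake provides the S3 URI, Glue database, and table name. Security Lake expects custom-source objects in Apache Parquet format conforming to OCSF, so enable Firehose record format conversion to Parquet rather than delivering raw JSON, and make sure the schema you convert against includes the secberus_compliance column.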
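For infrastructure-as-code setups, the transform settings above map onto the ProcessingConfiguration block of the Firehose API. The sketch below builds that block, assuming the buffer values in the list are meant as the Lambda processor's buffer hints (the BufferSizeInMBs and BufferIntervalInSeconds processor parameters); `processing_configuration` is a hypothetical helper.

```python
def processing_configuration(lambda_arn, buffer_mib=1, buffer_seconds=60, retries=3):
    """Build a Firehose ProcessingConfiguration that invokes a Lambda transform.

    The Firehose API expects every ParameterValue as a string.
    """
    params = {
        "LambdaArn": lambda_arn,
        "NumberOfRetries": retries,
        "BufferSizeInMBs": buffer_mib,
        "BufferIntervalInSeconds": buffer_seconds,
    }
    return {
        "Enabled": True,
        "Processors": [{
            "Type": "Lambda",
            "Parameters": [
                {"ParameterName": name, "ParameterValue": str(value)}
                for name, value in params.items()
            ],
        }],
    }
```

The result would be passed as ProcessingConfiguration inside ExtendedS3DestinationConfiguration when calling create_delivery_stream (or update_destination) on a boto3 firehose client.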
Step 6 — Verify Enrichment
After sending a test event through the stream, query the enriched records in Security Lake using Athena:
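```sql
SELECT
  description,
  secberus_compliance
FROM "<your_security_lake_database>"."<your_custom_source_table>"
-- The Lambda always writes the field (as an empty array when nothing matched),
-- so filter on non-empty arrays rather than IS NOT NULL
WHERE cardinality(secberus_compliance) > 0
LIMIT 10;
```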
A fully enriched record looks like this:
```json
{
  "description": "Root account login detected without MFA",
  "secberus_compliance": [
    {
      "framework_id": "pci_dss_v4",
      "control_id": "8.4.2",
      "family": "Identify Users and Authenticate Access",
      "similarity": 0.91,
      "confidence": "High"
    },
    {
      "framework_id": "nist_800_53_r5",
      "control_id": "IA-2",
      "family": "Identification and Authentication",
      "similarity": 0.88,
      "confidence": "High"
    }
  ]
}
```
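Downstream consumers often want only the stronger matches. The helper below filters a record's secberus_compliance array by a minimum confidence level, using the ordering of the levels listed under Confidence levels in the API Quick Reference (Very Low, Low, Medium, High); `controls_at_or_above` is a hypothetical name.

```python
CONFIDENCE_ORDER = {"Very Low": 0, "Low": 1, "Medium": 2, "High": 3}


def controls_at_or_above(record, min_confidence="Medium"):
    """Return (framework_id, control_id) pairs at or above min_confidence.

    Entries with a missing or unrecognized confidence value are dropped.
    """
    threshold = CONFIDENCE_ORDER[min_confidence]
    return [
        (match["framework_id"], match["control_id"])
        for match in record.get("secberus_compliance", [])
        if CONFIDENCE_ORDER.get(match.get("confidence"), -1) >= threshold
    ]
```

Filtering on the consumer side keeps the stored record complete; the API's min_confidence request parameter is the alternative when you never want weaker matches stored at all.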
Error Handling
The Lambda writes API errors to CloudWatch Logs and, rather than failing the Firehose delivery, returns every record with "secberus_compliance": []. (Records whose data is not valid JSON are stored as {"_raw": ...} before the field is added.) Pipeline errors therefore degrade gracefully: records still land in Security Lake, just without compliance enrichment.
To alert on enrichment failures:
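- Create a CloudWatch Logs metric filter on your Lambda log group with the pattern "Secberus API" (the quotes make it match that exact phrase, which appears in both error messages the function prints)
- Set an alarm on that metric to notify your on-call team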
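If the invocation itself fails (for example, the secret cannot be read or the function times out), Firehose retries it and then marks the batch's records ProcessingFailed, writing them to the destination bucket under its processing-failed error prefix. To keep an untransformed copy of every source record for reprocessing, also enable S3 source record backup on the Firehose stream.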
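The metric filter described above can be created with the CloudWatch Logs put_metric_filter call in boto3. The sketch only builds its arguments; the log group name assumes Lambda's default /aws/lambda/<function-name> naming, and the filter, metric, and namespace names are hypothetical placeholders.

```python
def metric_filter_args(function_name, namespace="Secberus/Enrichment"):
    """Build kwargs for logs.put_metric_filter that count Secberus API errors.

    The quoted pattern matches the exact phrase "Secberus API", which appears
    in both error messages the Lambda prints.
    """
    return {
        "logGroupName": f"/aws/lambda/{function_name}",
        "filterName": "secberus-api-errors",
        "filterPattern": '"Secberus API"',
        "metricTransformations": [{
            "metricName": "SecberusApiErrors",
            "metricNamespace": namespace,
            "metricValue": "1",
            "defaultValue": 0.0,
        }],
    }
```

Usage would look like `boto3.client("logs").put_metric_filter(**metric_filter_args("my-function"))`, followed by a cloudwatch put_metric_alarm on SecberusApiErrors in the same namespace with an SNS action for your on-call team.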
Configuration Reference
Fields added to each event
| Field | Type | Description |
|---|---|---|
| secberus_compliance | array | One entry per matched control across all frameworks; empty when nothing matched or enrichment failed |
| secberus_compliance[].framework_id | string | Framework identifier (e.g., pci_dss_v4) |
| secberus_compliance[].control_id | string | Control identifier (e.g., 8.4.2) |
| secberus_compliance[].family | string | Control family name (null if the API omits it) |
| secberus_compliance[].similarity | float | Similarity score (0.0–1.0) |
| secberus_compliance[].confidence | string | High, Medium, Low, or Very Low |
Key request parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| frameworks | array[string] | none | Framework IDs to map against (from Step 1) |
| min_similarity | float | none | Exclude matches below this score (0.01–1.0) |
| min_confidence | string | none | Exclude matches below this level: High, Medium, Low, Very Low. Mutually exclusive with min_similarity. |
| topk | integer | 1 | Max controls returned per framework/document pair |
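Because min_similarity and min_confidence are mutually exclusive, a request builder can enforce that before calling the API. The sketch below assumes the request body takes the fields listed above plus the documents array used by the Lambda in Step 3; `build_map_request` is hypothetical and the validation mirrors only the ranges documented here.

```python
import json

CONFIDENCE_LEVELS = ("Very Low", "Low", "Medium", "High")


def build_map_request(frameworks, documents, topk=1, min_similarity=None, min_confidence=None):
    """Return a JSON-encoded /v1/map request body.

    Raises ValueError if both thresholds are set or a value is out of range.
    """
    if min_similarity is not None and min_confidence is not None:
        raise ValueError("min_similarity and min_confidence are mutually exclusive")
    body = {"frameworks": list(frameworks), "topk": topk, "documents": documents}
    if min_similarity is not None:
        if not 0.01 <= min_similarity <= 1.0:
            raise ValueError("min_similarity must be between 0.01 and 1.0")
        body["min_similarity"] = min_similarity
    if min_confidence is not None:
        if min_confidence not in CONFIDENCE_LEVELS:
            raise ValueError(f"min_confidence must be one of {CONFIDENCE_LEVELS}")
        body["min_confidence"] = min_confidence
    return json.dumps(body).encode("utf-8")
```

Swapping this in for the inline payload in call_map_api would let you switch the Lambda to confidence-based filtering without risking a request that sets both thresholds.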
API Quick Reference
| Detail | Value |
|---|---|
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |
| Lambda timeout | 60 s recommended |