AWS Security Lake

Amazon Security Lake is a purpose-built security data lake managed by AWS. It automatically centralizes security data from AWS environments, SaaS providers, on-premises systems, and custom sources into a single store in your own S3 bucket using the Open Cybersecurity Schema Framework (OCSF), making cross-organization security analytics straightforward.

AWS Security Lake Integration

Enrich security events with compliance control mappings before they land in AWS Security Lake. This guide uses a Kinesis Data Firehose delivery stream with a Lambda transformation function to call the Secberus /v1/map endpoint on each batch of records in flight.

Architecture

Security Events
(GuardDuty, CloudTrail,
 custom sources, etc.)
        │
        ▼
Kinesis Data Firehose
        │
        │  invoke (batch)
        ▼
Lambda Transform Function
  ├─ calls POST /v1/map
  └─ adds secberus_compliance field
        │
        ▼
AWS Security Lake (S3)
  enriched records with
  compliance control mappings

Step 1 — Discover Available Framework IDs

Before writing any code you need to know which framework IDs to use. Retrieve the full list:

curl -s -H "authorization: $SECBERUS_API_KEY" \
  https://compliance.secberus.ai/v1/frameworks \
  | jq '.[] | {id, name, region}'

Sample output:

{"id": "pci_dss_v4",     "name": "PCI DSS v4.0",                       "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5",               "region": "US"}
{"id": "nist_csf_v2",    "name": "NIST Cybersecurity Framework v2.0",  "region": "US"}
{"id": "iso_27001",      "name": "ISO/IEC 27001:2022",                 "region": "Global"}
{"id": "soc2",           "name": "SOC 2 Type II",                      "region": "US"}
{"id": "aicpa_tsc",      "name": "AICPA Trust Services Criteria",      "region": "US"}

Note the id values you want to map against — you will set these in the Lambda environment variables in Step 3.
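
If you would rather derive the SECBERUS_FRAMEWORKS value in a script than copy IDs by hand, the following is a minimal sketch. It assumes the /v1/frameworks response is a JSON array whose entries carry id and region fields, as the sample output suggests; the helper name frameworks_env_value and the region filter are hypothetical and not part of the Secberus API.

```python
def frameworks_env_value(frameworks, wanted_regions=("Global", "US")):
    """Return a comma-separated ID string suitable for SECBERUS_FRAMEWORKS.

    `frameworks` is assumed to be the parsed JSON array from /v1/frameworks.
    """
    ids = [fw["id"] for fw in frameworks if fw.get("region") in wanted_regions]
    return ",".join(ids)


# Two entries copied from the sample output above
sample = [
    {"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"},
    {"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"},
]

print(frameworks_env_value(sample))
print(frameworks_env_value(sample, wanted_regions=("Global",)))
```

The first call keeps both frameworks; the second keeps only the entry whose region is Global.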


Step 2 — Store the API Key in AWS Secrets Manager

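Store your Secberus API key as a plaintext secret, not as a JSON key/value pair, because the Lambda function in Step 3 sends the secret string verbatim as the authorization header: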

aws secretsmanager create-secret \
  --name "secberus/api-key" \
  --secret-string "YOUR_API_KEY_HERE"

Note the returned ARN — you will set it as a Lambda environment variable in Step 3.


Step 3 — Create the Lambda Transformation Function

Create a Python 3.12 Lambda function with the following code. Firehose passes records in batches; the function calls /v1/map once per batch (not once per record) to minimize API calls, then attaches results to each event before returning them to Firehose.

import base64
import json
import os
import urllib.request
import urllib.error
import boto3

# Module-level cache — avoids a Secrets Manager call on every warm invocation
_api_key = None

def get_api_key():
    global _api_key
    if _api_key:
        return _api_key
    client = boto3.client("secretsmanager")
    resp = client.get_secret_value(SecretId=os.environ["SECBERUS_API_KEY_ARN"])
    _api_key = resp["SecretString"]
    return _api_key


def call_map_api(api_key, documents):
    frameworks   = os.environ.get("SECBERUS_FRAMEWORKS", "pci_dss_v4").split(",")
    min_sim      = float(os.environ.get("SECBERUS_MIN_SIMILARITY", "0.3"))
    topk         = int(os.environ.get("SECBERUS_TOPK", "3"))

    payload = json.dumps({
        "frameworks":     frameworks,
        "min_similarity": min_sim,
        "topk":           topk,
        "documents":      documents,
    }).encode("utf-8")

    req = urllib.request.Request(
        "https://compliance.secberus.ai/v1/map",
        data=payload,
        headers={
            "authorization": api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())


def lambda_handler(event, context):
    api_key = get_api_key()

    # Decode all Firehose records
    decoded = []
    for record in event["records"]:
        raw = base64.b64decode(record["data"]).decode("utf-8")
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = {"_raw": raw}
        if not isinstance(data, dict):
            # Valid JSON that is not an object (array, string, number) has no
            # fields to enrich, so wrap it the same way as unparseable input
            data = {"_raw": raw}
        decoded.append({"recordId": record["recordId"], "data": data})

    # Build one document per record; use whichever field carries meaningful text
    source_field = os.environ.get("SECBERUS_SOURCE_FIELD", "description")
    documents = [
        {"id": item["recordId"], "document": text}
        for item in decoded
        # Coerce to str so a null or non-string field value cannot raise
        if (text := str(item["data"].get(source_field) or "").strip())
    ]

    # Single batched API call for all records in this Firehose batch
    compliance_by_record = {}
    if documents:
        try:
            result = call_map_api(api_key, documents)
            for fw in result.get("frameworks", []):
                for control in fw.get("controls", []):
                    rid = control["document_id"]
                    compliance_by_record.setdefault(rid, []).append({
                        "framework_id": fw["framework_id"],
                        "control_id":   control["control"]["id"],
                        "family":       control["control"].get("family"),
                        "similarity":   control["similarity"],
                        "confidence":   control["confidence"],
                    })
        except urllib.error.HTTPError as e:
            print(f"Secberus API HTTP error {e.code}: {e.read().decode()}")
        except Exception as e:
            print(f"Secberus API error: {e}")

    # Re-encode records with enrichment attached
    output = []
    for item in decoded:
        rid = item["recordId"]
        enriched = item["data"]
        enriched["secberus_compliance"] = compliance_by_record.get(rid, [])

        encoded = base64.b64encode(
            (json.dumps(enriched) + "\n").encode("utf-8")
        ).decode("utf-8")

        output.append({"recordId": rid, "result": "Ok", "data": encoded})

    return {"records": output}

Lambda environment variables

| Variable | Required | Description |
| --- | --- | --- |
| SECBERUS_API_KEY_ARN | Yes | Secrets Manager ARN from Step 2 |
| SECBERUS_FRAMEWORKS | Yes | Comma-separated framework IDs from Step 1 (e.g., pci_dss_v4,nist_800_53_r5). The code above falls back to pci_dss_v4 if unset. |
| SECBERUS_SOURCE_FIELD | Yes | JSON field in your event that contains the text to map (e.g., description, finding_description, message). The code above falls back to description if unset. |
| SECBERUS_MIN_SIMILARITY | No | Minimum similarity threshold, 0.01–1.0 (default: 0.3) |
| SECBERUS_TOPK | No | Max controls returned per framework/document (default: 3) |

| Setting | Recommended value |
| --- | --- |
| Runtime | Python 3.12 |
| Timeout | 60 seconds |
| Memory | 256 MB |
| Concurrency | Match your Firehose parallelization factor |
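
Before deploying, it can help to exercise the handler locally with a synthetic Firehose event. The sketch below builds only the fields the function above reads (recordId and base64-encoded data); real Firehose events carry additional metadata. The helper names are hypothetical, and echo_handler is a stand-in so the example runs without network or AWS access. Swap in lambda_handler once the Secberus call is stubbed or reachable.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dicts in the base64 record envelope a transform Lambda receives."""
    return {
        "records": [
            {
                "recordId": f"rec-{i}",
                "data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("utf-8"),
            }
            for i, p in enumerate(payloads)
        ]
    }


def decode_firehose_response(response):
    """Decode returned records into dicts keyed by recordId."""
    return {
        r["recordId"]: json.loads(base64.b64decode(r["data"]).decode("utf-8"))
        for r in response["records"]
    }


def echo_handler(event, context):
    """Hypothetical stand-in for lambda_handler: adds an empty enrichment list."""
    out = []
    for r in event["records"]:
        data = json.loads(base64.b64decode(r["data"]).decode("utf-8"))
        data["secberus_compliance"] = []
        encoded = base64.b64encode((json.dumps(data) + "\n").encode("utf-8")).decode("utf-8")
        out.append({"recordId": r["recordId"], "result": "Ok", "data": encoded})
    return {"records": out}


event = make_firehose_event([{"description": "Root account login detected without MFA"}])
decoded = decode_firehose_response(echo_handler(event, None))
print(decoded["rec-0"])
```

The round trip confirms that the returned records keep their recordId and decode back to JSON objects carrying a secberus_compliance field.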

Step 4 — Grant IAM Permissions

Lambda execution role — add an inline policy allowing the function to read its secret:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:secberus/api-key-*"
    }
  ]
}

Firehose IAM role (the role the delivery stream assumes): attach a policy that allows it to invoke the Lambda function:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:<region>:<account-id>:function:<your-function-name>:$LATEST"
    }
  ]
}

Step 5 — Configure Kinesis Data Firehose

When creating or updating your Firehose delivery stream:

  1. Transform source records with AWS Lambda — enable this and select your function from Step 3
  2. Buffer size — set to 1 MiB (smaller batches reduce per-event latency)
  3. Buffer interval — set to 60 seconds
  4. Destination — select Amazon S3 and point to your Security Lake custom source bucket

Security Lake custom source: If you haven't already registered a custom source in Security Lake, do so via the console (Security Lake → Custom sources → Add custom source) before pointing Firehose at the generated S3 path. Security Lake will provide the S3 URI, Glue database, and table name.
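
If you create the delivery stream programmatically rather than in the console, the Lambda transform settings can be expressed as a processing configuration. The sketch below only builds the dictionary; it is intended to be passed as ProcessingConfiguration inside the S3 destination configuration of a boto3 Firehose create_delivery_stream call. It assumes the buffer size and interval in this step apply to the Lambda processor's buffering hints, and the function name and example ARN are placeholders.

```python
def lambda_processing_config(lambda_arn, buffer_mb=1, buffer_seconds=60):
    """Build a Firehose ProcessingConfiguration that enables a Lambda transform.

    Firehose expects every ParameterValue as a string.
    """
    return {
        "Enabled": True,
        "Processors": [
            {
                "Type": "Lambda",
                "Parameters": [
                    {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn},
                    {"ParameterName": "BufferSizeInMBs", "ParameterValue": str(buffer_mb)},
                    {"ParameterName": "BufferIntervalInSeconds", "ParameterValue": str(buffer_seconds)},
                ],
            }
        ],
    }


# Placeholder ARN for illustration only
config = lambda_processing_config(
    "arn:aws:lambda:us-east-1:123456789012:function:secberus-enrich:$LATEST"
)
print(config["Processors"][0]["Parameters"])
```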


Step 6 — Verify Enrichment

After sending a test event through the stream, query the enriched records in Security Lake using Athena:

SELECT
  description,
  secberus_compliance
FROM "<your_security_lake_database>"."<your_custom_source_table>"
WHERE secberus_compliance IS NOT NULL
LIMIT 10;

A fully enriched record looks like this:

{
  "description": "Root account login detected without MFA",
  "secberus_compliance": [
    {
      "framework_id": "pci_dss_v4",
      "control_id":   "8.4.2",
      "family":       "Identify Users and Authenticate Access",
      "similarity":   0.91,
      "confidence":   "High"
    },
    {
      "framework_id": "nist_800_53_r5",
      "control_id":   "IA-2",
      "family":       "Identification and Authentication",
      "similarity":   0.88,
      "confidence":   "High"
    }
  ]
}
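
Downstream consumers often want the matches grouped by framework. The following is a minimal sketch of that, using the enriched record shown above as input; the helper name controls_by_framework and the 0.9 cutoff are illustrative only.

```python
from collections import defaultdict


def controls_by_framework(record, min_similarity=0.0):
    """Group matched control IDs by framework, dropping low-similarity matches."""
    grouped = defaultdict(list)
    for match in record.get("secberus_compliance", []):
        if match["similarity"] >= min_similarity:
            grouped[match["framework_id"]].append(match["control_id"])
    return dict(grouped)


record = {
    "description": "Root account login detected without MFA",
    "secberus_compliance": [
        {"framework_id": "pci_dss_v4", "control_id": "8.4.2",
         "family": "Identify Users and Authenticate Access",
         "similarity": 0.91, "confidence": "High"},
        {"framework_id": "nist_800_53_r5", "control_id": "IA-2",
         "family": "Identification and Authentication",
         "similarity": 0.88, "confidence": "High"},
    ],
}

print(controls_by_framework(record))
print(controls_by_framework(record, min_similarity=0.9))  # {'pci_dss_v4': ['8.4.2']}
```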

Error Handling

The Lambda writes API errors to CloudWatch Logs and, rather than failing the Firehose delivery, returns each record with an empty "secberus_compliance": [] field. Enrichment failures therefore degrade gracefully: records still land in Security Lake, just without compliance mappings.

To alert on enrichment failures:

  1. Create a CloudWatch Logs metric filter on your Lambda log group that matches the phrase "Secberus API" (the prefix of both error messages the function prints)
  2. Set an alarm on that metric to notify your on-call team
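
The metric filter from the first step can also be created with boto3. The sketch below only builds the keyword arguments for the CloudWatch Logs put_metric_filter call, so it runs without AWS credentials. It assumes the function logs to the default /aws/lambda/<function-name> log group; the filter name, metric name, and namespace are placeholders.

```python
def metric_filter_kwargs(function_name, namespace="Secberus/Enrichment"):
    """Build arguments for logs.put_metric_filter that count Secberus API errors."""
    return {
        "logGroupName": f"/aws/lambda/{function_name}",
        "filterName": "secberus-api-errors",
        # Double quotes make the filter match the exact phrase, not either word
        "filterPattern": '"Secberus API"',
        "metricTransformations": [
            {
                "metricName": "SecberusApiErrors",
                "metricNamespace": namespace,
                "metricValue": "1",  # each matching log line counts as one error
            }
        ],
    }


kwargs = metric_filter_kwargs("secberus-enrich")
print(kwargs["logGroupName"], kwargs["filterPattern"])

# To apply it (requires AWS credentials):
#   import boto3
#   boto3.client("logs").put_metric_filter(**kwargs)
```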

To handle records that fail Firehose processing entirely (result: ProcessingFailed), configure an S3 backup bucket on the Firehose stream for reprocessing.
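
The handler in Step 3 never returns ProcessingFailed itself; it wraps unparseable payloads in a _raw field and passes them through. If you would rather route such records to the failure path, one alternative is sketched below. The function name transform_or_fail is hypothetical, and the enrichment step is reduced to adding an empty list so the example stays self-contained.

```python
import base64
import json


def transform_or_fail(record):
    """Return an Ok record for JSON objects, ProcessingFailed for anything else."""
    try:
        data = json.loads(base64.b64decode(record["data"]).decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("payload is not a JSON object")
    except ValueError:
        # Covers bad base64 padding, invalid UTF-8, and invalid JSON, which all
        # raise ValueError subclasses; the original data is returned untouched
        return {
            "recordId": record["recordId"],
            "result": "ProcessingFailed",
            "data": record["data"],
        }
    data.setdefault("secberus_compliance", [])
    encoded = base64.b64encode((json.dumps(data) + "\n").encode("utf-8")).decode("utf-8")
    return {"recordId": record["recordId"], "result": "Ok", "data": encoded}


good = {"recordId": "a", "data": base64.b64encode(b'{"description": "ok"}').decode("utf-8")}
bad = {"recordId": "b", "data": base64.b64encode(b"not json").decode("utf-8")}

print(transform_or_fail(good)["result"])
print(transform_or_fail(bad)["result"])
```

Records marked ProcessingFailed keep their original payload, so they can be replayed once the cause is fixed.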


Configuration Reference

Fields added to each event

| Field | Type | Description |
| --- | --- | --- |
| secberus_compliance | array | One entry per matched control across all frameworks |
| secberus_compliance[].framework_id | string | Framework identifier (e.g., pci_dss_v4) |
| secberus_compliance[].control_id | string | Control identifier (e.g., 8.4.2) |
| secberus_compliance[].family | string | Control family name |
| secberus_compliance[].similarity | float | Similarity score (0.0–1.0) |
| secberus_compliance[].confidence | string | High, Medium, Low, or Very Low |

Key request parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| frameworks | array[string] | | Framework IDs to map against (from Step 1) |
| min_similarity | float | | Exclude matches below this score (0.01–1.0) |
| min_confidence | string | | Exclude matches below this level: High, Medium, Low, Very Low. Mutually exclusive with min_similarity. |
| topk | integer | 1 | Max controls returned per framework/document pair |
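
Because min_similarity and min_confidence cannot be combined, it can be useful to validate the request body before sending it. The following is a minimal sketch of such a check based only on the parameters listed above; the function name build_map_payload is hypothetical, and the server may apply further validation that is not modeled here.

```python
CONFIDENCE_LEVELS = ("Very Low", "Low", "Medium", "High")


def build_map_payload(documents, frameworks, min_similarity=None,
                      min_confidence=None, topk=1):
    """Assemble a /v1/map request body, rejecting conflicting thresholds."""
    if min_similarity is not None and min_confidence is not None:
        raise ValueError("min_similarity and min_confidence are mutually exclusive")
    if min_similarity is not None and not 0.01 <= min_similarity <= 1.0:
        raise ValueError("min_similarity must be between 0.01 and 1.0")
    if min_confidence is not None and min_confidence not in CONFIDENCE_LEVELS:
        raise ValueError(f"min_confidence must be one of {CONFIDENCE_LEVELS}")

    payload = {"frameworks": list(frameworks), "topk": topk, "documents": documents}
    # Include only the threshold that was actually supplied
    if min_similarity is not None:
        payload["min_similarity"] = min_similarity
    if min_confidence is not None:
        payload["min_confidence"] = min_confidence
    return payload


docs = [{"id": "rec-0", "document": "Root account login detected without MFA"}]
print(build_map_payload(docs, ["pci_dss_v4"], min_confidence="High"))
```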

API Quick Reference

| Detail | Value |
| --- | --- |
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |
| Lambda timeout | 60 s recommended |