Sumo Logic

Sumo Logic is a cloud-native log analytics and intelligence platform that provides real-time insights across the entire application lifecycle and stack. It enables DevSecOps teams to monitor, troubleshoot, and secure modern applications through continuous intelligence, with purpose-built security analytics for threat detection and compliance workflows.

Sumo Logic Integration

Enrich Sumo Logic searches with compliance control mappings using a Lookup Table. A Python script calls /v1/map for your policy documents, generates a CSV, and uploads it to a Sumo Logic Lookup Table. The lookup operator then joins incoming log events to their compliance mappings at search time — no external processing required during live searches.

Architecture

Python enrichment script (run on a schedule)
  ├─ calls GET /v1/frameworks  (discover available IDs)
  ├─ calls POST /v1/map        (map your policy documents)
  └─ uploads CSV to Sumo Logic Lookup Table via Management API
          │
          ▼
Sumo Logic Lookup Table
  (policy_text → compliance controls mapping)
          │
          │  | lookup operator at search time
          ▼
Sumo Logic Search Results
  enriched with framework_id, control_id,
  family, confidence fields

Step 1 — Discover Available Framework IDs

Before writing any config, retrieve the framework IDs you want to map against:

curl -s -H "authorization: $SECBERUS_API_KEY" \
  https://compliance.secberus.ai/v1/frameworks \
  | jq -c '.[] | {id, name, region}'

Sample output:

{"id": "pci_dss_v4",     "name": "PCI DSS v4.0",                       "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5",               "region": "US"}
{"id": "nist_csf_v2",    "name": "NIST Cybersecurity Framework v2.0",  "region": "US"}
{"id": "iso_27001",      "name": "ISO/IEC 27001:2022",                 "region": "Global"}
{"id": "soc2",           "name": "SOC 2 Type II",                      "region": "US"}
{"id": "aicpa_tsc",      "name": "AICPA Trust Services Criteria",      "region": "US"}

Note the id values you need — set them in FRAMEWORKS in the script below.
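The same check can be done from Python before the enrichment script runs, so that a mistyped framework ID fails early. The following is a minimal sketch, assuming the endpoint returns a JSON array of objects with an "id" key as in the sample output above; the helper names fetch_framework_ids and unknown_frameworks are hypothetical and not part of any Secberus client.

```python
import json
import urllib.request

FRAMEWORKS_URL = "https://compliance.secberus.ai/v1/frameworks"


def fetch_framework_ids(api_key, url=FRAMEWORKS_URL):
    """Return the set of framework IDs listed by GET /v1/frameworks."""
    req = urllib.request.Request(url, headers={"authorization": api_key})
    with urllib.request.urlopen(req, timeout=30) as resp:
        frameworks = json.loads(resp.read())
    # Assumption: the response is a JSON array of objects with an "id" key,
    # matching the sample output shown in this step.
    return {fw["id"] for fw in frameworks}


def unknown_frameworks(configured, available):
    """Return the configured IDs the API did not list, in their original order."""
    return [fw for fw in configured if fw not in available]


# Example usage (requires network access and a valid key):
#   available = fetch_framework_ids(os.environ["SECBERUS_API_KEY"])
#   missing = unknown_frameworks(["pci_dss_v4", "nist_800_53_r5"], available)
#   if missing:
#       raise SystemExit(f"Unknown framework IDs: {missing}")
```

Keeping the comparison in a separate pure function means it can be exercised without calling the API.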


Step 2 — Create the Lookup Table in Sumo Logic

  1. In Sumo Logic, navigate to Manage Data → Logs → Lookup Tables → Add Lookup Table.

  2. Set the following:

    | Field       | Value |
    | ----------- | ----- |
    | Name        | Secberus Compliance Mappings |
    | Primary key | policy_text |
    | TTL         | Leave blank (no expiry; the script refreshes the table on each run) |

  3. Create the table. Note the Lookup Table ID from the URL; you will need it for the script.


Step 3 — Create the Enrichment Script

This script calls /v1/map for each policy document you define, then uploads the results to your Sumo Logic Lookup Table via the Management API. Run it on a schedule to keep the table current.

#!/usr/bin/env python3

import csv
import io
import json
import os
import sys
import urllib.request
import urllib.error


# ── Configuration ──────────────────────────────────────────────────────────────
SECBERUS_API_KEY    = os.environ["SECBERUS_API_KEY"]
SUMO_ACCESS_ID      = os.environ["SUMO_ACCESS_ID"]
SUMO_ACCESS_KEY     = os.environ["SUMO_ACCESS_KEY"]
SUMO_ENDPOINT       = os.environ.get("SUMO_ENDPOINT", "https://api.sumologic.com/api")
LOOKUP_TABLE_ID     = os.environ["SUMO_LOOKUP_TABLE_ID"]  # from Step 2

FRAMEWORKS          = ["pci_dss_v4", "nist_800_53_r5"]     # update as needed
MIN_SIMILARITY      = 0.3
TOPK                = 3

# ── Policy documents to map ────────────────────────────────────────────────────
# Add one entry per policy statement you want to enrich.
# The "id" must be unique within this list.
POLICY_DOCUMENTS = [
    {"id": "pwd-rotation",  "document": "All user passwords must be changed every 90 days."},
    {"id": "mfa-required",  "document": "Multi-factor authentication is required for all remote access and privileged accounts."},
    {"id": "encryption",    "document": "All data at rest must be encrypted using AES-256. Data in transit must use TLS 1.2 or higher."},
    {"id": "access-review", "document": "User access rights are reviewed quarterly and revoked immediately upon termination."},
    # Add more policy documents here
]
# ──────────────────────────────────────────────────────────────────────────────


def call_map_api(documents):
    payload = json.dumps({
        "frameworks":     FRAMEWORKS,
        "min_similarity": MIN_SIMILARITY,
        "topk":           TOPK,
        "documents":      documents,
    }).encode("utf-8")

    req = urllib.request.Request(
        "https://compliance.secberus.ai/v1/map",
        data=payload,
        headers={
            "authorization": SECBERUS_API_KEY,
            "Content-Type":  "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def build_csv(documents, map_response):
    # Group matched controls by document ID (up to TOPK controls per framework per document)
    rows = []
    controls_by_doc = {}
    for fw in map_response.get("frameworks", []):
        for control in fw.get("controls", []):
            doc_id = control["document_id"]
            controls_by_doc.setdefault(doc_id, []).append({
                "framework_id": fw["framework_id"],
                "control_id":   control["control"]["id"],
                "family":       control["control"].get("family", ""),
                "similarity":   control["similarity"],
                "confidence":   control["confidence"],
            })

    doc_text = {d["id"]: d["document"] for d in documents}

    for doc in documents:
        doc_id   = doc["id"]
        controls = controls_by_doc.get(doc_id, [])
        if controls:
            # One CSV row per matched control. policy_text is the table's primary key,
            # so confirm how Sumo Logic treats rows that share a policy_text value.
            for c in controls:
                rows.append({
                    "policy_text":  doc_text[doc_id],
                    "framework_id": c["framework_id"],
                    "control_id":   c["control_id"],
                    "family":       c["family"],
                    "similarity":   c["similarity"],
                    "confidence":   c["confidence"],
                })
        else:
            rows.append({
                "policy_text":  doc_text[doc_id],
                "framework_id": "",
                "control_id":   "",
                "family":       "",
                "similarity":   "",
                "confidence":   "",
            })

    buf = io.StringIO()
    writer = csv.DictWriter(
        buf,
        fieldnames=["policy_text", "framework_id", "control_id", "family", "similarity", "confidence"],
    )
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


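def upload_to_sumo(csv_content):
    import base64
    import uuid

    credentials = base64.b64encode(
        f"{SUMO_ACCESS_ID}:{SUMO_ACCESS_KEY}".encode()
    ).decode()

    # Sumo Logic documents this endpoint as a POST that takes the CSV as a
    # multipart/form-data part named "file", with "merge" as a query parameter.
    # Verify against the API reference for your deployment.
    boundary = uuid.uuid4().hex
    data = (
        f"--{boundary}\r\n"
        'Content-Disposition: form-data; name="file"; filename="secberus_mappings.csv"\r\n'
        "Content-Type: text/csv\r\n\r\n"
        f"{csv_content}\r\n"
        f"--{boundary}--\r\n"
    ).encode("utf-8")
    req = urllib.request.Request(
        # merge=false replaces the entire table on each run
        f"{SUMO_ENDPOINT}/v1/lookupTables/{LOOKUP_TABLE_ID}/upload?merge=false",
        data=data,
        headers={
            "Authorization": f"Basic {credentials}",
            "Content-Type":  f"multipart/form-data; boundary={boundary}",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())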


def main():
    print(f"Mapping {len(POLICY_DOCUMENTS)} documents against {FRAMEWORKS}...")
    result = call_map_api(POLICY_DOCUMENTS)

    warnings = result.get("messages", [])
    for w in warnings:
        print(f"[{w['severity']}] {w['message']}", file=sys.stderr)

    csv_content = build_csv(POLICY_DOCUMENTS, result)
    print(f"Generated {csv_content.count(chr(10)) - 1} lookup rows")

    upload_result = upload_to_sumo(csv_content)
    print(f"Lookup table updated: {upload_result}")


if __name__ == "__main__":
    main()

Environment variables

| Variable | Description |
| -------- | ----------- |
| SECBERUS_API_KEY | Your Secberus API key |
| SUMO_ACCESS_ID | Sumo Logic Access ID (from Administration → Security → Access Keys) |
| SUMO_ACCESS_KEY | Sumo Logic Access Key |
| SUMO_ENDPOINT | Your Sumo Logic API endpoint, which varies by deployment region (default: https://api.sumologic.com/api) |
| SUMO_LOOKUP_TABLE_ID | Lookup Table ID from Step 2 |

Run the script to populate the table for the first time:

export SECBERUS_API_KEY="..."
export SUMO_ACCESS_ID="..."
export SUMO_ACCESS_KEY="..."
export SUMO_LOOKUP_TABLE_ID="..."
python3 secberus_sumo_enricher.py
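Because policy_text is the table's primary key and the script writes one row per matched control, several rows can share the same key. If your table keeps only one row per key, you may prefer to choose that row deliberately. The sketch below keeps the highest-similarity row for each policy_text. The helper name best_row_per_policy is hypothetical, the row shape is assumed to match the dictionaries built in build_csv, and the similarity values in the usage comment are illustrative only.

```python
def best_row_per_policy(rows):
    """Keep only the highest-similarity row for each policy_text value."""
    best = {}
    for row in rows:
        key = row["policy_text"]
        # Unmatched policies carry similarity "" in the script; rank them lowest.
        score = row["similarity"] if row["similarity"] != "" else -1.0
        if key not in best or score > best[key][0]:
            best[key] = (score, row)
    # Dicts preserve insertion order, so policies stay in first-seen order.
    return [row for _, row in best.values()]


# Example usage with illustrative values:
#   rows = [
#       {"policy_text": "A", "control_id": "8.3.9", "similarity": 0.62},
#       {"policy_text": "A", "control_id": "IA-5",  "similarity": 0.71},
#   ]
#   best_row_per_policy(rows)  # keeps only the IA-5 row for policy "A"
```

Calling this on rows just before writer.writerows(rows) would leave one row per policy, at the cost of dropping the lower-ranked matches.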

Step 4 — Use the Lookup in Searches

Once the table is populated, use the lookup operator in any Sumo Logic search to join events to their compliance mappings.

Enrich all security events with their compliance controls:

_sourceCategory=security
| lookup framework_id, control_id, family, confidence
    from sumo://lookup/v1/lookupTables/<LOOKUP_TABLE_ID>
    on description=policy_text
| where !isBlank(control_id)
| table _time, description, framework_id, control_id, family, confidence

Count events by framework and confidence level (set the search time range to the last 7 days):

_sourceCategory=security _loglevel=WARNING
| lookup framework_id, confidence
    from sumo://lookup/v1/lookupTables/<LOOKUP_TABLE_ID>
    on description=policy_text
| where !isBlank(framework_id)
| count by framework_id, confidence
| sort by _count desc

Alert dashboard: high-confidence PCI DSS matches (set the search time range to the last hour):

_sourceCategory=security
| lookup framework_id, control_id, confidence
    from sumo://lookup/v1/lookupTables/<LOOKUP_TABLE_ID>
    on description=policy_text
| where framework_id = "pci_dss_v4" and confidence = "High"
| count by control_id
| sort by _count desc

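Replace <LOOKUP_TABLE_ID> with the ID noted in Step 2. Alternatively, use the lookup table's display name: sumo://lookup/<table-name>. Sumo Logic's lookup operator documentation references tables by their Library path, so if the sumo:// form is rejected in your deployment, try from path://"/Library/<folder>/<table-name>" instead.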
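The on description=policy_text clause is an equality join: an event is enriched only when its description field matches a policy_text value in the table. The sketch below imitates that behavior in plain Python to make the consequence visible. It is an illustration, not Sumo Logic's implementation, and it ignores any case or whitespace normalization the platform may apply. The function name lookup_join and the sample events are hypothetical.

```python
def lookup_join(events, table, event_field="description", key="policy_text"):
    """Attach table columns to each event whose field equals a table key."""
    index = {row[key]: row for row in table}
    enriched = []
    for event in events:
        out = dict(event)
        match = index.get(event.get(event_field))
        if match is not None:
            # Copy every table column except the join key itself.
            out.update({k: v for k, v in match.items() if k != key})
        enriched.append(out)
    return enriched


table = [{
    "policy_text": "All user passwords must be changed every 90 days.",
    "framework_id": "pci_dss_v4",
    "control_id": "8.3.9",
}]
events = [
    {"description": "All user passwords must be changed every 90 days."},
    {"description": "Password changed for user alice"},
]
for event in lookup_join(events, table):
    print(event.get("control_id"), "<-", event["description"])
```

Only the first event picks up a control_id, so the searches above return results only for events whose description carries the policy text verbatim.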


Step 5 — Automate Lookup Refresh

Schedule the script to run periodically so the lookup table stays current as your policy documents change. A daily or weekly refresh is sufficient for most environments.

cron (Linux/macOS):

# Run daily at 2 AM
0 2 * * * /usr/bin/python3 /opt/secberus/secberus_sumo_enricher.py >> /var/log/secberus_enrich.log 2>&1

AWS Lambda + EventBridge (serverless):

Package the script as a Lambda function and trigger it with an EventBridge scheduled rule (rate(1 day)). Store credentials in AWS Secrets Manager and load them at runtime rather than via environment variables.
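A minimal sketch of that Lambda wiring follows, assuming the credentials are stored as a single JSON secret whose keys match the script's environment variable names. The SECRET_ID variable, the helper names, and the handler layout are hypothetical. Note that the enrichment script reads os.environ at import time, so the secrets are loaded before the script is imported.

```python
import json
import os

REQUIRED = [
    "SECBERUS_API_KEY",
    "SUMO_ACCESS_ID",
    "SUMO_ACCESS_KEY",
    "SUMO_LOOKUP_TABLE_ID",
]


def load_secrets(client, secret_id):
    """Fetch a JSON secret and return it as a dict.

    `client` is expected to behave like boto3.client("secretsmanager").
    """
    response = client.get_secret_value(SecretId=secret_id)
    return json.loads(response["SecretString"])


def export_to_environ(secrets, names, environ=os.environ):
    """Copy the named secrets into the process environment."""
    for name in names:
        environ[name] = secrets[name]


def lambda_handler(event, context):
    import boto3  # provided by the AWS Lambda Python runtime

    # SECRET_ID is a hypothetical variable holding the secret's name or ARN.
    client = boto3.client("secretsmanager")
    export_to_environ(load_secrets(client, os.environ["SECRET_ID"]), REQUIRED)

    # Import after the environment is populated; the script reads it on import.
    import secberus_sumo_enricher
    secberus_sumo_enricher.main()
    return {"status": "ok"}
```

Passing the client in as an argument keeps load_secrets testable with a stub object instead of a live AWS call.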


Configuration Reference

Columns in the Lookup Table

| Column | Type | Description |
| ------ | ---- | ----------- |
| policy_text | string (primary key) | The policy statement text matched against at search time |
| framework_id | string | Framework identifier (e.g., pci_dss_v4) |
| control_id | string | Control identifier (e.g., 8.3.9) |
| family | string | Control family name |
| similarity | float | Similarity score from the map API (0.0–1.0) |
| confidence | string | High, Medium, Low, or Very Low |

Script configuration constants

| Constant | Default | Description |
| -------- | ------- | ----------- |
| FRAMEWORKS | ["pci_dss_v4", "nist_800_53_r5"] | Framework IDs to map against (from Step 1) |
| MIN_SIMILARITY | 0.3 | Exclude matches below this score |
| TOPK | 3 | Max controls returned per framework/document pair |
| POLICY_DOCUMENTS | - | List of {id, document} objects to enrich |
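To make the interaction between MIN_SIMILARITY and TOPK concrete, the sketch below applies the two descriptions above to a local list of candidate matches for one framework/document pair. It is an illustration of those descriptions only. The filtering itself happens inside the map API, whose exact ordering and tie-breaking are not specified here, and the function name select_matches and the scores are hypothetical.

```python
def select_matches(candidates, min_similarity=0.3, topk=3):
    """Drop candidates below the threshold, then keep the topk highest scores."""
    kept = [c for c in candidates if c["similarity"] >= min_similarity]
    kept.sort(key=lambda c: c["similarity"], reverse=True)
    return kept[:topk]


candidates = [
    {"control_id": "c1", "similarity": 0.35},
    {"control_id": "c2", "similarity": 0.90},
    {"control_id": "c3", "similarity": 0.10},
    {"control_id": "c4", "similarity": 0.50},
    {"control_id": "c5", "similarity": 0.40},
]
print([c["control_id"] for c in select_matches(candidates)])
```

Under these assumptions, raising MIN_SIMILARITY can return fewer than TOPK rows, which is why some policies may produce fewer lookup rows than others.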

API Quick Reference

| Detail | Value |
| ------ | ----- |
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |