Splunk

Splunk is a leading data platform for security, observability, and IT operations. The Splunk platform collects, indexes, and correlates machine data in real time from any source, enabling search, monitoring, alerting, and visualization across your entire infrastructure — including on-premises, cloud, and hybrid environments.

Splunk Integration

Enrich Splunk events with compliance control mappings using Splunk's native external lookup feature. An external lookup is a Python script wired into transforms.conf and props.conf — the same transform system Splunk uses for all field extractions and enrichments — so compliance fields become available automatically at search time with no custom commands required.

How It Works

When a search touches an event whose sourcetype has the lookup configured, Splunk passes the event's lookup key field (description by default) to the Python script, which sends the whole batch to /v1/map in a single request and returns the compliance fields. Those fields (compliance_controls, compliance_error) are then available like any other field on the event.

Splunk Search
     │
     │  LOOKUP- stanza in props.conf fires
     ▼
secberus_map.py (bin/)
  ├─ reads CSV batch from stdin
  ├─ calls POST /v1/map (one request per batch)
  └─ writes enriched CSV to stdout
     │
     ▼
Fields added to events:
  compliance_controls   (JSON array of matched controls)
  compliance_error      (non-empty if the API call failed)
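
The stdin/stdout contract in the diagram can be tried without Splunk or the API. The sketch below is illustrative only: enrich and the lookup callable are hypothetical stand-ins (the callable replaces the /v1/map request), and the CSV header is assumed to list every field from fields_list with the output columns left empty.

```python
import csv
import io


def enrich(csv_in, lookup):
    """Read lookup CSV text, fill the output columns, and return CSV text."""
    reader = csv.DictReader(io.StringIO(csv_in))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        # `lookup` is a hypothetical stand-in for the batched /v1/map call
        row["compliance_controls"] = lookup(row["description"])
        row["compliance_error"] = ""
        writer.writerow(row)
    return out.getvalue()


# Assumed shape of the input: header from fields_list, output columns empty
request_csv = (
    "description,compliance_controls,compliance_error\n"
    "Root login without MFA,,\n"
)
response_csv = enrich(request_csv, lambda text: "[]")
print(response_csv)
```

The output keeps the same header and row order as the input, which is what lets Splunk match each enriched row back to its event.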

Step 1 — Discover Available Framework IDs

Before configuring anything, retrieve the framework IDs you want to map against:

curl -s -H "authorization: $SECBERUS_API_KEY" \
  https://compliance.secberus.ai/v1/frameworks \
  | jq '.[] | {id, name, region}'

Sample output:

{"id": "pci_dss_v4",     "name": "PCI DSS v4.0",                       "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5",               "region": "US"}
{"id": "nist_csf_v2",    "name": "NIST Cybersecurity Framework v2.0",  "region": "US"}
{"id": "iso_27001",      "name": "ISO/IEC 27001:2022",                 "region": "Global"}
{"id": "soc2",           "name": "SOC 2 Type II",                      "region": "US"}
{"id": "aicpa_tsc",      "name": "AICPA Trust Services Criteria",      "region": "US"}

Note the id values you need — you will set them in local/secberus.conf in Step 3.
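
If curl and jq are not available, the same request can be sketched in Python using only the standard library. The helper names (build_frameworks_request, fetch_frameworks, framework_ids) are illustrative and not part of any Secberus SDK, and the response is assumed to be a JSON array shaped like the sample output above.

```python
import json
import urllib.request

FRAMEWORKS_URL = "https://compliance.secberus.ai/v1/frameworks"


def build_frameworks_request(api_key):
    """Build the GET request for the framework list (no network call yet)."""
    return urllib.request.Request(
        FRAMEWORKS_URL, headers={"authorization": api_key}, method="GET"
    )


def fetch_frameworks(api_key, timeout=15):
    """Perform the request; assumes the API returns a JSON array."""
    with urllib.request.urlopen(build_frameworks_request(api_key), timeout=timeout) as resp:
        return json.loads(resp.read())


def framework_ids(frameworks, region=None):
    """Return the id values, optionally restricted to one region."""
    return [f["id"] for f in frameworks if region is None or f.get("region") == region]


# Two entries copied from the sample output above
sample = [
    {"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"},
    {"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"},
]
print(",".join(framework_ids(sample)))
```

Joining the IDs with commas yields a string that can be pasted directly into the frameworks option in Step 3.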


Step 2 — Create the Splunk App

Create a minimal Splunk app to hold the script and configuration. Replace secberus_compliance with your preferred app name throughout.

$SPLUNK_HOME/etc/apps/secberus_compliance/
├── default/
│   ├── app.conf
│   ├── transforms.conf
│   └── props.conf
├── local/
│   └── secberus.conf       ← API key and options (not committed to source control)
└── bin/
    └── secberus_map.py

default/app.conf

[launcher]
author = Secberus
description = Compliance mapping enrichment via Secberus AI API
version = 1.0.0

[ui]
is_visible = false
label = Secberus Compliance

Step 3 — Store the API Key

Create local/secberus.conf with your API key and lookup options. The local/ directory is excluded from app exports and should not be committed to source control.

[secberus]
api_key        = YOUR_API_KEY_HERE
frameworks     = pci_dss_v4,nist_800_53_r5
source_field   = description
min_similarity = 0.3
topk           = 3

Key             Description
api_key         Your Secberus API key
frameworks      Comma-separated framework IDs from Step 1
source_field    The event field containing the text to map
min_similarity  Minimum similarity threshold, 0.01–1.0
topk            Max controls returned per framework/document pair
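
Because a bad value in secberus.conf only surfaces at search time, it can help to check the options before deploying. The sketch below is a hypothetical validate_options helper, not part of the app. It applies the 0.01–1.0 range stated above and assumes that topk must be at least 1, which the API documentation should confirm.

```python
def validate_options(frameworks, min_similarity, topk):
    """Return a list of problems found in raw secberus.conf option strings."""
    problems = []

    ids = [f.strip() for f in frameworks.split(",") if f.strip()]
    if not ids:
        problems.append("frameworks: at least one framework ID is required")

    try:
        if not 0.01 <= float(min_similarity) <= 1.0:
            problems.append("min_similarity: must be between 0.01 and 1.0")
    except ValueError:
        problems.append("min_similarity: not a number")

    try:
        # Assumption: topk below 1 is not meaningful
        if int(topk) < 1:
            problems.append("topk: must be 1 or greater")
    except ValueError:
        problems.append("topk: not an integer")

    return problems


print(validate_options("pci_dss_v4,nist_800_53_r5", "0.3", "3"))
```

An empty list means the three options passed these basic checks; it does not confirm that the framework IDs exist on the server.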

Step 4 — Create the Lookup Script

Create bin/secberus_map.py. Splunk calls this script with event data as CSV on stdin and expects enriched CSV on stdout. All records in a batch are sent in a single /v1/map request to minimize API calls.

#!/usr/bin/env python3

import csv
import sys
import json
import os
import configparser
import urllib.request
import urllib.error


def _load_config():
    # interpolation=None keeps a literal "%" in the API key from being
    # treated as configparser interpolation syntax.
    cfg = configparser.ConfigParser(interpolation=None)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    conf_path = os.path.join(script_dir, "..", "local", "secberus.conf")
    cfg.read(conf_path)
    s = "secberus"
    # Tolerate spaces around commas, e.g. "pci_dss_v4, nist_800_53_r5"
    frameworks = cfg.get(s, "frameworks", fallback="pci_dss_v4")
    return {
        "api_key":        cfg.get(s, "api_key",        fallback=os.environ.get("SECBERUS_API_KEY", "")),
        "frameworks":     [f.strip() for f in frameworks.split(",") if f.strip()],
        "source_field":   cfg.get(s, "source_field",   fallback="description"),
        "min_similarity": float(cfg.get(s, "min_similarity", fallback="0.3")),
        "topk":           int(cfg.get(s, "topk",       fallback="3")),
    }


def _call_map_api(api_key, documents, frameworks, min_similarity, topk):
    payload = json.dumps({
        "frameworks":     frameworks,
        "min_similarity": min_similarity,
        "topk":           topk,
        "documents":      documents,
    }).encode("utf-8")

    req = urllib.request.Request(
        "https://compliance.secberus.ai/v1/map",
        data=payload,
        headers={
            "authorization": api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())


def main():
    config = _load_config()
    reader = csv.DictReader(sys.stdin)
    rows = list(reader)

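    # The CSV header Splunk sends already lists every field in fields_list,
    # so append the output columns only when they are missing. Appending
    # unconditionally would produce duplicate columns in the output.
    out_fields = list(reader.fieldnames or [])
    for name in ("compliance_controls", "compliance_error"):
        if name not in out_fields:
            out_fields.append(name)

    if not rows:
        # Nothing to enrich: emit the header only
        csv.DictWriter(sys.stdout, fieldnames=out_fields).writeheader()
        return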

    # Build one document per row
    source_field = config["source_field"]
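    # Skip rows whose source field is empty or missing. A plain loop avoids
    # the := operator, which requires Python 3.8+ and may not be available
    # in the Python bundled with older Splunk releases.
    documents = []
    for idx, row in enumerate(rows):
        text = (row.get(source_field) or "").strip()
        if text:
            documents.append({"id": str(idx), "document": text})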

    # Single batched API call for all rows in this lookup invocation
    compliance_by_idx = {}
    api_error = ""
    if documents:
        try:
            result = _call_map_api(
                config["api_key"],
                documents,
                config["frameworks"],
                config["min_similarity"],
                config["topk"],
            )
            for fw in result.get("frameworks", []):
                for control in fw.get("controls", []):
                    # Normalize to str so the key matches str(idx) used below
                    i = str(control["document_id"])
                    compliance_by_idx.setdefault(i, []).append({
                        "framework_id": fw["framework_id"],
                        "control_id":   control["control"]["id"],
                        "family":       control["control"].get("family", ""),
                        "similarity":   control["similarity"],
                        "confidence":   control["confidence"],
                    })
        except urllib.error.HTTPError as e:
            api_error = f"HTTP {e.code}: {e.read().decode()}"
        except Exception as e:
            api_error = str(e)

    writer = csv.DictWriter(sys.stdout, fieldnames=out_fields, extrasaction="ignore")
    writer.writeheader()

    for idx, row in enumerate(rows):
        controls = compliance_by_idx.get(str(idx), [])
        row["compliance_controls"] = json.dumps(controls)
        row["compliance_error"]    = api_error
        writer.writerow(row)


if __name__ == "__main__":
    main()

Make the script executable:

chmod +x $SPLUNK_HOME/etc/apps/secberus_compliance/bin/secberus_map.py
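
The regrouping step in the script can be exercised on its own, without Splunk or network access. The sketch below uses a hypothetical group_controls helper that mirrors the loop in main(). The sample response is invented for illustration and is shaped only by what the script reads (frameworks, controls, document_id, control.id, similarity, confidence); the real /v1/map response may carry more fields.

```python
def group_controls(result):
    """Group matched controls by the document id they belong to."""
    grouped = {}
    for fw in result.get("frameworks", []):
        for control in fw.get("controls", []):
            grouped.setdefault(str(control["document_id"]), []).append({
                "framework_id": fw["framework_id"],
                "control_id":   control["control"]["id"],
                "family":       control["control"].get("family", ""),
                "similarity":   control["similarity"],
                "confidence":   control["confidence"],
            })
    return grouped


# Illustrative data only, not captured from the API
sample_result = {
    "frameworks": [
        {
            "framework_id": "pci_dss_v4",
            "controls": [
                {"document_id": "0", "control": {"id": "8.4.2"},
                 "similarity": 0.82, "confidence": "High"},
                {"document_id": "1", "control": {"id": "8.4.2"},
                 "similarity": 0.41, "confidence": "Low"},
            ],
        }
    ]
}
grouped = group_controls(sample_result)
print(sorted(grouped))
```

Documents that produced no match simply have no key in the result, which is why the script falls back to an empty list for those rows.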

Step 5 — Configure transforms.conf

default/transforms.conf

[secberus_compliance]
external_cmd = secberus_map.py description
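fields_list  = description, compliance_controls, compliance_error
# Run the script under Splunk's Python 3 interpreter (it uses Python 3 syntax)
python.version = python3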

  • external_cmd: the script name followed by the lookup key field (description by default; change this to match your source_field from secberus.conf)
  • fields_list: all fields the script reads from and writes to Splunk
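
Since the key field in external_cmd has to agree with source_field in secberus.conf, a quick consistency check can catch a mismatch before a restart. This is a minimal sketch with hypothetical helper names. It assumes both files are simple enough for Python's configparser to read, which holds for the stanzas shown in this guide but not for every Splunk .conf file.

```python
import configparser


def lookup_key_fields(transforms_text, stanza="secberus_compliance"):
    """Return the field names that follow the script name in external_cmd."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(transforms_text)
    # external_cmd has the form "<script> <field> [<field> ...]"
    return cfg.get(stanza, "external_cmd").split()[1:]


def source_field_matches(transforms_text, secberus_text):
    """True when source_field appears among the external_cmd key fields."""
    sec = configparser.ConfigParser(interpolation=None)
    sec.read_string(secberus_text)
    source_field = sec.get("secberus", "source_field", fallback="description")
    return source_field in lookup_key_fields(transforms_text)


transforms_sample = (
    "[secberus_compliance]\n"
    "external_cmd = secberus_map.py description\n"
    "fields_list  = description, compliance_controls, compliance_error\n"
)
secberus_sample = "[secberus]\nsource_field = description\n"
print(source_field_matches(transforms_sample, secberus_sample))
```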

Step 6 — Apply via props.conf

default/props.conf

Add the lookup to the sourcetypes whose events you want enriched. Replace your_sourcetype with the actual sourcetype name (e.g., guardduty, aws:cloudtrail, or a custom one).

[your_sourcetype]
LOOKUP-secberus_compliance = secberus_compliance description OUTPUT compliance_controls, compliance_error

Splunk will automatically run the lookup for any search that touches events of this sourcetype, adding compliance_controls and compliance_error as fields.

Restart Splunk (or bump the app) to pick up the new configuration:

$SPLUNK_HOME/bin/splunk restart

Step 7 — Use in Searches

Once configured, compliance fields are available in any search over the enriched sourcetype.

View all mapped controls on recent events:

index=security sourcetype=your_sourcetype earliest=-1h
| lookup secberus_compliance description OUTPUT compliance_controls
| spath input=compliance_controls path={}.control_id output=control_ids
| table _time, description, control_ids

Filter to High-confidence matches only:

index=security sourcetype=your_sourcetype
| lookup secberus_compliance description OUTPUT compliance_controls
| spath input=compliance_controls path={} output=control
| mvexpand control
| spath input=control
| where confidence="High"
| table _time, description, framework_id, control_id, similarity, confidence

Count events per control across a time window:

index=security sourcetype=your_sourcetype earliest=-7d
| lookup secberus_compliance description OUTPUT compliance_controls
| spath input=compliance_controls path={} output=control
| mvexpand control
| spath input=control
| stats count by framework_id, control_id, confidence
| sort -count

Surface lookup errors for monitoring:

index=security sourcetype=your_sourcetype
| lookup secberus_compliance description OUTPUT compliance_controls, compliance_error
| where compliance_error!=""
| table _time, description, compliance_error

Configuration Reference

Fields added to each event

Field                               Type         Description
compliance_controls                 JSON string  Array of matched controls across all configured frameworks
compliance_controls[].framework_id  string       Framework identifier (e.g., pci_dss_v4)
compliance_controls[].control_id    string       Control identifier (e.g., 8.4.2)
compliance_controls[].family        string       Control family name
compliance_controls[].similarity    float        Similarity score (0.0–1.0)
compliance_controls[].confidence    string       High, Medium, Low, or Very Low
compliance_error                    string       Non-empty when the API call failed; empty on success
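
Outside Splunk, for example when post-processing exported results, the compliance_controls value is an ordinary JSON string and can be decoded directly. The sketch below uses a hypothetical controls_at_confidence helper, and the sample value is invented to match the field layout in the table above.

```python
import json


def controls_at_confidence(compliance_controls, level="High"):
    """Decode the JSON string and keep controls at the given confidence."""
    controls = json.loads(compliance_controls or "[]")
    return [c for c in controls if c.get("confidence") == level]


# Illustrative value, laid out like the fields in the table above
sample_value = json.dumps([
    {"framework_id": "pci_dss_v4", "control_id": "8.4.2",
     "family": "", "similarity": 0.82, "confidence": "High"},
    {"framework_id": "pci_dss_v4", "control_id": "8.4.2",
     "family": "", "similarity": 0.35, "confidence": "Low"},
])
high = controls_at_confidence(sample_value)
print([c["control_id"] for c in high])
```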

secberus.conf options

Key             Default      Description
api_key         (none)       Secberus API key (required)
frameworks      pci_dss_v4   Comma-separated framework IDs from Step 1
source_field    description  Event field containing the text to map
min_similarity  0.3          Minimum similarity threshold, 0.01–1.0
topk            3            Max controls returned per framework/document pair

Error Handling

Errors are written to compliance_error rather than raising exceptions, so a failing API call does not break the search — events are returned with compliance_controls = [] and compliance_error set to the error message.

Common errors and remediation:

Error                      Cause                       Fix
HTTP 403                   Invalid or missing API key  Check api_key in local/secberus.conf
HTTP 400                   Unknown framework ID        Run Step 1 to get valid IDs and update frameworks
timed out                  Network or API latency      Increase the timeout value in secberus_map.py
Empty compliance_controls  No matches above threshold  Lower min_similarity or check source_field value
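
For alerting or dashboards, the table above can be turned into a small classifier over compliance_error strings. This is an illustrative sketch: remediation_hint is a hypothetical helper, and it assumes error messages have the form the Step 4 script writes ("HTTP <code>: ..." for HTTP failures, the exception text otherwise).

```python
def remediation_hint(compliance_error):
    """Map a compliance_error string to the fix suggested in the table."""
    if not compliance_error:
        return ""
    if compliance_error.startswith("HTTP 403"):
        return "Check api_key in local/secberus.conf"
    if compliance_error.startswith("HTTP 400"):
        return "Run Step 1 to get valid IDs and update frameworks"
    if "timed out" in compliance_error:
        return "Increase the timeout value in secberus_map.py"
    # Anything else is outside the table; surface it unchanged
    return "Unrecognized error: " + compliance_error


print(remediation_hint("HTTP 403: forbidden"))
```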

API Quick Reference

Detail             Value
Endpoint           POST https://compliance.secberus.ai/v1/map
Auth header        authorization: <api-key>
Content-Type       application/json
List frameworks    GET https://compliance.secberus.ai/v1/frameworks
Similarity range   0.01–1.0
Confidence levels  Very Low, Low, Medium, High
Default topk       1 (API default; the lookup script in Step 4 falls back to 3)