Splunk
Splunk is a leading data platform for security, observability, and IT operations. The Splunk platform collects, indexes, and correlates machine data in real time from any source, enabling search, monitoring, alerting, and visualization across your entire infrastructure — including on-premises, cloud, and hybrid environments.
Splunk Integration
Enrich Splunk events with compliance control mappings using Splunk's native external lookup feature. An external lookup is a Python script wired into transforms.conf and props.conf — the same transform system Splunk uses for all field extractions and enrichments — so compliance fields become available automatically at search time with no custom commands required.
How It Works
When a search touches an event whose sourcetype has the lookup configured, Splunk passes the event's text field to the Python script, which calls /v1/map in batch and returns compliance fields. Those fields (compliance_controls, compliance_error) are then available like any other field in the event.
Splunk Search
│
│ LOOKUP- stanza in props.conf fires
▼
secberus_map.py (bin/)
├─ reads CSV batch from stdin
├─ calls POST /v1/map (one request per batch)
└─ writes enriched CSV to stdout
│
▼
Fields added to events:
compliance_controls (JSON array of matched controls)
compliance_error (non-empty if the API call failed)
Step 1 — Discover Available Framework IDs
Before configuring anything, retrieve the framework IDs you want to map against:
curl -s -H "authorization: $SECBERUS_API_KEY" \
https://compliance.secberus.ai/v1/frameworks \
| jq '.[] | {id, name, region}'
Sample output:
{"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"}
{"id": "nist_csf_v2", "name": "NIST Cybersecurity Framework v2.0", "region": "US"}
{"id": "iso_27001", "name": "ISO/IEC 27001:2022", "region": "Global"}
{"id": "soc2", "name": "SOC 2 Type II", "region": "US"}
{"id": "aicpa_tsc", "name": "AICPA Trust Services Criteria", "region": "US"}
Note the id values you need — you will set them in local/secberus.conf in Step 3.
Step 2 — Create the Splunk App
Create a minimal Splunk app to hold the script and configuration. Replace secberus_compliance with your preferred app name throughout.
$SPLUNK_HOME/etc/apps/secberus_compliance/
├── default/
│ ├── app.conf
│ ├── transforms.conf
│ └── props.conf
├── local/
│ └── secberus.conf ← API key and options (not committed to source control)
└── bin/
└── secberus_map.py
default/app.conf
[launcher]
author = Secberus
description = Compliance mapping enrichment via Secberus AI API
version = 1.0.0
[ui]
is_visible = false
label = Secberus Compliance
Step 3 — Store the API Key
Create local/secberus.conf with your API key and lookup options. The local/ directory is excluded from app exports and should not be committed to source control.
[secberus]
api_key = YOUR_API_KEY_HERE
frameworks = pci_dss_v4,nist_800_53_r5
source_field = description
min_similarity = 0.3
topk = 3
| Key | Description |
|---|---|
api_key |
Your Secberus API key |
frameworks |
Comma-separated framework IDs from Step 1 |
source_field |
The event field containing the text to map |
min_similarity |
Minimum similarity threshold, 0.01–1.0 |
topk |
Max controls returned per framework/document pair |
Step 4 — Create the Lookup Script
Create bin/secberus_map.py. Splunk calls this script with event data as CSV on stdin and expects enriched CSV on stdout. All records in a batch are sent in a single /v1/map request to minimize API calls.
#!/usr/bin/env python3
import csv
import sys
import json
import os
import configparser
import urllib.request
import urllib.error
def _load_config():
cfg = configparser.ConfigParser()
script_dir = os.path.dirname(os.path.abspath(__file__))
conf_path = os.path.join(script_dir, "..", "local", "secberus.conf")
cfg.read(conf_path)
s = "secberus"
return {
"api_key": cfg.get(s, "api_key", fallback=os.environ.get("SECBERUS_API_KEY", "")),
"frameworks": cfg.get(s, "frameworks", fallback="pci_dss_v4").split(","),
"source_field": cfg.get(s, "source_field", fallback="description"),
"min_similarity": float(cfg.get(s, "min_similarity", fallback="0.3")),
"topk": int(cfg.get(s, "topk", fallback="3")),
}
def _call_map_api(api_key, documents, frameworks, min_similarity, topk):
payload = json.dumps({
"frameworks": frameworks,
"min_similarity": min_similarity,
"topk": topk,
"documents": documents,
}).encode("utf-8")
req = urllib.request.Request(
"https://compliance.secberus.ai/v1/map",
data=payload,
headers={
"authorization": api_key,
"Content-Type": "application/json",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
def main():
config = _load_config()
reader = csv.DictReader(sys.stdin)
rows = list(reader)
if not rows:
writer = csv.DictWriter(
sys.stdout,
fieldnames=(reader.fieldnames or []) + ["compliance_controls", "compliance_error"],
)
writer.writeheader()
return
out_fields = list(reader.fieldnames) + ["compliance_controls", "compliance_error"]
# Build one document per row
source_field = config["source_field"]
documents = [
{"id": str(idx), "document": text}
for idx, row in enumerate(rows)
if (text := row.get(source_field, "").strip())
]
# Single batched API call for all rows in this lookup invocation
compliance_by_idx = {}
api_error = ""
if documents:
try:
result = _call_map_api(
config["api_key"],
documents,
config["frameworks"],
config["min_similarity"],
config["topk"],
)
for fw in result.get("frameworks", []):
for control in fw.get("controls", []):
i = control["document_id"]
compliance_by_idx.setdefault(i, []).append({
"framework_id": fw["framework_id"],
"control_id": control["control"]["id"],
"family": control["control"].get("family", ""),
"similarity": control["similarity"],
"confidence": control["confidence"],
})
except urllib.error.HTTPError as e:
api_error = f"HTTP {e.code}: {e.read().decode()}"
except Exception as e:
api_error = str(e)
writer = csv.DictWriter(sys.stdout, fieldnames=out_fields, extrasaction="ignore")
writer.writeheader()
for idx, row in enumerate(rows):
controls = compliance_by_idx.get(str(idx), [])
row["compliance_controls"] = json.dumps(controls)
row["compliance_error"] = api_error
writer.writerow(row)
if __name__ == "__main__":
main()
Make the script executable:
chmod +x $SPLUNK_HOME/etc/apps/secberus_compliance/bin/secberus_map.py
Step 5 — Configure transforms.conf
default/transforms.conf
[secberus_compliance]
external_cmd = secberus_map.py description
fields_list = description, compliance_controls, compliance_error
external_cmd— the script name followed by the lookup key field (descriptionby default; change this to match yoursource_fieldfromsecberus.conf)fields_list— all fields the script reads from and writes to Splunk
Step 6 — Apply via props.conf
default/props.conf
Add the lookup to the sourcetypes whose events you want enriched. Replace your_sourcetype with the actual sourcetype name (e.g., guardduty, aws:cloudtrail, or a custom one).
[your_sourcetype]
LOOKUP-secberus_compliance = secberus_compliance description OUTPUT compliance_controls, compliance_error
Splunk will automatically run the lookup for any search that touches events of this sourcetype, adding compliance_controls and compliance_error as fields.
Restart Splunk (or bump the app) to pick up the new configuration:
$SPLUNK_HOME/bin/splunk restart
Step 7 — Use in Searches
Once configured, compliance fields are available in any search over the enriched sourcetype.
View all mapped controls on recent events:
index=security sourcetype=your_sourcetype earliest=-1h
| lookup secberus_compliance description OUTPUT compliance_controls
| spath input=compliance_controls path="{}. control_id" output=control_ids
| table _time, description, control_ids
Filter to High-confidence matches only:
index=security sourcetype=your_sourcetype
| lookup secberus_compliance description OUTPUT compliance_controls
| mvexpand compliance_controls
| spath input=compliance_controls
| where confidence="High"
| table _time, description, framework_id, control_id, similarity, confidence
Count events per control across a time window:
index=security sourcetype=your_sourcetype earliest=-7d
| lookup secberus_compliance description OUTPUT compliance_controls
| mvexpand compliance_controls
| spath input=compliance_controls
| stats count by framework_id, control_id, confidence
| sort -count
Surface lookup errors for monitoring:
index=security sourcetype=your_sourcetype
| lookup secberus_compliance description OUTPUT compliance_controls, compliance_error
| where compliance_error!=""
| table _time, description, compliance_error
Configuration Reference
Fields added to each event
| Field | Type | Description |
|---|---|---|
compliance_controls |
JSON string | Array of matched controls across all configured frameworks |
compliance_controls[].framework_id |
string | Framework identifier (e.g., pci_dss_v4) |
compliance_controls[].control_id |
string | Control identifier (e.g., 8.4.2) |
compliance_controls[].family |
string | Control family name |
compliance_controls[].similarity |
float | Similarity score (0.0–1.0) |
compliance_controls[].confidence |
string | High, Medium, Low, or Very Low |
compliance_error |
string | Non-empty when the API call failed; empty on success |
secberus.conf options
| Key | Default | Description |
|---|---|---|
api_key |
— | Secberus API key (required) |
frameworks |
pci_dss_v4 |
Comma-separated framework IDs from Step 1 |
source_field |
description |
Event field containing the text to map |
min_similarity |
0.3 |
Minimum similarity threshold, 0.01–1.0 |
topk |
3 |
Max controls returned per framework/document pair |
Error Handling
Errors are written to compliance_error rather than raising exceptions, so a failing API call does not break the search — events are returned with compliance_controls = [] and compliance_error set to the error message.
Common errors and remediation:
| Error | Cause | Fix |
|---|---|---|
HTTP 403 |
Invalid or missing API key | Check api_key in local/secberus.conf |
HTTP 400 |
Unknown framework ID | Run Step 1 to get valid IDs and update frameworks |
timed out |
Network or API latency | Increase the timeout value in secberus_map.py |
Empty compliance_controls |
No matches above threshold | Lower min_similarity or check source_field value |
API Quick Reference
| Detail | Value |
|---|---|
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |