Logstash
Logstash is an open-source server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and routes it to your preferred destination. Part of the Elastic Stack, it handles a wide range of inputs — logs, metrics, web applications, data stores, and cloud services — through a rich library of plugins.
Logstash Integration
Enrich security events with compliance control mappings inside a Logstash pipeline using the logstash-filter-http plugin. The plugin calls /v1/map inline as events flow through the filter stage, then a Ruby block extracts the matched controls into discrete fields before the event is forwarded to its output.
Step 1 — Discover Available Framework IDs
Before writing any pipeline config, retrieve the framework IDs you want to map against:
curl -s -H "authorization: $SECBERUS_API_KEY" \
https://compliance.secberus.ai/v1/frameworks \
| jq '.[] | {id, name, region}'
Sample output:
{"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"}
{"id": "nist_csf_v2", "name": "NIST Cybersecurity Framework v2.0", "region": "US"}
{"id": "iso_27001", "name": "ISO/IEC 27001:2022", "region": "Global"}
{"id": "soc2", "name": "SOC 2 Type II", "region": "US"}
{"id": "aicpa_tsc", "name": "AICPA Trust Services Criteria", "region": "US"}
Note the id values for the frameworks you want — you will set them in the pipeline config in Step 4.
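If you save the framework list to a file, choosing IDs can be scripted. The following is a minimal Ruby sketch, assuming the endpoint returns a JSON array of objects with id, name, and region keys as in the sample output. The helper name framework_ids_for is hypothetical and not part of any Secberus tooling.

```ruby
require "json"

# Sample data copied from the output above; in practice, parse the saved API response.
FRAMEWORKS_JSON = <<~JSON
  [
    {"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"},
    {"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"},
    {"id": "iso_27001", "name": "ISO/IEC 27001:2022", "region": "Global"}
  ]
JSON

# Hypothetical helper: return the ids of frameworks whose region is listed in `regions`.
def framework_ids_for(frameworks, regions)
  frameworks.select { |fw| regions.include?(fw["region"]) }.map { |fw| fw["id"] }
end

frameworks = JSON.parse(FRAMEWORKS_JSON)
p framework_ids_for(frameworks, ["Global"]) # prints ["pci_dss_v4", "iso_27001"]
```

The resulting array can be pasted directly into the frameworks setting of the pipeline config.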
Step 2 — Store the API Key in the Logstash Keystore
The Logstash keystore keeps secrets out of pipeline config files. Create the keystore (if it doesn't already exist) and add your API key:
bin/logstash-keystore create
bin/logstash-keystore add SECBERUS_API_KEY
Reference it in pipeline config as ${SECBERUS_API_KEY}.
Step 3 — Install the HTTP Filter Plugin
bin/logstash-plugin install logstash-filter-http
Verify the installation:
bin/logstash-plugin list logstash-filter-http
Step 4 — Configure the Pipeline
Add the following filter block to your pipeline config. Replace description with the name of the event field that contains the text to map, and update the frameworks array with the IDs from Step 1.
filter {
  # Only enrich events that carry a non-empty description field
  if [description] and [description] != "" {
    http {
      url => "https://compliance.secberus.ai/v1/map"
      verb => "POST"
      headers => {
        "authorization" => "${SECBERUS_API_KEY}"
        "Content-Type" => "application/json"
      }
      body_format => "json"
      body => {
        "frameworks" => ["pci_dss_v4", "nist_800_53_r5"] # update as needed
        "min_similarity" => 0.3
        "topk" => 3
        "documents" => [
          {
            "id" => "event"
            "document" => "%{description}"
          }
        ]
      }
      target_body => "[@metadata][secberus_response]"
      target_headers => "[@metadata][secberus_headers]"
    }
    # Extract matched controls from the API response into a structured array
    ruby {
      code => '
        response = event.get("[@metadata][secberus_response]")
        next unless response.is_a?(Hash) && response["frameworks"]
        controls = []
        response["frameworks"].each do |fw|
          (fw["controls"] || []).each do |c|
            controls << {
              "framework_id" => fw["framework_id"],
              "control_id" => c["control"]["id"],
              "family" => c["control"]["family"].to_s,
              "similarity" => c["similarity"],
              "confidence" => c["confidence"]
            }
          end
        end
        event.set("compliance_controls", controls)
        event.remove("[@metadata][secberus_headers]")
      '
    }
  }
}
How it works
- The http filter POSTs the event's description field to /v1/map and stores the full JSON response in [@metadata][secberus_response] (metadata is never forwarded to outputs).
- The ruby block walks the response and builds compliance_controls, a flat array of matched controls across all frameworks.
- Events with no description, or where the API call fails, pass through unmodified.
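The extraction step can be exercised outside Logstash before it goes into a pipeline. This is a sketch only: the response shape is inferred from the fields the ruby block reads, not from an API specification, and the method name extract_controls is hypothetical. Unlike the pipeline block, it returns an empty array for an unusable response and tolerates a missing control object.

```ruby
# Standalone version of the extraction logic, runnable with plain Ruby.
# Assumption: the response has the shape the pipeline's ruby block expects.
def extract_controls(response)
  return [] unless response.is_a?(Hash) && response["frameworks"].is_a?(Array)

  response["frameworks"].flat_map do |fw|
    (fw["controls"] || []).map do |c|
      control = c["control"] || {} # guard against a match without a control object
      {
        "framework_id" => fw["framework_id"],
        "control_id"   => control["id"],
        "family"       => control["family"].to_s,
        "similarity"   => c["similarity"],
        "confidence"   => c["confidence"]
      }
    end
  end
end

# Hand-written sample response; the values are taken from the example output in Step 5.
sample_response = {
  "frameworks" => [
    {
      "framework_id" => "pci_dss_v4",
      "controls" => [
        {
          "control" => { "id" => "8.3.9", "family" => "Identify Users and Authenticate Access" },
          "similarity" => 0.89,
          "confidence" => "High"
        }
      ]
    },
    { "framework_id" => "nist_800_53_r5", "controls" => nil } # framework with no matches
  ]
}

controls = extract_controls(sample_response)
puts controls.length
```

Using flat_map keeps the result a single flat array, matching the compliance_controls field the pipeline writes.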
Step 5 — Verify the Output
Run Logstash against a single test event to confirm enrichment is working:
echo '{"description": "All user passwords must be rotated every 90 days."}' \
  | bin/logstash -e '
input { stdin { codec => json } }
filter {
  # paste your filter block here; if it contains single quotes,
  # save the pipeline to a file and run it with -f instead of -e
}
output { stdout { codec => rubydebug } }
'
A successfully enriched event will include a compliance_controls array:
{
  "description": "All user passwords must be rotated every 90 days.",
  "compliance_controls": [
    {
      "framework_id": "pci_dss_v4",
      "control_id": "8.3.9",
      "family": "Identify Users and Authenticate Access",
      "similarity": 0.89,
      "confidence": "High"
    },
    {
      "framework_id": "nist_800_53_r5",
      "control_id": "IA-5",
      "family": "Identification and Authentication",
      "similarity": 0.84,
      "confidence": "High"
    }
  ]
}
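Checking the test event by eye works for one event; for repeated runs, a small script can do it. The sketch below assumes the enriched event has already been loaded into a Ruby hash (for example from JSON output) and that every control carries the five fields shown above. The helper enrichment_problems is hypothetical.

```ruby
REQUIRED_CONTROL_KEYS = %w[framework_id control_id family similarity confidence].freeze

# Hypothetical helper: list everything that looks wrong with an enriched event.
# An empty result means the event has a well-formed compliance_controls array.
def enrichment_problems(event)
  controls = event["compliance_controls"]
  return ["compliance_controls is missing or not an array"] unless controls.is_a?(Array)

  controls.each_with_index.flat_map do |control, index|
    next ["compliance_controls[#{index}] is not an object"] unless control.is_a?(Hash)

    (REQUIRED_CONTROL_KEYS - control.keys).map do |key|
      "compliance_controls[#{index}] lacks #{key}"
    end
  end
end

# Event mirroring the sample output above.
enriched_event = {
  "description" => "All user passwords must be rotated every 90 days.",
  "compliance_controls" => [
    { "framework_id" => "pci_dss_v4", "control_id" => "8.3.9",
      "family" => "Identify Users and Authenticate Access",
      "similarity" => 0.89, "confidence" => "High" }
  ]
}

problems = enrichment_problems(enriched_event)
puts problems.empty? ? "event looks enriched" : problems
```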
Complete Pipeline Example
A minimal but functional pipeline that reads from Beats, enriches events with compliance mappings, and forwards them to Elasticsearch:
input {
  beats {
    port => 5044
  }
}
filter {
  if [description] and [description] != "" {
    http {
      url => "https://compliance.secberus.ai/v1/map"
      verb => "POST"
      headers => {
        "authorization" => "${SECBERUS_API_KEY}"
        "Content-Type" => "application/json"
      }
      body_format => "json"
      body => {
        "frameworks" => ["pci_dss_v4"]
        "min_similarity" => 0.3
        "topk" => 3
        "documents" => [{"id" => "event", "document" => "%{description}"}]
      }
      target_body => "[@metadata][secberus_response]"
    }
    ruby {
      code => '
        response = event.get("[@metadata][secberus_response]")
        next unless response.is_a?(Hash) && response["frameworks"]
        controls = []
        response["frameworks"].each do |fw|
          (fw["controls"] || []).each do |c|
            controls << {
              "framework_id" => fw["framework_id"],
              "control_id" => c["control"]["id"],
              "family" => c["control"]["family"].to_s,
              "similarity" => c["similarity"],
              "confidence" => c["confidence"]
            }
          end
        end
        event.set("compliance_controls", controls)
      '
    }
  }
}
output {
  elasticsearch {
    hosts => ["https://your-es-host:9200"]
    index => "logs-security-%{+YYYY.MM.dd}"
    user => "${ES_USER}"
    password => "${ES_PASSWORD}"
  }
}
Configuration Reference
http filter options used
| Option | Description |
|---|---|
| url | The Secberus API endpoint |
| verb | Must be POST |
| headers | Pass the API key in authorization; set Content-Type to application/json |
| body_format | Set to json to serialize the body hash as JSON |
| body | Request payload; use %{field_name} to interpolate event fields |
| target_body | Field to store the parsed JSON response; use [@metadata][...] to prevent forwarding to outputs |
Fields added to each event
| Field | Type | Description |
|---|---|---|
| compliance_controls | array | Matched controls across all configured frameworks |
| compliance_controls[].framework_id | string | Framework identifier (e.g., pci_dss_v4) |
| compliance_controls[].control_id | string | Control identifier (e.g., 8.3.9) |
| compliance_controls[].family | string | Control family name |
| compliance_controls[].similarity | float | Similarity score (0.0–1.0) |
| compliance_controls[].confidence | string | High, Medium, Low, or Very Low |
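Downstream consumers sometimes want only the stronger matches. A possible client-side filter is sketched below; it assumes the four confidence labels rank as Very Low < Low < Medium < High, which the labels suggest but this guide does not state. The names CONFIDENCE_ORDER and controls_at_least are hypothetical.

```ruby
# Assumed ranking of the confidence labels, lowest first.
CONFIDENCE_ORDER = ["Very Low", "Low", "Medium", "High"].freeze

# Hypothetical helper: keep controls whose confidence is at or above `minimum`.
# Controls with an unrecognized confidence label are dropped.
def controls_at_least(controls, minimum)
  floor = CONFIDENCE_ORDER.index(minimum)
  raise ArgumentError, "unknown confidence level: #{minimum}" if floor.nil?

  controls.select do |control|
    rank = CONFIDENCE_ORDER.index(control["confidence"])
    !rank.nil? && rank >= floor
  end
end

# Illustrative data in the compliance_controls shape described above.
example_controls = [
  { "control_id" => "8.3.9", "similarity" => 0.89, "confidence" => "High" },
  { "control_id" => "IA-5",  "similarity" => 0.41, "confidence" => "Low" }
]

strong = controls_at_least(example_controls, "Medium")
puts strong.map { |c| c["control_id"] }.inspect
```

The same effect may be achievable server-side with the min_confidence request parameter; filtering in the client is useful when the raw matches should still be indexed.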
Key request parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| frameworks | array[string] | none | Framework IDs to map against (from Step 1) |
| min_similarity | float | none | Exclude matches below this score (0.01–1.0) |
| min_confidence | string | none | Exclude below this level: High, Medium, Low, Very Low. Mutually exclusive with min_similarity. |
| topk | integer | 1 | Max controls per framework/document pair |
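The constraints in this table can be checked before a request is sent. The sketch below builds the request body as a Ruby hash and rejects combinations the table rules out. The method name build_map_payload and the fixed document id "event" are illustrative choices, and the checks reflect only what this table states, not the API's full validation.

```ruby
VALID_CONFIDENCE_LEVELS = ["High", "Medium", "Low", "Very Low"].freeze

# Hypothetical helper: build a /v1/map request body, enforcing the documented constraints.
def build_map_payload(description, frameworks:, min_similarity: nil, min_confidence: nil, topk: 1)
  if min_similarity && min_confidence
    raise ArgumentError, "min_similarity and min_confidence are mutually exclusive"
  end
  if min_similarity && !(0.01..1.0).cover?(min_similarity)
    raise ArgumentError, "min_similarity must be between 0.01 and 1.0"
  end
  if min_confidence && !VALID_CONFIDENCE_LEVELS.include?(min_confidence)
    raise ArgumentError, "min_confidence must be one of #{VALID_CONFIDENCE_LEVELS.join(', ')}"
  end

  payload = {
    "frameworks" => frameworks,
    "topk" => topk, # 1 is the documented default
    "documents" => [{ "id" => "event", "document" => description }]
  }
  # Only include the threshold that was actually supplied.
  payload["min_similarity"] = min_similarity if min_similarity
  payload["min_confidence"] = min_confidence if min_confidence
  payload
end

payload = build_map_payload(
  "All user passwords must be rotated every 90 days.",
  frameworks: ["pci_dss_v4", "nist_800_53_r5"],
  min_similarity: 0.3,
  topk: 3
)
puts payload.keys.inspect
```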
API Quick Reference
| Detail | Value |
|---|---|
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |
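For testing the API outside Logstash, the same request can be assembled with Ruby's standard library. This is a sketch under the assumptions in the table above (endpoint, authorization header carrying the raw key, JSON body). It builds the request without sending it; the method name map_request and the fallback key "example-key" are placeholders.

```ruby
require "json"
require "net/http"
require "uri"

MAP_URI = URI("https://compliance.secberus.ai/v1/map")

# Build (but do not send) the POST request described in the quick reference.
def map_request(api_key, payload)
  request = Net::HTTP::Post.new(MAP_URI)
  request["authorization"] = api_key
  request["Content-Type"] = "application/json"
  request.body = JSON.generate(payload)
  request
end

request = map_request(
  ENV.fetch("SECBERUS_API_KEY", "example-key"), # placeholder when the variable is unset
  {
    "frameworks" => ["pci_dss_v4"],
    "min_similarity" => 0.3,
    "topk" => 3,
    "documents" => [{ "id" => "event", "document" => "All user passwords must be rotated every 90 days." }]
  }
)

# Sending requires network access and a valid key:
# response = Net::HTTP.start(MAP_URI.host, MAP_URI.port, use_ssl: true) { |http| http.request(request) }
# puts response.body
puts "#{request.method} #{request.path}"
```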