Snowflake
Snowflake is the AI Data Cloud — a cloud data platform that provides data storage, processing, and analytic solutions across AWS, Azure, and GCP. Its unique multi-cluster shared data architecture separates compute from storage, enabling organizations to run diverse workloads concurrently while paying only for what they use. Snowpark enables Python UDFs with full external network access.
Snowflake Integration
Map compliance controls directly from Snowflake SQL using Snowpark External Access. A Python UDF calls /v1/map with outbound HTTP access granted through a Snowflake Network Rule and External Access Integration — no proxy, no middleware, no external pipeline required. The result is a native SQL function you can call in any query, view, or task.
Architecture
Snowflake SQL Query / Task
│
│ SELECT secberus_map(description, ...)
▼
Python UDF (Snowpark)
├─ reads API key from Snowflake Secret
├─ calls POST /v1/map via External Access Integration
└─ returns VARIANT array of matched controls
│
▼
Query Result / Target Table
enriched with compliance control mappings
Step 1 — Discover Available Framework IDs
Before writing any SQL, retrieve the framework IDs you want to map against:
curl -s -H "authorization: $SECBERUS_API_KEY" \
https://compliance.secberus.ai/v1/frameworks \
| jq '.[] | {id, name, region}'
Sample output:
{"id": "pci_dss_v4", "name": "PCI DSS v4.0", "region": "Global"}
{"id": "nist_800_53_r5", "name": "NIST SP 800-53 Rev 5", "region": "US"}
{"id": "nist_csf_v2", "name": "NIST Cybersecurity Framework v2.0", "region": "US"}
{"id": "iso_27001", "name": "ISO/IEC 27001:2022", "region": "Global"}
{"id": "soc2", "name": "SOC 2 Type II", "region": "US"}
{"id": "aicpa_tsc", "name": "AICPA Trust Services Criteria", "region": "US"}
Note the id values you need — you will pass them as arguments to the UDF in Step 6.
Step 2 — Create the Network Rule
A Network Rule allowlists the outbound destination. Run this in your Snowflake worksheet as ACCOUNTADMIN or a role with CREATE NETWORK RULE privilege:
CREATE OR REPLACE NETWORK RULE secberus_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('compliance.secberus.ai:443');
Step 3 — Store the API Key as a Snowflake Secret
Snowflake Secrets store credentials securely and make them accessible to UDFs without hardcoding values in function definitions:
CREATE OR REPLACE SECRET secberus_api_key
TYPE = GENERIC_STRING
SECRET_STRING = 'YOUR_API_KEY_HERE';
Grant usage to the role that will create and call the UDF:
GRANT READ ON SECRET secberus_api_key TO ROLE <your_role>;
Step 4 — Create the External Access Integration
An External Access Integration ties the Network Rule and Secret together and authorizes UDFs to use them:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION secberus_access_integration
ALLOWED_NETWORK_RULES = (secberus_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (secberus_api_key)
ENABLED = TRUE;
GRANT USAGE ON INTEGRATION secberus_access_integration TO ROLE <your_role>;
Step 5 — Create the Python UDF
Create the UDF that calls /v1/map. The _snowflake module provides access to the Secret defined in Step 3 — the API key never appears in SQL or query history.
CREATE OR REPLACE FUNCTION secberus_map(
description STRING,
frameworks ARRAY,
min_similarity FLOAT,
topk INT
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
EXTERNAL_ACCESS_INTEGRATIONS = (secberus_access_integration)
SECRETS = ('api_key' = secberus_api_key)
HANDLER = 'map_document'
AS $$
import _snowflake
import json
import urllib.request
import urllib.error
def map_document(description, frameworks, min_similarity, topk):
if not description or not description.strip():
return []
api_key = _snowflake.get_generic_secret_string('api_key')
payload = json.dumps({
"frameworks": list(frameworks),
"min_similarity": min_similarity,
"topk": topk,
"documents": [{"id": "row", "document": description}],
}).encode("utf-8")
req = urllib.request.Request(
"https://compliance.secberus.ai/v1/map",
data=payload,
headers={
"authorization": api_key,
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read())
except urllib.error.HTTPError as e:
return [{"error": f"HTTP {e.code}: {e.read().decode()}"}]
except Exception as e:
return [{"error": str(e)}]
controls = []
for fw in result.get("frameworks", []):
for c in fw.get("controls", []):
controls.append({
"framework_id": fw["framework_id"],
"control_id": c["control"]["id"],
"family": c["control"].get("family", ""),
"similarity": c["similarity"],
"confidence": c["confidence"],
})
return controls
$$;
Step 6 — Use the Function in SQL
Basic enrichment
SELECT
id,
description,
secberus_map(description, ARRAY_CONSTRUCT('pci_dss_v4'), 0.3, 3) AS compliance_controls
FROM policies;
Flatten controls into individual rows
Use FLATTEN to expand the returned array into one row per matched control, making it easy to filter and aggregate:
SELECT
p.id,
p.description,
c.value:framework_id::STRING AS framework_id,
c.value:control_id::STRING AS control_id,
c.value:family::STRING AS family,
c.value:similarity::FLOAT AS similarity,
c.value:confidence::STRING AS confidence
FROM policies p,
LATERAL FLATTEN(
input => secberus_map(p.description, ARRAY_CONSTRUCT('pci_dss_v4', 'nist_800_53_r5'), 0.3, 3)
) c
WHERE c.value:confidence::STRING IN ('High', 'Medium');
Write enriched results to a target table
INSERT INTO policy_compliance_mappings
SELECT
p.id AS policy_id,
p.description,
c.value:framework_id::STRING AS framework_id,
c.value:control_id::STRING AS control_id,
c.value:family::STRING AS family,
c.value:similarity::FLOAT AS similarity,
c.value:confidence::STRING AS confidence,
CURRENT_TIMESTAMP() AS mapped_at
FROM policies p,
LATERAL FLATTEN(
input => secberus_map(p.description, ARRAY_CONSTRUCT('pci_dss_v4'), 0.3, 3)
) c;
Top controls across your policy library
SELECT
c.value:framework_id::STRING AS framework_id,
c.value:control_id::STRING AS control_id,
c.value:family::STRING AS family,
COUNT(*) AS policy_count,
AVG(c.value:similarity::FLOAT) AS avg_similarity
FROM policies p,
LATERAL FLATTEN(
input => secberus_map(p.description, ARRAY_CONSTRUCT('pci_dss_v4'), 0.3, 5)
) c
GROUP BY 1, 2, 3
ORDER BY policy_count DESC
LIMIT 20;
Step 7 — Automate with Streams and Tasks (Optional)
Use a Snowflake Stream to detect new or changed rows in your source table, and a Task to run enrichment automatically on a schedule or when new rows arrive.
-- Stream: captures inserts and updates to the policies table
CREATE OR REPLACE STREAM policies_stream ON TABLE policies;
-- Task: runs when the stream has data, enriches new rows into the target table
CREATE OR REPLACE TASK enrich_new_policies
WAREHOUSE = <your_warehouse>
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('policies_stream')
AS
INSERT INTO policy_compliance_mappings
SELECT
p.id,
p.description,
c.value:framework_id::STRING,
c.value:control_id::STRING,
c.value:family::STRING,
c.value:similarity::FLOAT,
c.value:confidence::STRING,
CURRENT_TIMESTAMP()
FROM policies_stream p,
LATERAL FLATTEN(
input => secberus_map(p.description, ARRAY_CONSTRUCT('pci_dss_v4', 'nist_800_53_r5'), 0.3, 3)
) c
WHERE p.METADATA$ACTION = 'INSERT';
-- Resume the task to activate it
ALTER TASK enrich_new_policies RESUME;
Alternative: External Functions
If your account predates External Access Integrations or you prefer routing through a controlled proxy, Snowflake's External Functions feature can also call /v1/map. External Functions route requests through an API Gateway (AWS, Azure, or GCP) which forwards to the Secberus API. This requires more infrastructure but gives you a centralized proxy where you can apply rate limiting or logging.
The Snowflake → API Gateway request format wraps rows as:
{
"data": [
[0, "Password rotation must occur every 90 days."],
[1, "MFA is required for all remote access."]
]
}
Your proxy must return:
{
"data": [
[0, [{"framework_id": "pci_dss_v4", "control_id": "8.3.9", ...}]],
[1, [{"framework_id": "pci_dss_v4", "control_id": "8.4.2", ...}]]
]
}
For most new implementations, Snowpark External Access is the recommended path — it eliminates the proxy entirely and is fully managed within Snowflake.
Configuration Reference
UDF parameters
| Parameter | Type | Description |
|---|---|---|
description |
STRING |
The text field to map (policy statement, control description, etc.) |
frameworks |
ARRAY |
Array of framework IDs from Step 1, e.g. ARRAY_CONSTRUCT('pci_dss_v4') |
min_similarity |
FLOAT |
Minimum similarity threshold (0.01–1.0). Use 0.3 as a starting point. |
topk |
INT |
Max controls returned per framework/document pair (default: 3) |
Fields in the returned VARIANT array
| Field | Type | Description |
|---|---|---|
framework_id |
string | Framework identifier (e.g., pci_dss_v4) |
control_id |
string | Control identifier (e.g., 8.3.9) |
family |
string | Control family name |
similarity |
float | Similarity score (0.0–1.0) |
confidence |
string | High, Medium, Low, or Very Low |
error |
string | Present only when the API call failed |
Snowflake objects created
| Object | Type | Purpose |
|---|---|---|
secberus_network_rule |
Network Rule | Allowlists outbound HTTPS to compliance.secberus.ai |
secberus_api_key |
Secret | Stores the API key securely |
secberus_access_integration |
External Access Integration | Authorizes UDFs to use the network rule and secret |
secberus_map |
Python UDF | Callable SQL function that calls /v1/map |
API Quick Reference
| Detail | Value |
|---|---|
| Endpoint | POST https://compliance.secberus.ai/v1/map |
| Auth header | authorization: <api-key> |
| Content-Type | application/json |
| List frameworks | GET https://compliance.secberus.ai/v1/frameworks |
| Similarity range | 0.01–1.0 |
| Confidence levels | Very Low, Low, Medium, High |
| Default topk | 1 |