feat: ESQL query validation against Elastic cluster #4955
base: main
Conversation
rule_integrations = meta.get("integration", [])
if rule_integrations:
    for integration in rule_integrations:
simple style fix, replacing the `if` condition with a more robust default value via `rule_integrations = meta.get("integration") or []`
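For readers skimming the thread, a small self-contained illustration of why the `or []` form is the more defensive choice (toy `meta` dict assumed, not the PR's code):

```python
# Toy example: "integration" may be missing, a list, or explicitly null (None).
meta = {"integration": None}

# .get() with a default only applies when the key is absent, so this stays None
# and a later `for` loop would raise a TypeError without an `if` guard.
rule_integrations = meta.get("integration", [])
print(rule_integrations)  # None

# `or []` also covers None (and other falsy values), so the guard is unnecessary.
rule_integrations = meta.get("integration") or []
for integration in rule_integrations:  # safe: iterates zero times
    print(integration)
```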
package = value

-if package in list(package_manifest):
+if package in package_manifest:
small style fix
detection_rules/rule_validators.py
log(f"Got query columns: {', '.join(query_column_names)}")

# FIXME: validate the dynamic columns
The columns returned from the cluster must be validated against the input mapping, and the dynamic fields checked for validity.
At the moment (before any field validation) the test marks 33 rules out of 75 as invalid. The tests were executed against a vanilla local cluster. The many errors are most probably due to bugs in the code, so I expect the number of invalid rules to go down after those are fixed. (full log)
Updated to include initial dynamic field validation. This will parse the schema(s) for dynamic fields and perform some initial formatting checks: it checks whether the field has a proper prefix as described in #4909, and whether the field is based on a field that is present in the schema. However, additional validation will be needed if we want to validate the proper types for ES|QL function and operator return values: https://www.elastic.co/docs/reference/query-languages/esql/esql-functions-operators
Additionally, a number of the errors seen in the above testing are due to schema updates that do not have the required fields. For instance:
Next steps are:
Note: after discussion with @Mikaayenson, we determined that the sub-field of the dynamic query does not need to have ECS enforcement here.
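To make the prefix and base-field checks described above concrete, here is a minimal sketch. The helper name, the flat schema shape, and the assumption that dynamic columns carry the `Esql.` prefix (as seen in the rule output later in this PR) are illustrative, not the PR's implementation:

```python
from typing import Any

DYNAMIC_PREFIX = "Esql."  # assumption, based on the rule output shown later in this PR

def check_dynamic_field(column: str, source_field: str | None, schema: dict[str, Any]) -> list[str]:
    """Return validation errors for a single dynamic column.

    `source_field` is the schema field the column is derived from, e.g.
    Esql.destination_port_count_distinct is derived from destination.port;
    resolving that relationship automatically is the harder part and is omitted here.
    """
    errors: list[str] = []
    if not column.startswith(DYNAMIC_PREFIX):
        errors.append(f"{column}: missing the `{DYNAMIC_PREFIX}` prefix")
    if source_field and source_field not in schema:
        errors.append(f"{column}: derived from `{source_field}`, which is not in the schema")
    return errors

# Flat schema of field name -> Elasticsearch type:
schema = {"destination.port": "long", "agent.id": "keyword"}
print(check_dynamic_field("Esql.destination_port_count_distinct", "destination.port", schema))  # []
print(check_dynamic_field("event_count", None, schema))  # prefix error
```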
#5151 has merged, which unblocks this PR and it is now ready for review.
def get_column_from_index_mapping_schema(keys: list[str], current_schema: dict[str, Any] | None) -> str | None:
    """Recursively traverse the schema to find the type of the column."""
    key = keys[0]
Why pull out just the first? Or, why pass in all of the keys?
It is necessary because this is a recursive method; if one did not pass all of the keys, it would not be able to pop them as the method recurses.
The line:
`get_column_from_index_mapping_schema(keys[1:], current_schema=column.get("properties"))`
needs the additional keys in order to function.
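For context, this is roughly the shape of the recursion being discussed. It is a sketch reconstructed from the snippet above, not necessarily the PR's exact code:

```python
from typing import Any

def get_column_from_index_mapping_schema(keys: list[str], current_schema: dict[str, Any] | None) -> str | None:
    """Recursively traverse the schema to find the type of the column.

    `keys` is a dotted column name split on ".", e.g. ["destination", "port"].
    """
    if not keys or not current_schema:
        return None
    key = keys[0]
    column = current_schema.get(key)
    if not isinstance(column, dict):
        return None
    if len(keys) == 1:
        # Leaf: return the mapped type, e.g. "keyword" or "long".
        return column.get("type")
    # Intermediate object: descend into its nested properties with the remaining keys.
    return get_column_from_index_mapping_schema(keys[1:], current_schema=column.get("properties"))

# Example: resolving "destination.port" against a minimal index mapping.
mapping = {"destination": {"properties": {"port": {"type": "long"}}}}
print(get_column_from_index_mapping_schema("destination.port".split("."), mapping))  # long
```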
def combine_dicts(dest: dict[Any, Any], src: dict[Any, Any]) -> None:
    """Combine two dictionaries recursively."""
    for k, v in src.items():
        if k in dest and isinstance(dest[k], dict) and isinstance(v, dict):
            combine_dicts(dest[k], v)  # type: ignore[reportUnknownVariableType]
        else:
            dest[k] = v
Just a couple thoughts on this method:
- It might be good to start with a copy of `dest` to ensure non-mutation, since currently it's modifying the `dest` dict in place, e.g. `merged = dest.copy()`.
- I would also return `merged` as the new merged dictionary.
- Do we need support for iterable types (like list/set)? I think right now it's just overwriting the existing value in `dest` from `src` instead of actually merging.

Finally, a small nit: what do you think about renaming this to something more canonical like `deep_merge`?
For 1 and 2:
In effect then, is your suggestion to perform this copy in a non-recursive way? Having the recursive function create copies at every level of recursion would be quite inefficient.
In its current state, the goal of the function is to mutate the original (combination as accomplished by merge in this case), so if the goal is to "ensure non-mutation", then this changes the goal of the function, e.g. combine via copy instead of the current goal, which is combine via merge. Fine to switch, but just wanting to make sure I am not missing something.

For 3:
For our use case of parsing integration field YAMLs (example for those not familiar), this behavior is acceptable. We are using the `flat_schema_to_index_mapping` function output and/or direct output from Kibana. In this way, the types from the integrations YAML are changed from list/set to nested dictionaries. However, given the more generic naming of the function and the general implication that it can be used on all dictionaries, it makes sense that we should add this support if we want to keep it as a general-purpose function.
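For illustration, one way to reconcile the two positions above: copy once at the top level so recursion stays cheap, return the merged dict, and concatenate lists. A hedged sketch, not a proposal for the final implementation:

```python
from copy import deepcopy
from typing import Any

def deep_merge(dest: dict[Any, Any], src: dict[Any, Any]) -> dict[Any, Any]:
    """Return a new dict; `dest` is deep-copied once, then merged in place."""
    merged = deepcopy(dest)

    def _merge(d: dict[Any, Any], s: dict[Any, Any]) -> None:
        for k, v in s.items():
            if k in d and isinstance(d[k], dict) and isinstance(v, dict):
                _merge(d[k], v)
            elif k in d and isinstance(d[k], list) and isinstance(v, list):
                d[k] = d[k] + v  # optional list handling raised in the review
            else:
                d[k] = v

    _merge(merged, src)
    return merged

# Nested dicts merge, lists concatenate, scalars are overwritten:
a = {"x": {"y": 1}, "tags": ["a"]}
b = {"x": {"z": 2}, "tags": ["b"]}
print(deep_merge(a, b))  # {'x': {'y': 1, 'z': 2}, 'tags': ['a', 'b']}
print(a)                 # original left unmodified
```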
View-rule with remote validation 🟢
python -m detection_rules view-rule rules/linux/discovery_port_scanning_activity_from_compromised_host.toml
Loaded config file: /Users/shashankks/elastic_workspace/detection-rules/.detection-rules-cfg.json
█▀▀▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄ ▄ █▀▀▄ ▄ ▄ ▄ ▄▄▄ ▄▄▄
█ █ █▄▄ █ █▄▄ █ █ █ █ █ █▀▄ █ █▄▄▀ █ █ █ █▄▄ █▄▄
█▄▄▀ █▄▄ █ █▄▄ █▄▄ █ ▄█▄ █▄█ █ ▀▄█ █ ▀▄ █▄▄█ █▄▄ █▄▄ ▄▄█
{
"author": [
"Elastic"
],
"description": "This rule detects potential port scanning activity from a compromised host. Port scanning is a common reconnaissance technique used by attackers to identify open ports and services on a target system. A compromised host may exhibit port scanning behavior when an attacker is attempting to map out the network topology, identify vulnerable services, or prepare for further exploitation. This rule identifies potential port scanning activity by monitoring network connection attempts from a single host to a large number of ports within a short time frame. ESQL rules have limited fields available in its alert documents. Make sure to review the original documents to aid in the investigation of this alert.",
"from": "now-61m",
"interval": "1h",
"language": "esql",
"license": "Elastic License v2",
"name": "Potential Port Scanning Activity from Compromised Host",
"note": " ## Triage and analysis\n\n> **Disclaimer**:\n> This investigation guide was created using generative AI technology and has been reviewed to improve its accuracy and relevance. While every effort has been made to ensure its quality, we recommend validating the content and adapting it to suit your specific environment and operational needs.\n\n### Investigating Potential Port Scanning Activity from Compromised Host\n\nPort scanning is a reconnaissance method used by attackers to identify open ports and services on a network, often as a precursor to exploitation. In Linux environments, compromised hosts may perform rapid connection attempts to numerous ports, signaling potential scanning activity. The detection rule identifies such behavior by analyzing network logs for a high number of distinct port connections from a single host within a short timeframe, indicating possible malicious intent.\n\n### Possible investigation steps\n\n- Review the network logs to identify the specific host exhibiting the port scanning behavior by examining the destination.ip and process.executable fields.\n- Analyze the @timestamp field to determine the exact time frame of the scanning activity and correlate it with any other suspicious activities or alerts from the same host.\n- Investigate the process.executable field to understand which application or service initiated the connection attempts, and verify if it is a legitimate process or potentially malicious.\n- Check the destination.port field to identify the range and types of ports targeted by the scanning activity, which may provide insights into the attacker's objectives or the services they are interested in.\n- Assess the host's security posture by reviewing recent changes, installed software, and user activity to determine if the host has been compromised or if the scanning is part of legitimate network operations.\n- Consult the original documents and logs for additional context and details that may not be captured in the alert to aid in a comprehensive investigation.\n\n### False positive analysis\n\n- Legitimate network scanning tools used by system administrators for network maintenance or security assessments can trigger this rule. To handle this, identify and whitelist the IP addresses or processes associated with these tools.\n- Automated vulnerability scanners or monitoring systems that perform regular checks on network services may cause false positives. Exclude these systems by creating exceptions for their known IP addresses or process names.\n- High-volume legitimate services that open multiple connections to different ports, such as load balancers or proxy servers, might be flagged. Review and exclude these services by specifying their IP addresses or process executables.\n- Development or testing environments where frequent port scanning is part of routine operations can be mistakenly identified. Implement exceptions for these environments by excluding their specific network segments or host identifiers.\n- Scheduled network discovery tasks that are part of IT operations can mimic port scanning behavior. 
Document and exclude these tasks by setting up time-based exceptions or identifying their unique process signatures.\n\n### Response and remediation\n\n- Isolate the compromised host from the network immediately to prevent further scanning and potential lateral movement.\n- Terminate any suspicious processes identified by the process.executable field to halt ongoing malicious activities.\n- Conduct a thorough review of the compromised host's system logs and network traffic to identify any unauthorized access or data exfiltration attempts.\n- Patch and update all software and services on the compromised host to close any vulnerabilities that may have been exploited.\n- Change all credentials associated with the compromised host and any potentially affected systems to prevent unauthorized access.\n- Monitor the network for any further signs of scanning activity or other suspicious behavior from other hosts, indicating potential additional compromises.\n- Escalate the incident to the security operations team for further investigation and to determine if additional systems are affected.\n",
"query": "from logs-endpoint.events.network-*\n| where\n @timestamp > now() - 1h and\n host.os.type == \"linux\" and\n event.type == \"start\" and\n event.action == \"connection_attempted\" and\n not (\n cidr_match(destination.ip, \"127.0.0.0/8\", \"::1\", \"FE80::/10\", \"FF00::/8\") or\n process.executable in (\n \"/opt/dbtk/bin/jsvc\", \"/usr/lib/dotnet/dotnet\", \"/usr/share/elasticsearch/jdk/bin/java\", \"/usr/sbin/haproxy\",\n \"/usr/bin/java\", \"/opt/kaspersky/kesl/libexec/kesl\", \"/usr/bin/dotnet\", \"/opt/java/openjdk/bin/java\"\n ) or\n process.executable like \"/var/opt/kaspersky/kesl/*kesl\" or\n process.executable like \"/usr/lib/jvm/*/java\" or\n process.executable like \"/opt/google/chrome*\" or\n process.executable like \"/var/lib/docker/*/java\" or\n process.executable like \"/usr/lib64/jvm/*/java\" or\n process.executable like \"/snap/*\" or\n process.executable like \"/home/*/.local/share/JetBrains/*\"\n )\n| keep\n @timestamp,\n host.os.type,\n event.type,\n event.action,\n destination.port,\n process.executable,\n destination.ip,\n agent.id,\n host.name\n| stats\n Esql.event_count = count(),\n Esql.destination_port_count_distinct = count_distinct(destination.port),\n Esql.agent_id_count_distinct = count_distinct(agent.id),\n Esql.host_name_values = values(host.name),\n Esql.agent_id_values = values(agent.id)\n by process.executable, destination.ip\n| where\n Esql.agent_id_count_distinct == 1 and\n Esql.destination_port_count_distinct > 100\n| sort Esql.event_count asc\n| limit 100\n",
"related_integrations": [
{
"package": "endpoint",
"version": "^9.0.0"
}
],
"required_fields": [
{
"ecs": false,
"name": "Esql.agent_id_count_distinct",
"type": "long"
},
{
"ecs": false,
"name": "Esql.agent_id_values",
"type": "keyword"
},
{
"ecs": false,
"name": "Esql.destination_port_count_distinct",
"type": "long"
},
{
"ecs": false,
"name": "Esql.event_count",
"type": "long"
},
{
"ecs": false,
"name": "Esql.host_name_values",
"type": "keyword"
},
{
"ecs": true,
"name": "destination.ip",
"type": "ip"
},
{
"ecs": true,
"name": "process.executable",
"type": "keyword"
}
],
"risk_score": 21,
"rule_id": "6b341d03-1d63-41ac-841a-2009c86959ca",
"setup": "## Setup\n\nThis rule requires data coming in from Elastic Defend.\n\n### Elastic Defend Integration Setup\nElastic Defend is integrated into the Elastic Agent using Fleet. Upon configuration, the integration allows the Elastic Agent to monitor events on your host and send data to the Elastic Security app.\n\n#### Prerequisite Requirements:\n- Fleet is required for Elastic Defend.\n- To configure Fleet Server refer to the [documentation](https://www.elastic.co/guide/en/fleet/current/fleet-server.html).\n\n#### The following steps should be executed in order to add the Elastic Defend integration on a Linux System:\n- Go to the Kibana home page and click \"Add integrations\".\n- In the query bar, search for \"Elastic Defend\" and select the integration to see more details about it.\n- Click \"Add Elastic Defend\".\n- Configure the integration name and optionally add a description.\n- Select the type of environment you want to protect, either \"Traditional Endpoints\" or \"Cloud Workloads\".\n- Select a configuration preset. Each preset comes with different default settings for Elastic Agent, you can further customize these later by configuring the Elastic Defend integration policy. [Helper guide](https://www.elastic.co/guide/en/security/current/configure-endpoint-integration-policy.html).\n- We suggest selecting \"Complete EDR (Endpoint Detection and Response)\" as a configuration setting, that provides \"All events; all preventions\"\n- Enter a name for the agent policy in \"New agent policy name\". If other agent policies already exist, you can click the \"Existing hosts\" tab and select an existing policy instead.\nFor more details on Elastic Agent configuration settings, refer to the [helper guide](https://www.elastic.co/guide/en/fleet/8.10/agent-policy.html).\n- Click \"Save and Continue\".\n- To complete the integration, select \"Add Elastic Agent to your hosts\" and continue to the next section to install the Elastic Agent on your hosts.\nFor more details on Elastic Defend refer to the [helper guide](https://www.elastic.co/guide/en/security/current/install-endpoint.html).\n",
"severity": "low",
"tags": [
"Domain: Endpoint",
"OS: Linux",
"Use Case: Threat Detection",
"Tactic: Discovery",
"Data Source: Elastic Defend",
"Resources: Investigation Guide"
],
"threat": [
{
"framework": "MITRE ATT&CK",
"tactic": {
"id": "TA0007",
"name": "Discovery",
"reference": "https://attack.mitre.org/tactics/TA0007/"
},
"technique": [
{
"id": "T1046",
"name": "Network Service Discovery",
"reference": "https://attack.mitre.org/techniques/T1046/"
}
]
}
],
"timestamp_override": "event.ingested",
"type": "esql",
"version": 7
}
detection-rules on esql-field-validation [$?⇣] is 📦 v1.5.0 via 🐍 v3.12.8 (.venv) on ☁️ [email protected] took 16s
❯
5a876e0d-d39a-49b9-8ad8-19c9b622203b: Index `test-sentinel_one_cloud_funnel-url1760010011451` created: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'test-sentinel_one_cloud_funnel-url1760010011451'}
5a876e0d-d39a-49b9-8ad8-19c9b622203b: Index `test-rule-ecs-index1760010011451` created: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'test-rule-ecs-index1760010011451'}
5a876e0d-d39a-49b9-8ad8-19c9b622203b: Index `test-rule-non-ecs-index1760010011451` created: {'acknowledged': True, 'shards_acknowledged': True, 'index': 'test-rule-non-ecs-index1760010011451'}
5a876e0d-d39a-49b9-8ad8-19c9b622203b: Executing a query against `rule-test-index-1760010011451, test-logs-1760010011451, test-endpoint-action_responses1760010011451, test-endpoint-actions1760010011451, test-endpoint-alerts1760010011451, test-endpoint-api1760010011451, test-endpoint-collection1760010011451, test-endpoint-file1760010011451, test-endpoint-heartbeat1760010011451, test-endpoint-library1760010011451, test-endpoint-metadata1760010011451, test-endpoint-metrics1760010011451, test-endpoint-network1760010011451, test-endpoint-policy1760010011451, test-endpoint-process1760010011451, test-endpoint-registry1760010011451, test-endpoint-security1760010011451, test-system-application1760010011451, test-system-auth1760010011451, test-system-core1760010011451, test-system-cpu1760010011451, test-system-diskio1760010011451, test-system-filesystem1760010011451, test-system-fsstat1760010011451, test-system-load1760010011451, test-system-memory1760010011451, test-system-network1760010011451, test-system-process1760010011451, test-system-process_summary1760010011451, test-system-security1760010011451, test-system-socket_summary1760010011451, test-system-syslog1760010011451, test-system-system1760010011451, test-system-uptime1760010011451, test-windows-applocker_exe_and_dll1760010011451, test-windows-applocker_msi_and_script1760010011451, test-windows-applocker_packaged_app_deployment1760010011451, test-windows-applocker_packaged_app_execution1760010011451, test-windows-forwarded1760010011451, test-windows-perfmon1760010011451, test-windows-powershell1760010011451, test-windows-powershell_operational1760010011451, test-windows-service1760010011451, test-windows-sysmon_operational1760010011451, test-windows-windows_defender1760010011451, test-auditd_manager-auditd1760010011451, test-m365_defender-alert1760010011451, test-m365_defender-event1760010011451, test-m365_defender-incident1760010011451, test-m365_defender-vulnerability1760010011451, test-m365_defender-latest_cdr_vulnerabilities1760010011451, test-crowdstrike-alert1760010011451, test-crowdstrike-falcon1760010011451, test-crowdstrike-fdr1760010011451, test-crowdstrike-host1760010011451, test-crowdstrike-vulnerability1760010011451, test-sentinel_one_cloud_funnel-command_script1760010011451, test-sentinel_one_cloud_funnel-cross_process1760010011451, test-sentinel_one_cloud_funnel-dns1760010011451, test-sentinel_one_cloud_funnel-event1760010011451, test-sentinel_one_cloud_funnel-file1760010011451, test-sentinel_one_cloud_funnel-indicators1760010011451, test-sentinel_one_cloud_funnel-ip1760010011451, test-sentinel_one_cloud_funnel-logins1760010011451, test-sentinel_one_cloud_funnel-module1760010011451, test-sentinel_one_cloud_funnel-process1760010011451, test-sentinel_one_cloud_funnel-registry1760010011451, test-sentinel_one_cloud_funnel-scheduled_task1760010011451, test-sentinel_one_cloud_funnel-threat_intelligence_indicators1760010011451, test-sentinel_one_cloud_funnel-url1760010011451, test-rule-ecs-index1760010011451, test-rule-non-ecs-index1760010011451`
/Users/shashankks/elastic_workspace/detection-rules/detection_rules/index_mappings.py:254: ElasticsearchWarning: No limit defined, adding default limit of [1000]
response = elastic_client.esql.query(query=query)
{
"api_key" : "MASKED",
"cloud_id": "E2ERelease_90:dXMtd2VzdDIuZ2NwLmVsYXN0aWMtY2xvdWQuY29tOjQ0MyQ1NDhmOGRmOTMyMGM0ZTA2OTViOGMxOTdiNGMyZjhhYSQ5NmNiYzI4OWE3N2I0MjBmYWM5OWY0NzFkNWVjNGJjMQ==",
"provider_type": "basic",
"provider_name": "cloud-basic"
}
74f45152-9aee-11ef-b0a5-f661ea17fbcd: Validating against 9.2.0 stack
74f45152-9aee-11ef-b0a5-f661ea17fbcd: Extracted indices from query: logs-aws.cloudtrail*
74f45152-9aee-11ef-b0a5-f661ea17fbcd: Extracted Event Dataset integrations from query: logs-aws.cloudtrail*
74f45152-9aee-11ef-b0a5-f661ea17fbcd: Collected mappings: 0
74f45152-9aee-11ef-b0a5-f661ea17fbcd: Combined mappings prepared: 54
Great question! Yes, this is expected from some additions in a368516. We need to validate the rule against all of the stack versions in the stack schema map to check for rules that should be min-stacked. Since we are building the index mappings directly from the integrations rather than installing them in the stack, we can test the schemas from the various stack versions against a single version of Kibana. See #5151 (comment) for an example.
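A rough sketch of the loop described above, with hypothetical helper names (not the PR's API), just to show how one cluster can exercise every stack version in the schema map:

```python
from typing import Any, Callable

def validate_for_all_stack_versions(
    rule: Any,
    stack_schema_map: dict[str, Any],
    build_mappings: Callable[[Any, str], dict[str, Any]],
    run_remote_validation: Callable[[Any, dict[str, Any]], list[str]],
) -> dict[str, list[str]]:
    """Build integration-based index mappings per stack version and validate the
    rule against each one, reusing the same live cluster/Kibana for every pass."""
    errors_by_version: dict[str, list[str]] = {}
    for version in stack_schema_map:
        mappings = build_mappings(rule, version)        # schemas differ per version
        errors = run_remote_validation(rule, mappings)  # same cluster every time
        if errors:
            errors_by_version[version] = errors
    return errors_by_version
```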
As such, I have tested the commands used and the code changes. Some questions on the implementation were clarified with justifications from Eric.
My major concern is the time it takes to execute `python -m detection_rules dev test esql-remote-validation`. We are making it run on PRs and on pushes to main and protected branches, which will have a significant impact on developer productivity. Was the timing aspect considered during these discussions? Given that we will see increased ES|QL rule adoption, I only see execution time growing.
Yes, timing was considered, but unfortunately the remote validation process is by nature quite time-consuming (and increasingly so as more rules are added). For some additional background, there are four approaches that we considered for ES|QL validation.
Updated the workflow to use env wrapping for secrets in `if` statements. See https://github.com/orgs/community/discussions/26726 for context/rationale.
For the CI workflow, see the example runs below:
name: ES|QL Validation
on:
  push:
    branches: [ "main", "8.*", "9.*" ]
I'm not sure we want this to run outside of PRs because it's expensive.
That makes sense. I think my only concern would be backport testing then, but given that we are checking everything in the stack schema map each time anyway, I think it would be unlikely that we would miss something. The case I can think of would be when we introduce a min-stack version and then have a fork that is no longer tested.
Again, I agree it is probably not worth the expense; just adding context.
def validate_columns_index_mapping(
    self, query_columns: list[dict[str, str]], combined_mappings: dict[str, Any], version: str = ""
) -> bool:
    """Validate that the columns in the ESQL query match the provided mappings."""
Is this validating every field that comes back from Kibana or just the fields used in the query?
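For readers outside the review thread, a minimal sketch of the kind of per-column check the docstring above describes, reusing the traversal sketch from earlier; the helper names and the `Esql.` skip are assumptions, not the PR's code:

```python
from typing import Any

def validate_columns_against_mapping(
    query_columns: list[dict[str, str]], combined_mappings: dict[str, Any]
) -> list[str]:
    """Check each column returned for the query (name + type) against the combined
    index mappings; dynamic Esql.* columns are assumed to be validated separately."""
    errors: list[str] = []
    for column in query_columns:
        name, es_type = column["name"], column["type"]
        if name.startswith("Esql."):
            continue
        mapped_type = get_column_from_index_mapping_schema(name.split("."), combined_mappings)
        if mapped_type is None:
            errors.append(f"{name}: not found in the combined mappings")
        elif mapped_type != es_type:
            errors.append(f"{name}: query type `{es_type}` does not match mapping type `{mapped_type}`")
    return errors
```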
Pull Request
Issue link(s):
Summary - What I changed
As a note to reviewers, the entry point when validating a given rule is through `remote_validate_rule`.
Another note: in some integrations (specifically Okta), there are fields defined in the integration where the mapping is not directly supported in the stack. See details below for an example. Fleet handles these cases by removing the offending fields, so this PR proposes a similar process. See `find_nested_multifields` for the core logic for identifying these offending fields.
Details
When using the Okta mapping as-is, one would receive the following error:
We can see in the integration YAML
(Relevant Snippet)
`logOnlySecurityData` is a keyword but has fields; `behaviors` is a field of `logOnlySecurityData` and is also a keyword, but it also has fields like `New_City`, which is not allowed according to the error message.
When installing the integration through Fleet, one can see that it strips the sub-fields under `behaviors`.
We also see a similar issue with flattened objects having sub-fields: for instance, in auditd manager, `paths` is defined as flattened but also has fields, which is not supported in an index mapping, so Fleet discards it as well (see image below).
Paths in auditd yml
Example sub field of flattened paths in auditd yml
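To make the multi-field problem concrete, here is a hedged sketch of the kind of walk `find_nested_multifields` performs. It is reconstructed from the description above, not the PR's implementation; the flattened-with-fields case in auditd would need a similar check:

```python
from typing import Any

def find_nested_multifields(mapping: dict[str, Any], path: str = "", in_multifield: bool = False) -> list[str]:
    """Report paths where a multi-field ("fields") itself declares "fields",
    which the index mapping rejects and Fleet silently strips."""
    offenders: list[str] = []
    for name, spec in mapping.items():
        if not isinstance(spec, dict):
            continue
        full = f"{path}.{name}" if path else name
        if in_multifield and spec.get("fields"):
            offenders.append(full)
        if isinstance(spec.get("properties"), dict):
            offenders.extend(find_nested_multifields(spec["properties"], full, False))
        if isinstance(spec.get("fields"), dict):
            offenders.extend(find_nested_multifields(spec["fields"], full, True))
    return offenders

# Toy mapping mirroring the Okta case described above:
example = {
    "logOnlySecurityData": {
        "type": "keyword",
        "fields": {"behaviors": {"type": "keyword", "fields": {"New_City": {"type": "keyword"}}}},
    }
}
print(find_nested_multifields(example))  # ['logOnlySecurityData.behaviors']
```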
How To Test
Provide the stack connection settings either in the config file (`.detection-rules-cfg.yml`) or via the environment variables.
Once you have the environment variables set up and the stack ready, you can test the remote validation with the following command:
python -m pytest tests/test_rules_remote.py::TestRemoteRules::test_esql_rules -s -v
Note, `-v` is optional but provides useful debugging information.
Also, test remote validation with the rule loader through view-rule via the following:
export DR_REMOTE_ESQL_VALIDATION=True
python -m detection_rules view-rule rules/linux/discovery_port_scanning_activity_from_compromised_host.toml
Can also use the following commands to test all ESQL rules:
python -m detection_rules dev test esql-remote-validation --verbosity 1
Checklist
- `bug`, `enhancement`, `schema`, `maintenance`, `Rule: New`, `Rule: Deprecation`, `Rule: Tuning`, `Hunt: New`, or `Hunt: Tuning` label so guidelines can be generated
- `meta:rapid-merge` label if planning to merge within 24 hours

Contributor checklist