
Conversation

@akshraj-crest akshraj-crest commented Nov 14, 2025

Proposed commit message

The initial release includes the incident data stream, along with associated dashboards
and visualizations.

IRONSCALES fields are mapped to their corresponding ECS fields where possible.

Test samples were derived from documentation and live data samples,
which were subsequently sanitized.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices.

How to test this PR locally

  • Clone the integrations repo.
  • Install elastic-package locally.
  • Start the Elastic Stack using elastic-package.
  • Move to the integrations/packages/ironscales directory.
  • Run the following command to run the tests:

elastic-package test

2025/11/14 17:51:35  INFO New version is available - v0.116.0. Download from: https://github.com/elastic/elastic-package/releases/tag/v0.116.0
Run asset tests for the package
2025/11/14 17:51:36  INFO License text found in "/root/github-integration/integrations/LICENSE.txt" will be included in package
--- Test results for package: ironscales - START ---
╭────────────┬─────────────┬───────────┬─────────────────────────────────────────────────────────────────────┬────────┬──────────────╮
│ PACKAGE    │ DATA STREAM │ TEST TYPE │ TEST NAME                                                           │ RESULT │ TIME ELAPSED │
├────────────┼─────────────┼───────────┼─────────────────────────────────────────────────────────────────────┼────────┼──────────────┤
│ ironscales │             │ asset     │ dashboard ironscales-10c370de-4a54-41b2-bab7-0b0fdce7f399 is loaded │ PASS   │      1.812µs │
│ ironscales │             │ asset     │ search ironscales-21f03da1-39e9-4bc2-8df2-19c5f78bfb18 is loaded    │ PASS   │        411ns │
│ ironscales │             │ asset     │ search ironscales-cf37f3c8-3e04-4f96-9f1d-05176f4a8561 is loaded    │ PASS   │        407ns │
│ ironscales │ incident    │ asset     │ index_template logs-ironscales.incident is loaded                   │ PASS   │        391ns │
│ ironscales │ incident    │ asset     │ ingest_pipeline logs-ironscales.incident-0.1.0 is loaded            │ PASS   │        300ns │
╰────────────┴─────────────┴───────────┴─────────────────────────────────────────────────────────────────────┴────────┴──────────────╯
--- Test results for package: ironscales - END   ---
Done
Run pipeline tests for the package
--- Test results for package: ironscales - START ---
╭────────────┬─────────────┬───────────┬──────────────────────────────────────────────┬────────┬──────────────╮
│ PACKAGE    │ DATA STREAM │ TEST TYPE │ TEST NAME                                    │ RESULT │ TIME ELAPSED │
├────────────┼─────────────┼───────────┼──────────────────────────────────────────────┼────────┼──────────────┤
│ ironscales │ incident    │ pipeline  │ (ingest pipeline warnings test-incident.log) │ PASS   │ 383.866376ms │
│ ironscales │ incident    │ pipeline  │ test-incident.log                            │ PASS   │ 290.488796ms │
╰────────────┴─────────────┴───────────┴──────────────────────────────────────────────┴────────┴──────────────╯
--- Test results for package: ironscales - END   ---
Done
Run policy tests for the package
--- Test results for package: ironscales - START ---
No test results
--- Test results for package: ironscales - END   ---
Done
Run static tests for the package
--- Test results for package: ironscales - START ---
╭────────────┬─────────────┬───────────┬──────────────────────────┬────────┬──────────────╮
│ PACKAGE    │ DATA STREAM │ TEST TYPE │ TEST NAME                │ RESULT │ TIME ELAPSED │
├────────────┼─────────────┼───────────┼──────────────────────────┼────────┼──────────────┤
│ ironscales │ incident    │ static    │ Verify sample_event.json │ PASS   │ 131.871168ms │
╰────────────┴─────────────┴───────────┴──────────────────────────┴────────┴──────────────╯
--- Test results for package: ironscales - END   ---
Done
Run system tests for the package
2025/11/14 17:51:42  INFO Installing package...
2025/11/14 17:51:42  INFO License text found in "/root/github-integration/integrations/LICENSE.txt" will be included in package
2025/11/14 17:51:54  INFO Running test for data_stream "incident" with configuration 'default'
2025/11/14 17:52:02  INFO Setting up independent Elastic Agent...
2025/11/14 17:52:16  INFO Setting up service...
2025/11/14 17:52:49  INFO Tearing down service...
2025/11/14 17:52:50  INFO Write container logs to file: /root/github-integration/integrations/build/container-logs/ironscales-1763122970540757525.log
2025/11/14 17:52:53  INFO Tearing down agent...
2025/11/14 17:52:53  INFO Write container logs to file: /root/github-integration/integrations/build/container-logs/elastic-agent-1763122973641765384.log
2025/11/14 17:53:02  INFO Uninstalling package...
--- Test results for package: ironscales - START ---
╭────────────┬─────────────┬───────────┬───────────┬────────┬───────────────╮
│ PACKAGE    │ DATA STREAM │ TEST TYPE │ TEST NAME │ RESULT │  TIME ELAPSED │
├────────────┼─────────────┼───────────┼───────────┼────────┼───────────────┤
│ ironscales │ incident    │ system    │ default   │ PASS   │ 55.655959273s │
╰────────────┴─────────────┴───────────┴───────────┴────────┴───────────────╯
--- Test results for package: ironscales - END   ---
Done

Related issues

Screenshots

ironscales_ss_2 (dashboard screenshot)

Go Code for Ingest Pipeline Generation

The incident data stream pipeline is generated using Go code built on top of the Dispear library.
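Each capitalized call (SET, RENAME, DATE, CONVERT, FOREACH, APPEND, SCRIPT, and so on) corresponds to the ingest processor of the same name, chained modifiers such as IF, TAG, IGNORE_MISSING, and ON_FAILURE map to that processor's options, and Generate() emits the assembled pipeline.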
Below is the code used for generating the pipeline logic:

package main

import (
	"fmt"
	"strings"

	. "github.com/efd6/dispear"
)

const (
	ECSVersion = "9.2.0"
	PkgRoot    = "json"
)
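
// errorFormat is the Mustache template appended to error.message when a
// processor fails; the {{{_ingest.on_failure_*}}} placeholders are resolved by
// the ingest node at runtime with the type, tag, pipeline, and message of the
// failing processor.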
const errorFormat = "Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}"

// removeErrorHandler generates a pair of Renderers that first remove the given field
// from the document and then append a formatted error message to the 'error.message'
// field. It is used as the on_failure handler for processors that may fail, dropping
// the offending field and recording why it was removed.
func removeErrorHandler(f string) []Renderer {
	return []Renderer{
		REMOVE(f),
		APPEND("error.message", errorFormat),
	}
}

// safeNavigateAndCheck converts a dot-separated field path to a safe navigation string.
//
// Example:
// "ironscales.incident.created" -> "ctx.ironscales?.incident?.created"
func safeNavigateAndCheck(field string) string {
	parts := strings.Split(field, ".")
	condition := "ctx"
	for i, part := range parts {
		if i > 0 { // Subsequent segments use the null-safe "?." operator; the first is joined with "."
			condition += fmt.Sprintf("?.%s", part)
		} else {
			condition += fmt.Sprintf(".%s", part)
		}
	}
	return condition
}

func main() {

	// Initial processors of pipeline

	DESCRIPTION("Pipeline for processing incident logs.")

	DROP("empty events placeholder").IF("ctx.message == 'empty_events_placeholder'")

	SET("ecs.version").VALUE(ECSVersion)

	TERMINATE("data collection error").
		IF("ctx.error?.message != null && ctx.message == null && ctx.event?.original == null").
		DESCRIPTION("error message set and no data to process.")

	BLANK()
	BLANK().COMMENT("remove agentless metadata")

	REMOVE(
		"organization",
		"division",
		"team",
	).
		IF("ctx.organization instanceof String && ctx.division instanceof String && ctx.team instanceof String").
		IGNORE_MISSING(true).
		TAG("remove_agentless_tags").
		DESCRIPTION("Removes the fields added by Agentless as metadata, as they can collide with ECS fields.")

	BLANK()
	BLANK().COMMENT("parse the event JSON")

	RENAME("message", "event.original").
		IF("ctx.event?.original == null").
		DESCRIPTION("Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.").
		IGNORE_MISSING(true)

	REMOVE("message").
		TAG("remove_message").
		IF("ctx.event?.original != null").
		DESCRIPTION("The `message` field is no longer required if the document has an `event.original` field.").
		IGNORE_MISSING(true)

	JSON(PkgRoot, "event.original")
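	// The parsed document is stored under the temporary PkgRoot ("json") key;
	// the snake_case script below moves it to ironscales.incident and removes it.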

	// Setting event.* fields

	BLANK()
	BLANK().COMMENT("Set event.* fields")
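	// event.kind is set to "event" by default and overridden to "pipeline_error"
	// at the end of the pipeline if any processor records an error.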

	SET("event.kind").VALUE("event")

	// Script to rename into snake case

	BLANK()

	BLANK().COMMENT("rename to snake case")

	SCRIPT().
		TAG("script_convert_camelcase_to_snake_case").
		DESCRIPTION("Convert camelCase to snake_case.").
		LANG("painless").
		SOURCE(`
        // Helper function to convert camelCase to snake_case
        String camelToSnake(String str) {
            def result = "";
            for (int i = 0; i < str.length(); i++) {
                char c = str.charAt(i);
                if (Character.isUpperCase(c)) {
                    if (i > 0 && Character.isLowerCase(str.charAt(i - 1))) {
                        result += "_";
                    }
                    result += Character.toLowerCase(c);
                } else {
                    result += c;
                }
            }
            return result;
        }
        // Recursive function to handle nested fields
        def convertToSnakeCase(def obj) {
          if (obj instanceof Map) {
            // Convert each key in the map
            def newObj = [:];
            for (entry in obj.entrySet()) {
              String newKey = camelToSnake(entry.getKey());
              newObj[newKey] = convertToSnakeCase(entry.getValue());
            }
            return newObj;
          } else if (obj instanceof List) {
            // If it's a list, process each item recursively
            def newList = [];
            for (item in obj) {
              newList.add(convertToSnakeCase(item));
            }
            return newList;
          } else {
            return obj;
          }
        }
        // Apply the conversion
        ctx.ironscales = ctx.ironscales ?: [:];
        if (ctx.json != null) {
          ctx.ironscales.incident = convertToSnakeCase(ctx.json);
        }
        // Remove json field
        ctx.remove('json');
		`)

	// Use Date processors

	BLANK()

	BLANK().COMMENT("Date processors")
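	// Each timestamp field is parsed in place, accepting microsecond- and
	// second-precision ISO 8601 formats; the IF guard skips null or empty values,
	// and a parse failure drops the offending field via removeErrorHandler.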

	for _, field := range []string{
		"ironscales.incident.created",
		"ironscales.incident.first_challenged_date",
		"ironscales.incident.latest_email_date",
		"ironscales.incident.first_reported_date",
	} {
		DATE(field, field, "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'", "yyyy-MM-dd'T'HH:mm:ss'Z'").
			IF(safeNavigateAndCheck(field) + " != null" + " && " + "ctx." + field + " != ''").
			ON_FAILURE(removeErrorHandler(field)...)
	}

	// Convert to Long

	BLANK()

	BLANK().COMMENT("Convert to Long Processors")

	for _, field := range []string{
		"ironscales.incident.links_count",
		"ironscales.incident.attachments_count",
		"ironscales.incident.affected_mailboxes_count",
		"ironscales.incident.comments_count",
		"ironscales.incident.release_request_count",
		"ironscales.incident.federation.companies_affected",
		"ironscales.incident.federation.companies_marked_phishing",
		"ironscales.incident.federation.companies_marked_spam",
		"ironscales.incident.federation.companies_marked_fp",
		"ironscales.incident.federation.companies_unclassified",
	} {
		CONVERT("", field, "long").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler(field)...)
	}

	// Convert to long with foreach
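	// Inside a foreach processor the current list element is addressed as
	// _ingest._value, so these convert each attachment's file_size and each
	// related incident ID to long.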

	FOREACH("ironscales.incident.attachments",
		CONVERT("", "_ingest._value.file_size", "long").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler("_ingest._value.file_size")...),
	).IF("ctx.ironscales?.incident?.attachments instanceof List")

	FOREACH("ironscales.incident.related_incidents",
		CONVERT("", "_ingest._value", "long").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler("_ingest._value")...),
	).IF("ctx.ironscales?.incident?.related_incidents instanceof List")

	// Convert to double

	BLANK()
	BLANK().COMMENT("Convert to Double Processors")

	for _, field := range []string{
		"ironscales.incident.themis_proba",
		"ironscales.incident.federation.phishing_ratio",
	} {
		CONVERT("", field, "double").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler(field)...)
	}

	// Convert to boolean

	BLANK()
	BLANK().COMMENT("Convert to Boolean Processors")

	for _, field := range []string{
		"ironscales.incident.sender_is_internal",
		"ironscales.incident.reported_by_end_user",
	} {
		CONVERT("", field, "boolean").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler(field)...)
	}

	// Convert to String

	BLANK()
	BLANK().COMMENT("Convert to String Processors")

	for _, field := range []string{
		"ironscales.incident.incident_id",
		"ironscales.incident.company_id",
	} {
		CONVERT("", field, "string").
			IGNORE_MISSING(true)
	}

	// Convert to IP

	BLANK()
	BLANK().COMMENT("Convert to IP")

	for _, field := range []string{
		"ironscales.incident.mail_server.ip",
	} {
		CONVERT("", field, "ip").
			IGNORE_MISSING(true).
			IF(safeNavigateAndCheck(field) + " != ''").
			ON_FAILURE(removeErrorHandler(field)...)
	}

	FOREACH("ironscales.incident.reports",
		CONVERT("", "_ingest._value.mail_server.ip", "ip").
			IGNORE_MISSING(true).
			ON_FAILURE(removeErrorHandler("_ingest._value.mail_server.ip")...),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	// Set ECS Mapping

	BLANK()
	BLANK().COMMENT("Map custom fields to corresponding ECS and related fields.")
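	// Scalar ECS targets are populated with SET ... COPY_FROM and IGNORE_EMPTY,
	// while array-valued ECS and related.* targets use APPEND with
	// ALLOW_DUPLICATES(false); list-typed sources are handled element by element
	// with FOREACH.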

	// Map ECS mapping for top-level fields

	for _, mapping := range []struct {
		ecsField, customField string
	}{
		{ecsField: "event.id", customField: "ironscales.incident.incident_id"},
		{ecsField: "email.subject", customField: "ironscales.incident.email_subject"},
		{ecsField: "user.name", customField: "ironscales.incident.assignee"},
		{ecsField: "event.created", customField: "ironscales.incident.created"},
		{ecsField: "organization.id", customField: "ironscales.incident.company_id"},
		{ecsField: "organization.name", customField: "ironscales.incident.company_name"},
		{ecsField: "host.domain", customField: "ironscales.incident.mail_server.host"},
	} {
		SET(mapping.ecsField).COPY_FROM(mapping.customField).IGNORE_EMPTY(true)
	}

	for _, mapping := range []struct {
		ecsField, customField string
	}{
		{ecsField: "email.to.address", customField: "ironscales.incident.recipient_email"},
		{ecsField: "email.from.address", customField: "ironscales.incident.sender_email"},
		{ecsField: "email.reply_to.address", customField: "ironscales.incident.reply_to"},
		{ecsField: "host.ip", customField: "ironscales.incident.mail_server.ip"},
	} {
		APPEND(mapping.ecsField, "{{{"+mapping.customField+"}}}").
			IF(safeNavigateAndCheck(mapping.customField) + " != null").
			ALLOW_DUPLICATES(false)
	}

	// Map ECS mapping for array fields

	FOREACH("ironscales.incident.links",
		APPEND("url.full", "{{{_ingest._value.url}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.links instanceof List")

	FOREACH("ironscales.incident.attachments",
		APPEND("email.attachments.file.name", "{{{_ingest._value.file_name}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.attachments instanceof List")

	FOREACH("ironscales.incident.attachments",
		APPEND("email.attachments.file.hash.md5", "{{{_ingest._value.md5}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.attachments instanceof List")

	// Map related.* mappings for top-level fields

	for _, mapping := range []struct {
		ecsField, customField string
	}{
		{ecsField: "related.user", customField: "ironscales.incident.recipient_email"},
		{ecsField: "related.user", customField: "ironscales.incident.recipient_name"},
		{ecsField: "related.user", customField: "ironscales.incident.assignee"},
		{ecsField: "related.user", customField: "ironscales.incident.sender_name"},
		{ecsField: "related.user", customField: "ironscales.incident.sender_email"},
		{ecsField: "related.user", customField: "ironscales.incident.resolved_by"},
		{ecsField: "related.user", customField: "ironscales.incident.reply_to"},
		{ecsField: "related.user", customField: "ironscales.incident.reporter_name"},
		{ecsField: "related.hosts", customField: "ironscales.incident.mail_server.host"},
		{ecsField: "related.ip", customField: "ironscales.incident.mail_server.ip"},
	} {
		APPEND(mapping.ecsField, "{{{"+mapping.customField+"}}}").
			IF(safeNavigateAndCheck(mapping.customField) + " != null").
			ALLOW_DUPLICATES(false)
	}

	// Map related.* mappings for array fields

	FOREACH("ironscales.incident.reports",
		APPEND("related.user", "{{{_ingest._value.name}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	FOREACH("ironscales.incident.reports",
		APPEND("related.user", "{{{_ingest._value.email}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	FOREACH("ironscales.incident.reports",
		APPEND("related.user", "{{{_ingest._value.sender_email}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	FOREACH("ironscales.incident.reports",
		APPEND("related.hosts", "{{{_ingest._value.mail_server.host}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	FOREACH("ironscales.incident.reports",
		APPEND("related.ip", "{{{_ingest._value.mail_server.ip}}}").
			ALLOW_DUPLICATES(false),
	).IF("ctx.ironscales?.incident?.reports instanceof List")

	// Remove Duplicate Fields.

	BLANK()
	BLANK().COMMENT("Remove duplicate custom fields if preserve_duplicate_custom_fields is not enabled")

	// Processors to remove duplicated custom fields from array elements

	FOREACH("ironscales.incident.links",
		REMOVE(
			"_ingest._value.url",
		).
			TAG("remove_custom_duplicate_fields_from_ironscales_incident_links_array").
			IGNORE_MISSING(true),
	).IF("ctx.ironscales?.incident?.links instanceof List && (ctx.tags == null || !ctx.tags.contains('preserve_duplicate_custom_fields'))")

	FOREACH("ironscales.incident.attachments",
		REMOVE(
			"_ingest._value.file_name",
			"_ingest._value.md5",
		).
			TAG("remove_custom_duplicate_fields_from_ironscales_incident_attachments_array").
			IGNORE_MISSING(true),
	).IF("ctx.ironscales?.incident?.attachments instanceof List && (ctx.tags == null || !ctx.tags.contains('preserve_duplicate_custom_fields'))")

	REMOVE(
		"ironscales.incident.incident_id",
		"ironscales.incident.email_subject",
		"ironscales.incident.recipient_email",
		"ironscales.incident.assignee",
		"ironscales.incident.sender_email",
		"ironscales.incident.created",
		"ironscales.incident.company_id",
		"ironscales.incident.company_name",
		"ironscales.incident.reply_to",
		"ironscales.incident.mail_server.host",
		"ironscales.incident.mail_server.ip",
	).
		IF("ctx.tags == null || !ctx.tags.contains('preserve_duplicate_custom_fields')").
		TAG("remove_custom_duplicate_fields").
		IGNORE_MISSING(true)

	BLANK()
	BLANK().COMMENT("Remove `ironscales.incident.affected_mailbox_count` as it is the same as `ironscales.incident.affected_mailboxes_count`")

	REMOVE("ironscales.incident.affected_mailbox_count").
		IGNORE_MISSING(true)

	// Clean up script

	BLANK()
	BLANK().COMMENT("Cleanup")
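	// Besides nulls, the script below also drops empty strings and empty
	// maps/lists so that sparse fields are not indexed.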

	SCRIPT().
		TAG("script_to_drop_null_values").
		DESCRIPTION("This script processor iterates over the whole document to remove fields with null values.").
		LANG("painless").
		SOURCE(`
		void handleMap(Map map) {
			map.values().removeIf(v -> {
				if (v instanceof Map) {
					handleMap(v);
				} else if (v instanceof List) {
					handleList(v);
				}
				return v == null || v == '' || (v instanceof Map && v.size() == 0) || (v instanceof List && v.size() == 0)
			});
		}
		void handleList(List list) {
			list.removeIf(v -> {
				if (v instanceof Map) {
					handleMap(v);
				} else if (v instanceof List) {
					handleList(v);
				}
				return v == null || v == '' || (v instanceof Map && v.size() == 0) || (v instanceof List && v.size() == 0)
			});
		}
		handleMap(ctx);
		`)

	// Final set and append processors for error handling

	SET("event.kind").VALUE("pipeline_error").IF("ctx.error?.message != null")
	APPEND("tags", "preserve_original_event").
		IF("ctx.error?.message != null").
		ALLOW_DUPLICATES(false)

	// Global on failure processor

	ON_FAILURE(
		APPEND("error.message", errorFormat),
		SET("event.kind").VALUE("pipeline_error"),
		APPEND("tags", "preserve_original_event").
			ALLOW_DUPLICATES(false),
	)
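
	// The pipeline-level on_failure handler records the failure in error.message,
	// marks the document with event.kind "pipeline_error", and adds the
	// preserve_original_event tag, the convention used to retain the raw event
	// for debugging.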

	// Generate the pipeline

	Generate()
}
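
Running the generator (for example with go run) produces the assembled pipeline definition, which is committed as the incident data stream's ingest pipeline, presumably at data_stream/incident/elasticsearch/ingest_pipeline/default.yml in the standard package layout.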

@akshraj-crest akshraj-crest requested a review from a team as a code owner November 14, 2025 12:26
@andrewkroh andrewkroh added the dashboard, documentation, Integration:ironscales, and New Integration labels Nov 14, 2025
@akshraj-crest akshraj-crest marked this pull request as draft November 15, 2025 14:21

Development

Successfully merging this pull request may close these issues.

[New Integration] IronScales
