This enrichment reads the values of one or more specified message extensions, each containing either an IP string or a collection of IP strings, and produces a new message augmented with the region name for each extension.
The CIDR enrichment mapping reports the following messages.
Severity Level | Feature | Message |
---|---|---|
ERROR | region | Cidr failed 'reason' |
**Examples are shown in JSON for readability. Actual messages are formatted in AVRO.**
{
"dst_ip": "10.1.0.208"
}
{
"dst_ip": "10.1.0.208",
"dst_ip.region": "us-east"
}
{
// zone name under which the regions and their CIDR arrays are listed
"<zone-name-1>": {
// region name and its array of subnets in IPv4 or IPv6 CIDR notation
"us-east" : ["10.1.0.0/16", "12.3.4.0/24"]
},
// zone name under which the regions and their CIDR arrays are listed
"<zone-name-2>": {
// region name and its array of subnets in IPv4 or IPv6 CIDR notation
"eu-west": ["::ffff:f01:0/112"]
}
}
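The lookup that this configuration drives can be sketched in a few lines of Python using the standard `ipaddress` module. This is only an illustration of matching an address against the configured subnets, not the enrichment's actual implementation: the zone names `zone-1` and `zone-2` are hypothetical stand-ins for the placeholders above, and returning the first matching subnet is an assumption.

```python
import ipaddress

# Same structure as the configuration file above, with the comments removed.
# "zone-1" and "zone-2" are hypothetical names for the <zone-name-N> placeholders.
CONFIG = {
    "zone-1": {"us-east": ["10.1.0.0/16", "12.3.4.0/24"]},
    "zone-2": {"eu-west": ["::ffff:f01:0/112"]},
}


def build_index(config):
    """Flatten the zone -> region -> subnets mapping into (network, region) pairs."""
    index = []
    for regions in config.values():
        for region, subnets in regions.items():
            for subnet in subnets:
                index.append((ipaddress.ip_network(subnet), region))
    return index


def find_region(index, ip_string):
    """Return the region of the first subnet containing the address, else None."""
    address = ipaddress.ip_address(ip_string)
    for network, region in index:
        # An IPv4 address is never compared against an IPv6 subnet, or vice versa.
        if address.version == network.version and address in network:
            return region
    return None


INDEX = build_index(CONFIG)
print(find_region(INDEX, "10.1.0.208"))  # us-east
```

Addresses that fall outside every configured subnet return `None` in this sketch; how the enrichment itself treats unmatched addresses is not stated here.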
Incoming message
{
.....
"ip.src": "10.1.0.1",
"ip.dst": "15.1.0.2",
.....
}
Processed message
{
.....
"ip.src": "10.1.0.1",
"ip.src.region" : "us-east",
"ip.dst": "15.1.0.2",
"ip.dst.region" : "us-west",
.....
}
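The transformation from the incoming to the processed message can be sketched as follows. The region table is hypothetical and was chosen only so that the output matches the example above; in the enrichment it comes from the file named by `cidr.config_file_path`. The sketch handles extensions holding a single IP string and leaves out collection-valued extensions, and skipping fields with no matching subnet is an assumption.

```python
import ipaddress

# Hypothetical region table, chosen to reproduce the example messages above.
REGIONS = {
    "us-east": ["10.1.0.0/16"],
    "us-west": ["15.1.0.0/16"],
}

NETWORKS = [
    (ipaddress.ip_network(subnet), region)
    for region, subnets in REGIONS.items()
    for subnet in subnets
]


def region_of(ip_string):
    """Return the region of the first subnet containing the address, else None."""
    address = ipaddress.ip_address(ip_string)
    for network, region in NETWORKS:
        if address.version == network.version and address in network:
            return region
    return None


def enrich(message, ip_fields):
    """Return a copy of message with a '<field>.region' entry per matched field."""
    enriched = dict(message)
    for field in ip_fields:
        value = message.get(field)
        if not isinstance(value, str):
            continue  # missing or collection-valued extensions are out of scope here
        region = region_of(value)
        if region is not None:
            enriched[field + ".region"] = region
    return enriched


incoming = {"ip.src": "10.1.0.1", "ip.dst": "15.1.0.2"}
processed = enrich(incoming, ["ip.src", "ip.dst"])
print(processed)
```

The original message is copied rather than modified, which mirrors the description of the enrichment producing a new message.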
Property Name | Type | Description | Required/Default | Example |
---|---|---|---|---|
cidr.ip_fields | comma separated list of extension names | Apply CIDR enrichment to these extensions | required | ip_dst,ip_src |
cidr.config_file_path | HDFS or local file system URI | The location of the configuration file containing the mapping of region names to subnets. | required | hdfs:/user/myuser/flink-cyber/cidr/cidr.json |
schema.registry.url | URL | Schema registry REST endpoint URL | required | http://myregistryhost:7788/api/v1 |
topic.input | topic name | Incoming messages to be enriched. Stored in AVRO message format managed by schema registry. | required | enrichment.input |
topic.output | topic name | Outgoing enriched messages. Stored in AVRO message format managed by schema registry. | required | enrichment.output |
parallelism | integer | Number of parallel tasks to run. | default=2 | 2 |
checkpoint.interval.ms | integer | Milliseconds between Flink state checkpoints | default=60000 | 10000 |
kafka.bootstrap.servers | comma separated list | Kafka bootstrap server names and ports. | required | brokerhost1:9092,brokerhost2:9092 |
kafka.setting name | Kafka setting | Settings for Kafka producers or Kafka consumer. | set as required by security and performance | |
cidr.ip_fields=dst_ip
cidr.config_file_path=hdfs:/user/centos/flink-cyber/cidr/cidr.json
topic.input=enrichment.input
topic.output=enrichment.geo
kafka.bootstrap.servers=<kafka-bootstrap>
schema.registry.url=http://<schema-registry-server>:7788/api/v1
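A properties file like the one above can be checked against the property table before the job is submitted. The following is a minimal sketch and is not part of the enrichment itself: the helper names `parse_properties`, `load_settings`, and `split_fields` are hypothetical, and the parser covers only simple `key=value` lines, not the full Java properties syntax.

```python
# Required properties and defaults, taken from the property table above.
REQUIRED = (
    "cidr.ip_fields",
    "cidr.config_file_path",
    "schema.registry.url",
    "topic.input",
    "topic.output",
    "kafka.bootstrap.servers",
)
DEFAULTS = {"parallelism": "2", "checkpoint.interval.ms": "60000"}

SAMPLE = """\
cidr.ip_fields=dst_ip
cidr.config_file_path=hdfs:/user/centos/flink-cyber/cidr/cidr.json
topic.input=enrichment.input
topic.output=enrichment.geo
kafka.bootstrap.servers=<kafka-bootstrap>
schema.registry.url=http://<schema-registry-server>:7788/api/v1
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def load_settings(text):
    """Apply the defaults, then fail if any required property is missing or empty."""
    settings = {**DEFAULTS, **parse_properties(text)}
    missing = [key for key in REQUIRED if not settings.get(key)]
    if missing:
        raise ValueError("missing required properties: " + ", ".join(missing))
    return settings


def split_fields(value):
    """Split the comma separated cidr.ip_fields value into extension names."""
    return [name.strip() for name in value.split(",") if name.strip()]


settings = load_settings(SAMPLE)
print(split_fields(settings["cidr.ip_fields"]))  # ['dst_ip']
```

Values are kept as strings here; converting `parallelism` and `checkpoint.interval.ms` to integers is left to the caller.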
flink run -Dlog4j.configurationFile=enrichment-cidr-log4j.properties --jobmanager yarn-cluster -yjm 1024 -ytm 1024 --detached --yarnname "EnrichmentCidr" flink-enrichment-cidr-0.0.1-SNAPSHOT.jar enrichment-cidr.properties