Skip to content

Commit edfc434

Browse files
authored
[CYB-60] Support enrichment storage configuration in all places where enrichments are used (#17)
1 parent bd205b0 commit edfc434

File tree

47 files changed

+611
-804
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+611
-804
lines changed

flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/enrichment/stix/parsing/ParsedThreatIntelligence.java

-73
This file was deleted.

flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/enrichment/stix/parsing/ThreatIntelligenceDetails.java

-67
This file was deleted.

flink-cyber/flink-enrichment/flink-enrichment-combined/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@
9696
<groupId>org.apache.flink</groupId>
9797
<artifactId>flink-streaming-java</artifactId>
9898
</dependency>
99+
<dependency>
100+
<groupId>org.apache.flink</groupId>
101+
<artifactId>flink-clients</artifactId>
102+
<version>${flink.version}</version>
103+
<scope>provided</scope>
104+
</dependency>
99105
</dependencies>
100106

101107

flink-cyber/flink-enrichment/flink-enrichment-combined/src/main/java/com/cloudera/cyber/enrichment/EnrichmentJob.java

+46-30
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616
import com.cloudera.cyber.commands.EnrichmentCommand;
1717
import com.cloudera.cyber.commands.EnrichmentCommandResponse;
1818
import com.cloudera.cyber.enrichemnt.stellar.StellarEnrichmentJob;
19-
import com.cloudera.cyber.enrichment.geocode.IpGeo;
2019
import com.cloudera.cyber.enrichment.cidr.IpRegionCidr;
20+
import com.cloudera.cyber.enrichment.geocode.IpGeo;
2121
import com.cloudera.cyber.enrichment.hbase.HbaseJob;
2222
import com.cloudera.cyber.enrichment.hbase.HbaseJobRawKafka;
23+
import com.cloudera.cyber.enrichment.hbase.config.EnrichmentsConfig;
2324
import com.cloudera.cyber.enrichment.lookup.LookupJob;
2425
import com.cloudera.cyber.enrichment.lookup.config.EnrichmentConfig;
2526
import com.cloudera.cyber.enrichment.lookup.config.EnrichmentKind;
2627
import com.cloudera.cyber.enrichment.rest.RestLookupJob;
2728
import com.cloudera.cyber.enrichment.threatq.ThreatQConfig;
28-
import com.cloudera.cyber.enrichment.threatq.ThreatQEntry;
2929
import com.cloudera.cyber.enrichment.threatq.ThreatQJob;
3030
import com.cloudera.cyber.flink.FlinkUtils;
3131
import com.cloudera.cyber.scoring.ScoredMessage;
3232
import com.cloudera.cyber.scoring.ScoringJob;
3333
import com.cloudera.cyber.scoring.ScoringRuleCommand;
3434
import com.cloudera.cyber.scoring.ScoringRuleCommandResult;
3535
import lombok.extern.slf4j.Slf4j;
36-
import org.apache.flink.api.common.functions.MapFunction;
3736
import org.apache.flink.api.java.tuple.Tuple2;
3837
import org.apache.flink.api.java.utils.ParameterTool;
3938
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -46,12 +45,10 @@
4645
import java.util.Arrays;
4746
import java.util.List;
4847

49-
import static com.cloudera.cyber.enrichment.geocode.IpGeoJob.PARAM_ASN_DATABASE_PATH;
50-
import static com.cloudera.cyber.enrichment.geocode.IpGeoJob.PARAM_ASN_FIELDS;
51-
import static com.cloudera.cyber.enrichment.geocode.IpGeoJob.PARAM_GEO_DATABASE_PATH;
52-
import static com.cloudera.cyber.enrichment.geocode.IpGeoJob.PARAM_GEO_FIELDS;
5348
import static com.cloudera.cyber.enrichment.cidr.IpRegionCidrJob.PARAM_CIDR_CONFIG_PATH;
5449
import static com.cloudera.cyber.enrichment.cidr.IpRegionCidrJob.PARAM_CIDR_IP_FIELDS;
50+
import static com.cloudera.cyber.enrichment.geocode.IpGeoJob.*;
51+
import static com.cloudera.cyber.enrichment.hbase.HbaseJob.PARAMS_ENRICHMENT_CONFIG;
5552

5653
@Slf4j
5754
public abstract class EnrichmentJob {
@@ -93,40 +90,47 @@ protected StreamExecutionEnvironment createPipeline(ParameterTool params) throws
9390
Arrays.asList(params.getRequired(PARAM_CIDR_IP_FIELDS).split(",")),
9491
params.getRequired(PARAM_CIDR_CONFIG_PATH)) : asnEnriched;
9592

96-
Tuple2<SingleOutputStreamOperator<Message>, DataStream<EnrichmentCommandResponse>> enriched = LookupJob.enrich(localEnrichments, cidrEnriched, enrichmentConfigs);
93+
Tuple2<DataStream<Message>, DataStream<EnrichmentCommandResponse>> enriched = LookupJob.enrich(localEnrichments, cidrEnriched, enrichmentConfigs);
9794

9895
DataStream<EnrichmentCommandResponse> enrichmentCommandResponses = enriched.f1;
9996

100-
// write the hbase enrichments to hbase
101-
if (params.getBoolean(PARAMS_ENABLE_HBASE, true)) {
102-
DataStream<EnrichmentCommandResponse> hbaseEnrichmentResponses = new HbaseJobRawKafka().writeEnrichments(env, params, hbaseEnrichments);
103-
if (enrichmentCommandResponses != null) {
104-
enrichmentCommandResponses = enriched.f1.union(hbaseEnrichmentResponses);
105-
} else {
106-
enrichmentCommandResponses = hbaseEnrichmentResponses;
107-
}
97+
boolean hbaseEnabled = params.getBoolean(PARAMS_ENABLE_HBASE, true);
98+
boolean threatqEnabled = params.getBoolean(PARAMS_ENABLE_THREATQ, true);
99+
100+
EnrichmentsConfig enrichmentsStorageConfig = null;
101+
if (hbaseEnabled || threatqEnabled) {
102+
enrichmentsStorageConfig = EnrichmentsConfig.load(params.getRequired(PARAMS_ENRICHMENT_CONFIG));
108103
}
109104

110-
writeEnrichmentQueryResults(env, params, enrichmentCommandResponses);
111105

112-
DataStream<Message> hbased = params.getBoolean(PARAMS_ENABLE_HBASE, true) ?
113-
HbaseJob.enrich(enriched.f0, enrichmentConfigs) : enriched.f0;
106+
DataStream<Message> hbased = hbaseEnabled ?
107+
HbaseJob.enrich(enriched.f0, enrichmentConfigs, enrichmentsStorageConfig) : enriched.f0;
114108

115109
// rest based enrichments
116110
DataStream<Message> rested = params.getBoolean(PARAMS_ENABLE_REST, true) ?
117111
RestLookupJob.enrich(hbased, params.getRequired(PARAMS_REST_CONFIG_FILE)) : hbased;
118112

119113
// Run threatQ integrations
120114
DataStream<Message> tqed;
121-
if (params.getBoolean(PARAMS_ENABLE_THREATQ, true)) {
115+
DataStream<EnrichmentCommand> threatqEnrichments = null;
116+
if (threatqEnabled) {
122117
List<ThreatQConfig> threatQconfigs = ThreatQJob.parseConfigs(Files.readAllBytes(Paths.get(params.getRequired(PARAMS_THREATQ_CONFIG_FILE))));
123118
log.info("ThreatQ Configs {}", threatQconfigs);
124-
tqed = ThreatQJob.enrich(rested, threatQconfigs);
125-
ThreatQJob.ingest(createThreatQSource(env, params), threatQconfigs);
119+
tqed = ThreatQJob.enrich(rested, threatQconfigs, enrichmentsStorageConfig);
120+
threatqEnrichments = createThreatQSource(env, params);
126121
} else {
127122
tqed = rested;
128123
}
129124

125+
DataStream<EnrichmentCommandResponse> hbaseTqEnrichResults = null;
126+
if (hbaseEnabled || threatqEnabled) {
127+
hbaseTqEnrichResults = writeHbaseThreatQEnrichmentsToHbaseAndRespond(params, env, hbaseEnrichments, threatqEnrichments, enrichmentsStorageConfig);
128+
}
129+
130+
131+
// write the local, hbase, and threatq responses to the output topic
132+
writeEnrichmentQueryResults(env, params, unionStreams(enrichmentCommandResponses, hbaseTqEnrichResults));
133+
130134
DataStream<Message> stellarStream;
131135
if (params.getBoolean(PARAMS_ENABLE_STELLAR, true)) {
132136
String configDir = params.getRequired(StellarEnrichmentJob.PARAMS_CONFIG_DIR);
@@ -139,21 +143,37 @@ protected StreamExecutionEnvironment createPipeline(ParameterTool params) throws
139143

140144
// disabled by default - NOT IMPLEMENTED
141145
DataStream<Message> ruled = params.getBoolean(PARAMS_ENABLE_RULES, false) ?
142-
doRules(stellarStream, params) : stellarStream;
146+
doRules(stellarStream) : stellarStream;
143147

144148
DataStream<ScoredMessage> scoring = doScoring(ruled, env, params);
145149

146150
writeResults(env, params, scoring);
147151
return env;
148152
}
149153

154+
private DataStream<EnrichmentCommandResponse> writeHbaseThreatQEnrichmentsToHbaseAndRespond(ParameterTool params, StreamExecutionEnvironment env,
155+
DataStream<EnrichmentCommand> hbaseEnrichments, DataStream<EnrichmentCommand> threatqEnrichments,
156+
EnrichmentsConfig enrichmentsStorageConfig) {
157+
158+
// write the threatq and hbase enrichments to hbase
159+
return new HbaseJobRawKafka().writeEnrichments(env, params, unionStreams(hbaseEnrichments, threatqEnrichments), enrichmentsStorageConfig);
160+
}
161+
162+
private static<T> DataStream<T> unionStreams(DataStream<T> stream1, DataStream<T> stream2) {
163+
if (stream1 != null && stream2 != null) {
164+
return stream1.union(stream2);
165+
} else if (stream1 != null) {
166+
return stream1;
167+
} else {
168+
return stream2;
169+
}
170+
}
150171

151172
/**
152173
* @param in Messages incoming for rules processing
153-
* @param params Global Job Parameters
154174
* @return incoming stream for now. Not implemented.
155175
*/
156-
private DataStream<Message> doRules(DataStream<Message> in, ParameterTool params) {
176+
private DataStream<Message> doRules(DataStream<Message> in) {
157177
return in;
158178
}
159179

@@ -172,11 +192,7 @@ private DataStream<ScoredMessage> doScoring(DataStream<Message> in, StreamExecut
172192

173193
protected abstract void writeEnrichmentQueryResults(StreamExecutionEnvironment env, ParameterTool params, DataStream<EnrichmentCommandResponse> sideOutput);
174194

175-
protected abstract DataStream<ThreatQEntry> createThreatQSource(StreamExecutionEnvironment env, ParameterTool params);
176-
177-
protected MapFunction<Message, Message> getLongTermLookupFunction() {
178-
return null;
179-
}
195+
protected abstract DataStream<EnrichmentCommand> createThreatQSource(StreamExecutionEnvironment env, ParameterTool params);
180196

181197
protected abstract DataStream<ScoringRuleCommand> createRulesSource(StreamExecutionEnvironment env, ParameterTool params);
182198

flink-cyber/flink-enrichment/flink-enrichment-combined/src/main/java/com/cloudera/cyber/enrichment/EnrichmentJobKafka.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import com.cloudera.cyber.Message;
1616
import com.cloudera.cyber.commands.EnrichmentCommand;
1717
import com.cloudera.cyber.commands.EnrichmentCommandResponse;
18-
import com.cloudera.cyber.enrichment.threatq.ThreatQEntry;
1918
import com.cloudera.cyber.enrichment.threatq.ThreatQParserFlatMap;
2019
import com.cloudera.cyber.flink.FlinkUtils;
2120
import com.cloudera.cyber.flink.Utils;
@@ -44,7 +43,6 @@ public class EnrichmentJobKafka extends EnrichmentJob {
4443
private static final String PARAMS_GROUP_ID = "group.id";
4544
private static final String DEFAULT_GROUP_ID = "enrichment-combined";
4645
public static final String SCORING_RULES_GROUP_ID = "scoring-rules";
47-
private static final String DEFAULT_TI_TABLE = "threatIntelligence";
4846
private static final String PARAMS_TOPIC_THREATQ_INPUT = "threatq.topic.input";
4947

5048

@@ -81,7 +79,7 @@ protected void writeEnrichmentQueryResults(StreamExecutionEnvironment env, Param
8179
}
8280

8381
@Override
84-
protected DataStream<ThreatQEntry> createThreatQSource(StreamExecutionEnvironment env, ParameterTool params) {
82+
protected DataStream<EnrichmentCommand> createThreatQSource(StreamExecutionEnvironment env, ParameterTool params) {
8583
String topic = params.getRequired(PARAMS_TOPIC_THREATQ_INPUT);
8684
String groupId = "threatq-parser";
8785

flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/enrichment_json.md

+33
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,39 @@ The majestic_million enrichment is ingested in batch only.
127127
}
128128
```
129129

130+
### Reserved Enrichment Types
131+
There are two reserved enrichment names: threatq and first_seen. Reserved enrichments define a mapping to the hbase table, column family and storage format.
132+
The keys and values are defined by the enrichment and profile jobs.
133+
134+
* The first_seen enrichment mapping is ignored. Specify the hbase table and format using the profile properties file settings.
135+
136+
* If there is no specific mapping for the threatq enrichment, the default table and format are used.
137+
138+
* To store threatq enrichments in a different table and format, override the threatq enrichment. In the example below, the threatq enrichment is stored in the threatq table
139+
and cf column family with the HBASE_METRON format.
140+
```
141+
{
142+
"storageConfigs": {
143+
"default": {
144+
"format": "HBASE_METRON",
145+
"hbaseTableName": "enrich_default",
146+
"columnFamily": "cf"
147+
},
148+
"threatq": {
149+
"format": "HBASE_METRON",
150+
"hbaseTableName": "threatq",
151+
"columnFamily": "cf"
152+
}
153+
},
154+
"enrichmentConfigs": {
155+
"threatq" : {
156+
"storage": "threatq",
157+
"fieldMapping": {
158+
}
159+
}
160+
}
161+
```
162+
130163
## EnrichmentsConfig Json
131164

132165
| Json Field | Type | Description | Required/Default |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.cloudera.cyber.enrichment.hbase;
2+
3+
import com.cloudera.cyber.enrichment.hbase.config.EnrichmentStorageConfig;
4+
import com.cloudera.cyber.hbase.LookupKey;
5+
6+
import java.io.Serializable;
7+
8+
public interface EnrichmentLookupBuilder extends Serializable {
9+
LookupKey build(EnrichmentStorageConfig storageConfig, String enrichmentType, String fieldValue);
10+
}

0 commit comments

Comments
 (0)