Skip to content

Commit

Permalink
FIX: Handle not existing fields with manual mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
kamil-certat committed Jul 8, 2024
1 parent 88effe6 commit 07e44d6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
13 changes: 7 additions & 6 deletions intelmq/bots/outputs/misp/output_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,13 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic

def _custom_mapping(self, obj: "MISPObject", message: dict):
for object_relation, definition in self.attribute_mapping.items():
obj.add_attribute(
object_relation,
value=message[object_relation],
**self._extract_misp_attribute_kwargs(message, definition),
)
# In case of manual mapping, we want to fail if it produces incorrect values
if object_relation in message:
obj.add_attribute(
object_relation,
value=message[object_relation],
**self._extract_misp_attribute_kwargs(message, definition),
)
# In case of manual mapping, we want to fail if it produces incorrect values

def _generate_feed(self, message: dict = None):
if message:
Expand Down
22 changes: 22 additions & 0 deletions intelmq/tests/bots/outputs/misp/test_output_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ def test_attribute_mapping(self):
assert malware_name["value"] == EXAMPLE_EVENT["malware.name"]
assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"]

def test_attribute_mapping_empty_field(self):
self.run_bot(
parameters={
"attribute_mapping": {
"source.ip": {},
"source.fqdn": {}, # not exists in the message
}
}
)

current_event = open(f"{self.directory.name}/.current").read()
with open(current_event) as f:
objects = json.load(f).get("Event", {}).get("Object", [])

assert len(objects) == 1
attributes = objects[0].get("Attribute")
assert len(attributes) == 1
source_ip = next(
attr for attr in attributes if attr.get("object_relation") == "source.ip"
)
assert source_ip["value"] == "152.166.119.2"

def test_event_separation(self):
self.input_message = [
EXAMPLE_EVENT,
Expand Down

0 comments on commit 07e44d6

Please sign in to comment.