Skip to content

Commit

Permalink
FIX: More generic way of using different (de)-serializers
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Waldbauer <[email protected]>
  • Loading branch information
waldbauer-certat committed May 31, 2022
1 parent 5697faf commit 5215a89
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 92 deletions.
9 changes: 4 additions & 5 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import inspect
import io
import json
import msgpack
import logging
import os
import re
Expand Down Expand Up @@ -99,6 +98,7 @@ class Bot(object):
statistics_host: str = "127.0.0.1"
statistics_password: Optional[str] = None
statistics_port: int = 6379
pipeline_use_packer: str = os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')

_message_processed_verb: str = 'Processed'

Expand Down Expand Up @@ -808,7 +808,7 @@ def __init_logger(self):

def __log_configuration_parameter(self, config_name: str, option: str, value: Any):
if "password" in option or "token" in option:
value = "HIDDEN"
value = "<redacted>"

message = "{} configuration: parameter {!r} loaded with value {!r}." \
.format(config_name.title(), option, value)
Expand Down Expand Up @@ -1369,9 +1369,8 @@ def export_event(self, event: libmessage.Event,
if 'raw' in event:
del event['raw']
if return_type is str:
return event.to_json(hierarchical=self.hierarchical,
with_type=self.with_type,
jsondict_as_string=self.jsondict_as_string)
return event.to_pack("JSON", hierarchical=self.hierarchical,
with_type=self.with_type)
else:
retval = event.to_dict(hierarchical=self.hierarchical,
with_type=self.with_type,
Expand Down
11 changes: 10 additions & 1 deletion intelmq/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,13 @@ class UnserializationError(IntelMQException, ValueError):
"""
def __init__(self, exception: Exception = None, object: bytes = None):
self.object = object
super().__init__("Could not unserialize message%s." % exception)
super().__init__("Could not unserialize message, %s." % exception)


class SerializationError(IntelMQException, ValueError):
"""
Unrecoverable error during message serialization
"""
def __init__(self, exception: Exception = None, object: bytes = None):
self.object = object
super().__init__("Could not serialize message, %s." % exception)
85 changes: 61 additions & 24 deletions intelmq/lib/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@
import warnings
from collections import defaultdict
from typing import Any, Dict, Iterable, Optional, Sequence, Union
import msgpack

import intelmq.lib.exceptions as exceptions
import intelmq.lib.harmonization
from intelmq import HARMONIZATION_CONF_FILE
from intelmq.lib import utils

__all__ = ['Event', 'Message', 'MessageFactory', 'Report']
__all__ = ['Event', 'Message', 'MessageFactory', 'Report', 'Packer', 'PackerMsgPack', 'PackerJSON']
VALID_MESSSAGE_TYPES = ('Event', 'Message', 'Report')

try:
import msgpack
except:
msgpack = None


class MessageFactory(object):
"""
Expand Down Expand Up @@ -60,7 +64,7 @@ def from_dict(message: dict, harmonization=None,

@staticmethod
def unserialize(raw_message: bytes, harmonization: dict = None,
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
default_type: Optional[str] = None, use_packer: str = "MsgPack") -> dict:
"""
Takes JSON-encoded Message object, returns instance of correct class.
Expand All @@ -78,14 +82,13 @@ def unserialize(raw_message: bytes, harmonization: dict = None,
default_type=default_type)

@staticmethod
def serialize(message) -> bytes:
def serialize(message, use_packer: str = 'MsgPack') -> bytes:
"""
Takes instance of message-derived class and makes JSON-encoded Message.
The class is saved in __type attribute.
"""
raw_message = Message.serialize(message)
return raw_message
return Message.serialize(message, use_packer=use_packer)


class Message(dict):
Expand Down Expand Up @@ -309,30 +312,29 @@ def deep_copy(self):
harmonization={self.__class__.__name__.lower(): self.harmonization_config})

def __str__(self):
return self.serialize(use_packer="json")
return self.serialize(use_packer="JSON")

def serialize(self, use_packer: str = "msgpack"):
def serialize(self, use_packer: str = "MsgPack"):
delete_type = False
if '__type' not in self:
delete_type = True
self['__type'] = self.__class__.__name__

if use_packer == "json":
packed = json.dumps(self)
else:
packed = msgpack.packb(self)
try:
packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")()
packed = packer.serialize(data=self)
except Exception as exc:
raise exceptions.SerializationError(exception=exc, object=self)

if delete_type:
del self['__type']
return packed

@staticmethod
def unserialize(message: bytes, use_packer: str = "msgpack"):
def unserialize(message: bytes, use_packer: str = "MsgPack"):
try:
if use_packer == "json":
return json.loads(message)
else:
return msgpack.unpackb(message, raw=False)
packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")()
return packer.unserialize(data=message)
except Exception as exc:
raise exceptions.UnserializationError(exception=exc, object=message)

Expand Down Expand Up @@ -485,13 +487,13 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,

return new_dict

def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)

def to_msgpack(self, hierarchical=False, with_type=False):
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return msgpack.packb(msgpack_dict)
def to_pack(self, use_packer="MsgPack", hierarchical=False, with_type=False, **kwargs):
try:
packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")()
data = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return packer.serialize(data, **kwargs)
except Exception as exc:
raise exceptions.SerializationError(exception=exc, object=self)

def __eq__(self, other: dict) -> bool:
"""
Expand Down Expand Up @@ -590,3 +592,38 @@ def copy(self):
if 'time.observation' in retval and 'time.observation' not in self:
del retval['time.observation']
return retval


class Packer():
def __init__(self) -> None:
pass

def serialize(self, data: bytes, **kwargs):
raise NotImplementedError()

def unserialize(self, data: bytes, **kwargs):
raise NotImplementedError()


class PackerMsgPack(Packer):
def __init__(self) -> None:
if msgpack is None:
raise exceptions.MissingDependencyError("msgpack")
super().__init__()

def serialize(self, data, **kwargs):
return msgpack.packb(data, **kwargs)

def unserialize(self, data, **kwargs):
return msgpack.unpackb(data, raw=False, **kwargs)


class PackerJSON(Packer):
def __init__(self) -> None:
super().__init__()

def serialize(self, data, **kwargs):
return json.dumps(data, **kwargs)

def unserialize(self, data, **kwargs):
return json.loads(data, **kwargs)
4 changes: 2 additions & 2 deletions intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def setUpClass(cls):
elif cls.bot_type != 'collector' and cls.default_input_message == '':
cls.default_input_message = {'__type': 'Event'}
if type(cls.default_input_message) is dict:
cls.default_input_message = msgpack.packb(cls.default_input_message)
cls.default_input_message = message.MessageFactory.serialize(cls.default_input_message, os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))

if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \
Expand Down Expand Up @@ -522,7 +522,7 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
event = self.get_output_queue(path=path)[queue_pos]
self.assertIsInstance(event, bytes)

event_dict = msgpack.unpackb(event, raw=False)
event_dict = message.MessageFactory.unserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
if isinstance(expected_msg, (message.Event, message.Report)):
expected = expected_msg.to_dict(with_type=True)
else:
Expand Down
5 changes: 3 additions & 2 deletions intelmq/tests/bots/experts/cymru_whois/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import msgpack
import os
import unittest

import intelmq.lib.test as test
from intelmq.bots.experts.cymru_whois.expert import CymruExpertBot
from work.intelmq.intelmq.lib import message

EXAMPLE_INPUT = {"__type": "Event",
"source.ip": "78.104.144.2", # example.com
Expand Down Expand Up @@ -93,7 +94,7 @@ def test_6to4_result(self):
"""
self.input_message = EXAMPLE_6TO4_INPUT
self.run_bot()
actual = msgpack.loads(self.get_output_queue()[0])
actual = message.MessageFactory.serialize(self.get_output_queue()[0], use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual)
self.assertIn("source.asn", actual)
self.assertIn("source.as_name", actual)
Expand Down
3 changes: 1 addition & 2 deletions intelmq/tests/bots/experts/idea/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# -*- coding: utf-8 -*-
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.lib.message import MessageFactory
Expand Down Expand Up @@ -89,7 +88,7 @@ def test_conversion(self):
# the data from the "output" field and compare after removing ID's
event = self.get_output_queue()[0]
self.assertIsInstance(event, bytes)
event_dict = MessageFactory.unserialize(event)
event_dict = MessageFactory.unserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertIsInstance(event_dict, dict)
self.assertTrue(b"output" in event_dict)
idea_event = json.loads(event_dict["output"])
Expand Down
2 changes: 0 additions & 2 deletions intelmq/tests/bots/parsers/json/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import base64
import os
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.bots.parsers.json.parser import JSONParserBot
Expand Down
73 changes: 20 additions & 53 deletions intelmq/tests/lib/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
Most tests are performed on Report, as it is formally the same as Message,
but has a valid Harmonization configuration.
"""
import json
import msgpack
from cmath import exp
import os
import unittest

import pkg_resources
Expand Down Expand Up @@ -159,12 +159,14 @@ def test_event_ne_different_config(self):
def test_invalid_type(self):
""" Test if Message raises InvalidArgument for invalid type. """
with self.assertRaises(exceptions.InvalidArgument):
message.MessageFactory.unserialize(msgpack.dumps({"__type": "Message"}), harmonization=HARM)
data = message.MessageFactory.serialize({"__type": "Message"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
message.MessageFactory.unserialize(data, harmonization=HARM, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))

def test_invalid_type2(self):
""" Test if MessageFactory raises InvalidArgument for invalid type. """
with self.assertRaises(exceptions.InvalidArgument):
message.MessageFactory.unserialize(msgpack.dumps({"__type": "Invalid"}), harmonization=HARM)
data = message.MessageFactory.serialize({"__type": "Invalid"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
message.MessageFactory.unserialize(data, harmonization=HARM, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))

def test_report_invalid_key(self):
""" Test if report raises InvalidKey for invalid key in add(). """
Expand Down Expand Up @@ -365,9 +367,17 @@ def test_factory_serialize(self):
report.add('feed.name', 'Example')
report.add('feed.url', URL_SANE)
report.add('raw', LOREM_BASE64, sanitize=False)
actual = message.MessageFactory.serialize(report)
expected = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xa6__type\xa6Report')
self.assertDictEqual(msgpack.unpackb(expected), msgpack.unpackb(actual))
actual = message.MessageFactory.serialize(report, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
expected = message.MessageFactory.serialize({
'feed.name': 'Example',
'feed.url': 'https://example.com/',
'raw': 'bG9yZW0gaXBzdW0=',
'__type': 'Report',
}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertDictEqual(
message.MessageFactory.unserialize(expected, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')),
message.MessageFactory.unserialize(actual, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
)

def test_deep_copy_content(self):
""" Test if deep_copy does return the same items. """
Expand Down Expand Up @@ -495,54 +505,11 @@ def test_event_dict_hierarchical(self):
'00:00'}},
event.to_dict(hierarchical=True))

def test_event_json(self):
""" Test Event to_json. """
event = self.new_event()
event = self.add_event_examples(event)
actual = event.to_json()
self.assertIsInstance(actual, str)
expected = ('{"feed.url": "https://example.com/", "feed.name": '
'"Example", "raw": "bG9yZW0gaXBzdW0=", "time.observation": '
'"2015-01-01T13:37:00+00:00"}')
self.assertDictEqual(json.loads(expected), json.loads(actual))

def test_event_json_hierarchical(self):
""" Test Event to_json. """
event = self.new_event()
event = self.add_event_examples(event)
actual = event.to_json(hierarchical=True)
self.assertIsInstance(actual, str)
expected = ('{"feed": {"url": "https://example.com/", "name": '
'"Example"}, "raw": "bG9yZW0gaXBzdW0=", "time": '
'{"observation": "2015-01-01T13:37:00+00:00"}}')
self.assertDictEqual(json.loads(expected), json.loads(actual))

def test_event_msgpack(self):
""" Test event to_msgpack """
event = self.new_event()
event = self.add_event_examples(event)
actual = event.to_msgpack()
self.assertIsInstance(actual, bytes)
excepted = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xb0time.observation\xb92015-01-01T13:37:00+00:00')
self.assertDictEqual(msgpack.unpackb(excepted), msgpack.unpackb(actual))

def test_event_serialize(self):
""" Test Event serialize. """
event = self.new_event()
self.assertEqual(b'\x81\xa6__type\xa5Event',
event.serialize())

def test_event_string(self):
""" Test Event serialize. """
event = self.new_event()
self.assertEqual(b'\x81\xa6__type\xa5Event',
event.serialize())

def test_event_unicode(self):
""" Test Event serialize. """
event = self.new_event()
self.assertEqual(b'\x81\xa6__type\xa5Event',
event.serialize())
expected = message.MessageFactory.serialize({'__type': 'Event'}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertEqual(expected, event.serialize())

def test_event_from_report(self):
""" Data from report should be in event, except for extra. """
Expand Down Expand Up @@ -607,7 +574,7 @@ def test_event_init_check_tuple(self):

def test_event_init(self):
""" Test if initialization method checks fields. """
event = msgpack.dumps({"__type": "Event", "source.asn": "foo"})
event = message.MessageFactory.serialize({"__type": "Event", "source.asn": "foo"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
with self.assertRaises(exceptions.InvalidValue):
message.MessageFactory.unserialize(event, harmonization=HARM)

Expand Down
5 changes: 4 additions & 1 deletion intelmq/tests/lib/test_parser_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ def set_bot(cls):
cls.bot_reference = DummyParserBot
cls.default_input_message = EXAMPLE_REPORT
cls.allowed_error_count = 1
cls.sysconfig = {'error_dump_message': True}
cls.sysconfig = {
'error_dump_message': True,
'INTELMQ_USE_PACKER': 'JSON',
}

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(EXPECTED_DUMP, message)
Expand Down

0 comments on commit 5215a89

Please sign in to comment.