diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 9c7ab4eb3..e8207e131 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -17,7 +17,6 @@ import inspect import io import json -import msgpack import logging import os import re @@ -99,6 +98,7 @@ class Bot(object): statistics_host: str = "127.0.0.1" statistics_password: Optional[str] = None statistics_port: int = 6379 + pipeline_use_packer: str = os.environ.get('INTELMQ_USE_PACKER', 'MsgPack') _message_processed_verb: str = 'Processed' @@ -808,7 +808,7 @@ def __init_logger(self): def __log_configuration_parameter(self, config_name: str, option: str, value: Any): if "password" in option or "token" in option: - value = "HIDDEN" + value = "" message = "{} configuration: parameter {!r} loaded with value {!r}." \ .format(config_name.title(), option, value) @@ -1369,9 +1369,8 @@ def export_event(self, event: libmessage.Event, if 'raw' in event: del event['raw'] if return_type is str: - return event.to_json(hierarchical=self.hierarchical, - with_type=self.with_type, - jsondict_as_string=self.jsondict_as_string) + return event.to_pack("JSON", hierarchical=self.hierarchical, + with_type=self.with_type) else: retval = event.to_dict(hierarchical=self.hierarchical, with_type=self.with_type, diff --git a/intelmq/lib/exceptions.py b/intelmq/lib/exceptions.py index 7c149ed85..08b3b3606 100644 --- a/intelmq/lib/exceptions.py +++ b/intelmq/lib/exceptions.py @@ -180,4 +180,13 @@ class UnserializationError(IntelMQException, ValueError): """ def __init__(self, exception: Exception = None, object: bytes = None): self.object = object - super().__init__("Could not unserialize message%s." % exception) + super().__init__("Could not unserialize message, %s." % exception) + + +class SerializationError(IntelMQException, ValueError): + """ + Unrecoverable error during message serialization + """ + def __init__(self, exception: Exception = None, object: bytes = None): + self.object = object + super().__init__("Could not serialize message, %s." % exception) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index 0de529e2f..cdc07ec88 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -14,16 +14,20 @@ import warnings from collections import defaultdict from typing import Any, Dict, Iterable, Optional, Sequence, Union -import msgpack import intelmq.lib.exceptions as exceptions import intelmq.lib.harmonization from intelmq import HARMONIZATION_CONF_FILE from intelmq.lib import utils -__all__ = ['Event', 'Message', 'MessageFactory', 'Report'] +__all__ = ['Event', 'Message', 'MessageFactory', 'Report', 'Packer', 'PackerMsgPack', 'PackerJSON'] VALID_MESSSAGE_TYPES = ('Event', 'Message', 'Report') +try: + import msgpack +except: + msgpack = None + class MessageFactory(object): """ @@ -60,7 +64,7 @@ def from_dict(message: dict, harmonization=None, @staticmethod def unserialize(raw_message: bytes, harmonization: dict = None, - default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict: + default_type: Optional[str] = None, use_packer: str = "MsgPack") -> dict: """ Takes JSON-encoded Message object, returns instance of correct class. @@ -78,14 +82,13 @@ def unserialize(raw_message: bytes, harmonization: dict = None, default_type=default_type) @staticmethod - def serialize(message) -> bytes: + def serialize(message, use_packer: str = 'MsgPack') -> bytes: """ Takes instance of message-derived class and makes JSON-encoded Message. The class is saved in __type attribute. """ - raw_message = Message.serialize(message) - return raw_message + return Message.serialize(message, use_packer=use_packer) class Message(dict): @@ -309,30 +312,29 @@ def deep_copy(self): harmonization={self.__class__.__name__.lower(): self.harmonization_config}) def __str__(self): - return self.serialize(use_packer="json") + return self.serialize(use_packer="JSON") - def serialize(self, use_packer: str = "msgpack"): + def serialize(self, use_packer: str = "MsgPack"): delete_type = False if '__type' not in self: delete_type = True self['__type'] = self.__class__.__name__ - if use_packer == "json": - packed = json.dumps(self) - else: - packed = msgpack.packb(self) + try: + packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")() + packed = packer.serialize(data=self) + except Exception as exc: + raise exceptions.SerializationError(exception=exc, object=self) if delete_type: del self['__type'] return packed @staticmethod - def unserialize(message: bytes, use_packer: str = "msgpack"): + def unserialize(message: bytes, use_packer: str = "MsgPack"): try: - if use_packer == "json": - return json.loads(message) - else: - return msgpack.unpackb(message, raw=False) + packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")() + return packer.unserialize(data=message) except Exception as exc: raise exceptions.UnserializationError(exception=exc, object=message) @@ -485,13 +487,13 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False, return new_dict - def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False): - json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) - return json.dumps(json_dict, ensure_ascii=False, sort_keys=True) - - def to_msgpack(self, hierarchical=False, with_type=False): - msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) - return msgpack.packb(msgpack_dict) + def to_pack(self, use_packer="MsgPack", hierarchical=False, with_type=False, **kwargs): + try: + packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")() + data = self.to_dict(hierarchical=hierarchical, with_type=with_type) + return packer.serialize(data, **kwargs) + except Exception as exc: + raise exceptions.SerializationError(exception=exc, object=self) def __eq__(self, other: dict) -> bool: """ @@ -590,3 +592,38 @@ def copy(self): if 'time.observation' in retval and 'time.observation' not in self: del retval['time.observation'] return retval + + +class Packer(): + def __init__(self) -> None: + pass + + def serialize(self, data: bytes, **kwargs): + raise NotImplementedError() + + def unserialize(self, data: bytes, **kwargs): + raise NotImplementedError() + + +class PackerMsgPack(Packer): + def __init__(self) -> None: + if msgpack is None: + raise exceptions.MissingDependencyError("msgpack") + super().__init__() + + def serialize(self, data, **kwargs): + return msgpack.packb(data, **kwargs) + + def unserialize(self, data, **kwargs): + return msgpack.unpackb(data, raw=False, **kwargs) + + +class PackerJSON(Packer): + def __init__(self) -> None: + super().__init__() + + def serialize(self, data, **kwargs): + return json.dumps(data, **kwargs) + + def unserialize(self, data, **kwargs): + return json.loads(data, **kwargs) diff --git a/intelmq/lib/test.py b/intelmq/lib/test.py index 632a18cbf..44ad95810 100644 --- a/intelmq/lib/test.py +++ b/intelmq/lib/test.py @@ -153,7 +153,7 @@ def setUpClass(cls): elif cls.bot_type != 'collector' and cls.default_input_message == '': cls.default_input_message = {'__type': 'Event'} if type(cls.default_input_message) is dict: - cls.default_input_message = msgpack.packb(cls.default_input_message) + cls.default_input_message = message.MessageFactory.serialize(cls.default_input_message, os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'): password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \ @@ -522,7 +522,7 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d event = self.get_output_queue(path=path)[queue_pos] self.assertIsInstance(event, bytes) - event_dict = msgpack.unpackb(event, raw=False) + event_dict = message.MessageFactory.unserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) if isinstance(expected_msg, (message.Event, message.Report)): expected = expected_msg.to_dict(with_type=True) else: diff --git a/intelmq/tests/bots/experts/cymru_whois/test_expert.py b/intelmq/tests/bots/experts/cymru_whois/test_expert.py index 70343e701..b025d0040 100644 --- a/intelmq/tests/bots/experts/cymru_whois/test_expert.py +++ b/intelmq/tests/bots/experts/cymru_whois/test_expert.py @@ -3,11 +3,12 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import msgpack +import os import unittest import intelmq.lib.test as test from intelmq.bots.experts.cymru_whois.expert import CymruExpertBot +from work.intelmq.intelmq.lib import message EXAMPLE_INPUT = {"__type": "Event", "source.ip": "78.104.144.2", # example.com @@ -93,7 +94,7 @@ def test_6to4_result(self): """ self.input_message = EXAMPLE_6TO4_INPUT self.run_bot() - actual = msgpack.loads(self.get_output_queue()[0]) + actual = message.MessageFactory.serialize(self.get_output_queue()[0], use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual) self.assertIn("source.asn", actual) self.assertIn("source.as_name", actual) diff --git a/intelmq/tests/bots/experts/idea/test_expert.py b/intelmq/tests/bots/experts/idea/test_expert.py index 5a02ef7fe..bd87f0ac9 100644 --- a/intelmq/tests/bots/experts/idea/test_expert.py +++ b/intelmq/tests/bots/experts/idea/test_expert.py @@ -5,7 +5,6 @@ # -*- coding: utf-8 -*- import unittest import json -import msgpack import intelmq.lib.test as test from intelmq.lib.message import MessageFactory @@ -89,7 +88,7 @@ def test_conversion(self): # the data from the "output" field and compare after removing ID's event = self.get_output_queue()[0] self.assertIsInstance(event, bytes) - event_dict = MessageFactory.unserialize(event) + event_dict = MessageFactory.unserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) self.assertIsInstance(event_dict, dict) self.assertTrue(b"output" in event_dict) idea_event = json.loads(event_dict["output"]) diff --git a/intelmq/tests/bots/parsers/json/test_parser.py b/intelmq/tests/bots/parsers/json/test_parser.py index 2c83658ed..38f97ec51 100644 --- a/intelmq/tests/bots/parsers/json/test_parser.py +++ b/intelmq/tests/bots/parsers/json/test_parser.py @@ -6,8 +6,6 @@ import base64 import os import unittest -import json -import msgpack import intelmq.lib.test as test from intelmq.bots.parsers.json.parser import JSONParserBot diff --git a/intelmq/tests/lib/test_message.py b/intelmq/tests/lib/test_message.py index 4c079b959..8f2227623 100644 --- a/intelmq/tests/lib/test_message.py +++ b/intelmq/tests/lib/test_message.py @@ -10,8 +10,8 @@ Most tests are performed on Report, as it is formally the same as Message, but has a valid Harmonization configuration. """ -import json -import msgpack +from cmath import exp +import os import unittest import pkg_resources @@ -159,12 +159,14 @@ def test_event_ne_different_config(self): def test_invalid_type(self): """ Test if Message raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize(msgpack.dumps({"__type": "Message"}), harmonization=HARM) + data = message.MessageFactory.serialize({"__type": "Message"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + message.MessageFactory.unserialize(data, harmonization=HARM, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) def test_invalid_type2(self): """ Test if MessageFactory raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize(msgpack.dumps({"__type": "Invalid"}), harmonization=HARM) + data = message.MessageFactory.serialize({"__type": "Invalid"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + message.MessageFactory.unserialize(data, harmonization=HARM, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) def test_report_invalid_key(self): """ Test if report raises InvalidKey for invalid key in add(). """ @@ -365,9 +367,17 @@ def test_factory_serialize(self): report.add('feed.name', 'Example') report.add('feed.url', URL_SANE) report.add('raw', LOREM_BASE64, sanitize=False) - actual = message.MessageFactory.serialize(report) - expected = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xa6__type\xa6Report') - self.assertDictEqual(msgpack.unpackb(expected), msgpack.unpackb(actual)) + actual = message.MessageFactory.serialize(report, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + expected = message.MessageFactory.serialize({ + 'feed.name': 'Example', + 'feed.url': 'https://example.com/', + 'raw': 'bG9yZW0gaXBzdW0=', + '__type': 'Report', + }, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + self.assertDictEqual( + message.MessageFactory.unserialize(expected, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')), + message.MessageFactory.unserialize(actual, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + ) def test_deep_copy_content(self): """ Test if deep_copy does return the same items. """ @@ -495,54 +505,11 @@ def test_event_dict_hierarchical(self): '00:00'}}, event.to_dict(hierarchical=True)) - def test_event_json(self): - """ Test Event to_json. """ - event = self.new_event() - event = self.add_event_examples(event) - actual = event.to_json() - self.assertIsInstance(actual, str) - expected = ('{"feed.url": "https://example.com/", "feed.name": ' - '"Example", "raw": "bG9yZW0gaXBzdW0=", "time.observation": ' - '"2015-01-01T13:37:00+00:00"}') - self.assertDictEqual(json.loads(expected), json.loads(actual)) - - def test_event_json_hierarchical(self): - """ Test Event to_json. """ - event = self.new_event() - event = self.add_event_examples(event) - actual = event.to_json(hierarchical=True) - self.assertIsInstance(actual, str) - expected = ('{"feed": {"url": "https://example.com/", "name": ' - '"Example"}, "raw": "bG9yZW0gaXBzdW0=", "time": ' - '{"observation": "2015-01-01T13:37:00+00:00"}}') - self.assertDictEqual(json.loads(expected), json.loads(actual)) - - def test_event_msgpack(self): - """ Test event to_msgpack """ - event = self.new_event() - event = self.add_event_examples(event) - actual = event.to_msgpack() - self.assertIsInstance(actual, bytes) - excepted = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xb0time.observation\xb92015-01-01T13:37:00+00:00') - self.assertDictEqual(msgpack.unpackb(excepted), msgpack.unpackb(actual)) - def test_event_serialize(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual(b'\x81\xa6__type\xa5Event', - event.serialize()) - - def test_event_string(self): - """ Test Event serialize. """ - event = self.new_event() - self.assertEqual(b'\x81\xa6__type\xa5Event', - event.serialize()) - - def test_event_unicode(self): - """ Test Event serialize. """ - event = self.new_event() - self.assertEqual(b'\x81\xa6__type\xa5Event', - event.serialize()) + expected = message.MessageFactory.serialize({'__type': 'Event'}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) + self.assertEqual(expected, event.serialize()) def test_event_from_report(self): """ Data from report should be in event, except for extra. """ @@ -607,7 +574,7 @@ def test_event_init_check_tuple(self): def test_event_init(self): """ Test if initialization method checks fields. """ - event = msgpack.dumps({"__type": "Event", "source.asn": "foo"}) + event = message.MessageFactory.serialize({"__type": "Event", "source.asn": "foo"}, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')) with self.assertRaises(exceptions.InvalidValue): message.MessageFactory.unserialize(event, harmonization=HARM) diff --git a/intelmq/tests/lib/test_parser_bot.py b/intelmq/tests/lib/test_parser_bot.py index 9ec923b37..52fe5edb5 100644 --- a/intelmq/tests/lib/test_parser_bot.py +++ b/intelmq/tests/lib/test_parser_bot.py @@ -130,7 +130,10 @@ def set_bot(cls): cls.bot_reference = DummyParserBot cls.default_input_message = EXAMPLE_REPORT cls.allowed_error_count = 1 - cls.sysconfig = {'error_dump_message': True} + cls.sysconfig = { + 'error_dump_message': True, + 'INTELMQ_USE_PACKER': 'JSON', + } def dump_message(self, error_traceback, message=None): self.assertDictEqual(EXPECTED_DUMP, message)