diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 7c730f976..9ca64d58a 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -8,6 +8,7 @@ from pathlib import Path from uuid import uuid4 import re +import redis from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError @@ -35,6 +36,12 @@ class MISPFeedOutputBot(OutputBot): misp_org_name = None misp_org_uuid = None output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path + queue_db: int = 2 + queue_host: str = "localhost" + queue_name: str = None + queue_password: str = None + queue_port: int = 6379 + batch_size: int = 100 _is_multithreadable: bool = False @staticmethod @@ -45,6 +52,13 @@ def check_output_dir(dirname): return True def init(self): + # Set up redis connection for length checks + if not self.queue_name: + self.queue_name = self.source_queue + self.redis = self.connect_redis() + + self.event_batch = [] + if MISPEvent is None and import_fail_reason == 'syntax': raise MissingDependencyError("pymisp", version='>=2.4.117.3', @@ -105,13 +119,31 @@ def process(self): event = self.receive_message().to_dict(jsondict_as_string=True) - obj = self.current_event.add_object(name='intelmq_event') - for object_relation, value in event.items(): - try: - obj.add_attribute(object_relation, value=value) - except NewAttributeError: - # This entry isn't listed in the harmonization file, ignoring. - pass + current_queue_len = self.redis.llen(self.queue_name) + if current_queue_len % self.batch_size == 0: + self.flush_batch() + else: + self.event_batch.append(event) + + self.acknowledge_message() + + def connect_redis(self): + return redis.Redis( + host=self.queue_host, + port=self.queue_port, + db=self.queue_db, + password=self.queue_password, + ) + + def flush_batch(self): + for event in self.event_batch: + obj = self.current_event.add_object(name='intelmq_event') + for object_relation, value in event.items(): + try: + obj.add_attribute(object_relation, value=value) + except NewAttributeError: + # This entry isn't listed in the harmonization file, ignoring. + pass feed_output = self.current_event.to_feed(with_meta=False) @@ -119,7 +151,8 @@ def process(self): json.dump(feed_output, f) feed_meta_generator(self.output_dir) - self.acknowledge_message() + + self.event_batch.clear() @staticmethod def check(parameters):