Skip to content

Commit 5a5d580

Browse files
authoredMay 22, 2024··
Merge pull request #1224 from vincenzocaputo/attach-galaxy-cluster
Add attach galaxy cluster method
2 parents a74dd07 + 07fb871 commit 5a5d580

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed
 

‎pymisp/api.py

+33
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,39 @@ def delete_event_report(self, event_report: MISPEventReport | int | str | UUID,
587587

588588
# ## END Event Report ###
589589

590+
# ## BEGIN Galaxy Cluster ###
591+
def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_cluster: MISPGalaxyCluster | int | str, local: bool = False, pythonify: bool = False) -> dict[str, Any] | list[dict[str, Any]]:
592+
"""Attach a galaxy cluster to an event or an attribute
593+
594+
:param misp_entity: a MISP Event or a MISP Attribute
595+
:param galaxy_cluster: Galaxy cluster to attach
596+
:param local: whether the object should be attached locally or not to the target
597+
:param pythonify: Returns a PyMISP Object instead of the plain json output
598+
"""
599+
if isinstance(misp_entity, MISPEvent):
600+
attach_target_type = 'event'
601+
elif isinstance(misp_entity, MISPAttribute):
602+
attach_target_type = 'attribute'
603+
else:
604+
raise PyMISPError('The misp_entity must be MISPEvent or MISPAttribute')
605+
606+
attach_target_id = misp_entity.id
607+
local = 1 if local else 0
608+
609+
if isinstance(galaxy_cluster, MISPGalaxyCluster):
610+
cluster_id = galaxy_cluster.id
611+
elif isinstance(galaxy_cluster, (int, str)):
612+
cluster_id = galaxy_cluster
613+
else:
614+
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
615+
616+
to_post = { 'Galaxy': { 'target_id': cluster_id } }
617+
url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}'
618+
619+
r = self._prepare_request('POST', url, data=to_post)
620+
return self._check_json_response(r)
621+
# ## END Galaxy Cluster ###
622+
590623
# ## BEGIN Object ###
591624

592625
def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:

‎tests/testlive_comprehensive.py

+37
Original file line numberDiff line numberDiff line change
@@ -3197,6 +3197,43 @@ def test_event_galaxy(self) -> None:
31973197
self.admin_misp_connector.delete_event(event)
31983198
self.admin_misp_connector.toggle_global_pythonify()
31993199

3200+
def test_attach_galaxy_cluster(self) -> None:
3201+
event = self.create_simple_event()
3202+
event = self.admin_misp_connector.add_event(event, pythonify=True)
3203+
try:
3204+
galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True)
3205+
galaxy: MISPGalaxy = galaxies[0]
3206+
if gid := galaxy.id:
3207+
galaxy = self.admin_misp_connector.get_galaxy(gid, withCluster=True, pythonify=True)
3208+
else:
3209+
raise Exception("No galaxy found")
3210+
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[0]
3211+
self.admin_misp_connector.attach_galaxy_cluster(event, galaxy_cluster)
3212+
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
3213+
3214+
self.assertEqual(len(event.galaxies), 1)
3215+
event_galaxy = event.galaxies[0]
3216+
# The galaxy ID should equal the galaxy from which the cluster came from
3217+
self.assertEqual(event_galaxy.id, galaxy.id)
3218+
# The galaxy cluster should equal the cluster added
3219+
self.assertEqual(event_galaxy.clusters[0].id, galaxy_cluster.id)
3220+
3221+
galaxy_cluster: MISPGalaxyCluster = galaxy.clusters[1]
3222+
3223+
# Test on attribute
3224+
attribute = event.Attribute[0]
3225+
event = self.admin_misp_connector.get_event(event.id, pythonify=True)
3226+
attribute = event.Attribute[0]
3227+
self.assertEqual(len(attribute.galaxies), 1)
3228+
attribute_galaxy = attribute.galaxies[0]
3229+
# The galaxy ID should equal the galaxy from which the cluster came from
3230+
self.assertEqual(attribute_galaxy.id, galaxy.id)
3231+
# The galaxy cluster should equal the cluster added
3232+
self.assertEqual(attribute_galaxy.clusters[0].id, galaxy_cluster.id)
3233+
finally:
3234+
self.admin_misp_connector.delete_event(event)
3235+
self.admin_misp_connector.toggle_global_pythonify()
3236+
32003237
@unittest.skip("Internal use only")
32013238
def missing_methods(self) -> None:
32023239
skip = [

0 commit comments

Comments
 (0)
Please sign in to comment.