Skip to content

Commit 03e933d

Browse files
refactor/sarif amqp delivery impl
1 parent 9e5438b commit 03e933d

14 files changed

+247
-93
lines changed

agent/resolver/results_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from workflows.base_service import BaseWorkflowService
1+
from workflows.base_service import CxOneFlowAbstractWorkflowService
22
from workflows.messaging import DelegatedScanResultMessage
33
from services import CxOneFlowServices
44
from api_utils.auth_factories import EventContext
@@ -8,7 +8,7 @@
88
import aio_pika, gzip, importlib
99
from typing import List
1010

11-
class ResolverResultsAgent(BaseWorkflowService):
11+
class ResolverResultsAgent(CxOneFlowAbstractWorkflowService):
1212

1313
def __init__(self, services : CxOneFlowServices):
1414
self.__services = services

agent/resolver/runner_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from workflows.resolver_workflow import ResolverScanningWorkflow
2-
from workflows.base_service import BaseWorkflowService
2+
from workflows.base_service import CxOneFlowAbstractWorkflowService
33
from workflows.resolver_scan_service import ResolverScanService
44
from workflows.messaging import (
55
DelegatedScanMessage,
@@ -17,7 +17,7 @@
1717
from _version import __version__
1818

1919

20-
class ResolverRunnerAgent(BaseWorkflowService):
20+
class ResolverRunnerAgent(CxOneFlowAbstractWorkflowService):
2121

2222
def __init__(
2323
self,

agent/resolver/timeout_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from workflows.base_service import BaseWorkflowService
1+
from workflows.base_service import CxOneFlowAbstractWorkflowService
22
from services import CxOneFlowServices
33
from workflows.messaging import DelegatedScanMessage
44
import aio_pika
55

6-
class ResolverTimeoutAgent(BaseWorkflowService):
6+
class ResolverTimeoutAgent(CxOneFlowAbstractWorkflowService):
77

88
def __init__(self, services : CxOneFlowServices):
99
self.__services = services

config/server/__init__.py

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
from _agent import __agent__
33
from pathlib import Path
44
from config import ConfigurationException, RouteNotFoundException, CommonConfig
5-
import re, uuid, sys
5+
import re, uuid, sys, dataclasses_json
6+
from dataclasses import make_dataclass, asdict
67
from importlib import import_module
78
from scm_services import SCMService, ADOEService, BBDCService, GHService, GLService
89
from scm_services.cloner import Cloner
@@ -18,7 +19,7 @@
1819
from workflows.resolver_scan_service import ResolverScanService
1920
from workflows.pull_request import PullRequestWorkflow
2021
from workflows.push import PushWorkflow
21-
from workflows.base_service import BaseWorkflowService
22+
from workflows.base_service import CxOneFlowAbstractWorkflowService
2223
from workflows.resolver_workflow import (
2324
DummyResolverScanningWorkflow,
2425
ResolverScanningWorkflow,
@@ -29,6 +30,8 @@
2930
from cxone_api import CxOneClient
3031
from kickoff_services import DummyKickoffService, KickoffService
3132
from naming_services import ProjectNamingService
33+
from cxone_sarif import get_sarif_v210_log_for_scan
34+
from cxone_sarif.opts import DEFAULT as SARIF_DEFAULT_OPTS, ReportOpts
3235

3336

3437
class CxOneFlowConfig(CommonConfig):
@@ -229,7 +232,7 @@ def __resolver_service_factory(
229232

230233
@staticmethod
231234
def __polling_service_factory(
232-
config_path, services : List[BaseWorkflowService], **kwargs
235+
config_path, services : List[CxOneFlowAbstractWorkflowService], **kwargs
233236
) -> ScanPollingService:
234237

235238

@@ -330,16 +333,26 @@ def __pr_feedback_service_factory(
330333
)
331334

332335

336+
@staticmethod
337+
def __sarif_ReportOpts_factory(config_path : str, opts : dict) -> ReportOpts:
338+
if opts is None:
339+
return SARIF_DEFAULT_OPTS
340+
@dataclasses_json.dataclass_json
341+
class JsonSarifReportOpts(ReportOpts):
342+
@classmethod
343+
def from_dict(clazz, json : dict):
344+
return make_dataclass(clazz.__name__, json)
345+
346+
config = asdict(SARIF_DEFAULT_OPTS)
347+
config.update(opts)
348+
return JsonSarifReportOpts.from_dict(config)
349+
350+
333351

334352
@staticmethod
335353
def __push_feedback_service_factory(
336354
config_path, moniker, **kwargs
337355
) -> PushFeedbackService:
338-
339-
# TODO: Need to get the defaults for timeouts.
340-
# scan-timeout-hours
341-
# poll-interval-seconds
342-
343356
if kwargs is None or len(kwargs.keys()) == 0:
344357
return PushFeedbackService(
345358
moniker, CxOneFlowConfig.__server_base_url, PushWorkflow(),
@@ -349,8 +362,42 @@ def __push_feedback_service_factory(
349362
True,
350363
)
351364
else:
365+
push_config_dict = CxOneFlowConfig._get_value_for_key_or_default("push", kwargs, {})
366+
scan_monitor_dict = CxOneFlowConfig._get_value_for_key_or_default(
367+
"scan-monitor", kwargs, {}
368+
)
369+
370+
sarif_opts_dict = CxOneFlowConfig._get_value_for_key_or_default("sarif-opts", push_config_dict, None)
371+
372+
amqp_delivery_dict = CxOneFlowConfig._get_value_for_key_or_default("via-amqp", push_config_dict, None)
373+
374+
delivery_agents = []
375+
376+
if amqp_delivery_dict is not None and not 'amqp' in amqp_delivery_dict.keys():
377+
raise ConfigurationException.missing_key_path(f"{config_path}/via-amqp/amqp")
378+
elif amqp_delivery_dict is not None:
379+
delivery_agents.append(PushFeedbackService.amqp_agent_factory
380+
(moniker,
381+
CxOneFlowConfig._get_secret_from_value_of_key_or_fail
382+
(f"{config_path}/via-amqp", "shared-secret", amqp_delivery_dict),
383+
*CxOneFlowConfig._load_amqp_settings(config_path, **amqp_delivery_dict)))
384+
352385
return PushFeedbackService(
353-
moniker, PushWorkflow(),
386+
moniker, delivery_agents, CxOneFlowConfig.__sarif_ReportOpts_factory(f"{config_path}/push", sarif_opts_dict),
387+
PushWorkflow(
388+
CxOneFlowConfig._get_value_for_key_or_default(
389+
"enabled", push_config_dict, False
390+
),
391+
int(
392+
CxOneFlowConfig._get_value_for_key_or_default(
393+
"poll-interval-seconds", scan_monitor_dict, CxOneFlowConfig.DEFAULT_POLL_INTERVAL_SECS
394+
)
395+
),
396+
int(
397+
CxOneFlowConfig._get_value_for_key_or_default(
398+
"scan-timeout-hours", scan_monitor_dict, CxOneFlowConfig.DEFAULT_SCAN_TIMEOUT_HOURS
399+
)
400+
)),
354401
*CxOneFlowConfig._load_amqp_settings(config_path, **kwargs)
355402
)
356403

cxone_service/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,16 @@ def __init__(self, moniker : str, cxone_client : CxOneClient, default_engines :
5050
self.__group_service = grouping_service
5151

5252
@property
53-
def moniker(self):
53+
def moniker(self) -> str:
5454
return self.__moniker
5555

5656
@property
57-
def display_link(self):
57+
def display_link(self) -> str:
5858
return self.__client.display_endpoint
59+
60+
@property
61+
def client(self) -> CxOneClient:
62+
return self.__client
5963

6064
@staticmethod
6165
def __get_json_or_fail(response):

rabbit_config.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from workflows.push_feedback_service import PushFeedbackService
99
from workflows.resolver_scan_service import ResolverScanService
1010
from workflows.scan_polling_service import ScanPollingService
11-
from workflows.base_service import BaseWorkflowService
11+
from workflows.base_service import CxOneFlowAbstractWorkflowService
1212

1313

1414

@@ -41,7 +41,7 @@ async def setup() -> None:
4141
durable=True,
4242
)
4343
scan_in_exchange = await channel.declare_exchange(
44-
BaseWorkflowService.EXCHANGE_SCAN_INPUT,
44+
CxOneFlowAbstractWorkflowService.EXCHANGE_SCAN_INPUT,
4545
aio_pika.ExchangeType.FANOUT,
4646
durable=True,
4747
)
@@ -53,7 +53,7 @@ async def setup() -> None:
5353
internal=True,
5454
)
5555
scan_await_exchange = await channel.declare_exchange(
56-
BaseWorkflowService.EXCHANGE_SCAN_WAIT,
56+
CxOneFlowAbstractWorkflowService.EXCHANGE_SCAN_WAIT,
5757
aio_pika.ExchangeType.TOPIC,
5858
durable=True,
5959
internal=True,
@@ -101,7 +101,7 @@ async def setup() -> None:
101101
internal=True,
102102
)
103103
polling_delivery_exchange = await channel.declare_exchange(
104-
BaseWorkflowService.EXCHANGE_SCAN_POLLING,
104+
CxOneFlowAbstractWorkflowService.EXCHANGE_SCAN_POLLING,
105105
aio_pika.ExchangeType.TOPIC,
106106
durable=True,
107107
internal=True,
@@ -125,7 +125,7 @@ async def setup() -> None:
125125
"x-queue-type": "quorum",
126126
"x-dead-letter-strategy": "at-least-once",
127127
"x-overflow": "reject-publish",
128-
"x-dead-letter-exchange": BaseWorkflowService.EXCHANGE_SCAN_POLLING,
128+
"x-dead-letter-exchange": CxOneFlowAbstractWorkflowService.EXCHANGE_SCAN_POLLING,
129129
},
130130
)
131131

workflow_agent.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from config.server import CxOneFlowConfig
55
from workflows.scan_polling_service import ScanPollingService
66
from workflows.pr_feedback_service import PRFeedbackService
7+
from workflows.push_feedback_service import PushFeedbackService
78
from workflows.resolver_scan_service import ResolverScanService
89
from workflows.messaging import (
910
ScanAwaitMessage,
@@ -18,6 +19,20 @@
1819
__log = logging.getLogger("WorkflowAgent")
1920

2021

22+
async def process_gen_sarif(msg: aio_pika.abc.AbstractIncomingMessage) -> None:
23+
try:
24+
__log.debug(
25+
f"Received Sarif generation message on channel {msg.channel.number}: {msg.info()}"
26+
)
27+
28+
sm = ScanFeedbackMessage.from_binary(msg.body)
29+
services = CxOneFlowConfig.retrieve_services_by_moniker(sm.moniker)
30+
await services.push.execute_sarif_generation(msg, services.cxone.client)
31+
except BaseException as ex:
32+
__log.exception(ex)
33+
await msg.nack(requeue=False)
34+
35+
2136
async def process_poll(msg: aio_pika.abc.AbstractIncomingMessage) -> None:
2237
try:
2338
__log.debug(
@@ -66,6 +81,16 @@ async def spawn_agents():
6681
async with asyncio.TaskGroup() as g:
6782
for moniker in CxOneFlowConfig.get_service_monikers():
6883
services = CxOneFlowConfig.retrieve_services_by_moniker(moniker)
84+
85+
g.create_task(
86+
mq_agent(
87+
process_gen_sarif,
88+
await services.pr.mq_client(),
89+
moniker,
90+
PushFeedbackService.QUEUE_SARIF_GEN,
91+
)
92+
)
93+
6994
g.create_task(
7095
mq_agent(
7196
process_poll,
@@ -82,7 +107,6 @@ async def spawn_agents():
82107
ScanPollingService.QUEUE_SCAN_POLLING,
83108
)
84109
)
85-
86110
g.create_task(
87111
mq_agent(
88112
process_pr_annotate,

workflows/base_service.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
import urllib.parse, aio_pika, logging, asyncio, os
2-
from workflows import ScanStates
32
from ssl import create_default_context, CERT_NONE
43
from cxoneflow_logging import SecretRegistry
54
from workflows.messaging.base_message import BaseMessage
65
from workflows.messaging import ScanAwaitMessage
76
from typing import Any
87

9-
class BaseWorkflowService:
10-
11-
ELEMENT_PREFIX = "cx:"
12-
TOPIC_PREFIX = "cx."
13-
14-
EXCHANGE_SCAN_INPUT = f"{ELEMENT_PREFIX}Scan In"
15-
EXCHANGE_SCAN_WAIT = f"{ELEMENT_PREFIX}Scan Await"
16-
EXCHANGE_SCAN_POLLING = f"{ELEMENT_PREFIX}Scan Polling Delivery"
178

9+
class AMQPClient:
1810

1911
def __init__(self, amqp_url : str, amqp_user : str, amqp_password : str, ssl_verify : bool):
2012
self.__lock = asyncio.Lock()
@@ -42,7 +34,7 @@ async def mq_client(self) -> aio_pika.abc.AbstractRobustConnection:
4234
async with self.__lock:
4335

4436
if self.__client is None:
45-
BaseWorkflowService.log().debug(f"Creating AMQP connection to: {self.__amqp_url}")
37+
AMQPClient.log().debug(f"Creating AMQP connection to: {self.__amqp_url}")
4638
ctx = None
4739

4840
if isinstance(self.__ssl_verify, bool):
@@ -63,17 +55,26 @@ async def mq_client(self) -> aio_pika.abc.AbstractRobustConnection:
6355
ssl_context=ctx)
6456
return self.__client
6557

58+
59+
class CxOneFlowAbstractWorkflowService(AMQPClient):
60+
ELEMENT_PREFIX = "cx:"
61+
TOPIC_PREFIX = "cx."
62+
63+
EXCHANGE_SCAN_INPUT = f"{ELEMENT_PREFIX}Scan In"
64+
EXCHANGE_SCAN_WAIT = f"{ELEMENT_PREFIX}Scan Await"
65+
EXCHANGE_SCAN_POLLING = f"{ELEMENT_PREFIX}Scan Polling Delivery"
66+
6667
async def _safe_deserialize_body(self, msg : aio_pika.abc.AbstractIncomingMessage, msg_class : BaseMessage) -> Any:
6768
try:
6869
ret_val = msg_class.from_binary(msg.body)
6970
return ret_val
7071
except BaseException as ex:
71-
BaseWorkflowService.log().exception(ex)
72+
CxOneFlowAbstractWorkflowService.log().exception(ex)
7273
await msg.nack(requeue=False)
7374
raise
7475

7576
async def handle_completed_scan(self, msg : ScanAwaitMessage) -> None:
7677
raise NotImplementedError("handle_awaited_scan")
7778

7879
async def handle_awaited_scan_error(self, msg : ScanAwaitMessage, error_msg : str) -> None:
79-
raise NotImplementedError("handle_awaited_scan_error")
80+
raise NotImplementedError("handle_awaited_scan_error")

workflows/pr_feedback_service.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,38 @@
55
from workflows.feedback_workflow_base import AbstractPRFeedbackWorkflow
66
from workflows import ScanStates, ScanWorkflow, FeedbackWorkflow
77
from workflows.pr import PullRequestAnnotation, PullRequestFeedback
8-
from workflows.base_service import BaseWorkflowService
8+
from workflows.base_service import CxOneFlowAbstractWorkflowService
99
from cxone_service import CxOneException
1010

11-
class PRFeedbackService(BaseWorkflowService):
11+
class PRFeedbackService(CxOneFlowAbstractWorkflowService):
1212
PR_ELEMENT_PREFIX = "pr:"
1313
PR_TOPIC_PREFIX = "pr."
1414

15-
EXCHANGE_SCAN_INPUT_LEGACY = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan In"
16-
EXCHANGE_SCAN_WAIT_LEGACY = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Await"
17-
EXCHANGE_SCAN_POLLING_LEGACY = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Polling Delivery"
15+
EXCHANGE_SCAN_INPUT_LEGACY = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan In"
16+
EXCHANGE_SCAN_WAIT_LEGACY = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Await"
17+
EXCHANGE_SCAN_POLLING_LEGACY = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Polling Delivery"
1818

1919

20-
EXCHANGE_SCAN_ANNOTATE = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Annotate"
21-
EXCHANGE_SCAN_FEEDBACK = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Feedback"
20+
EXCHANGE_SCAN_ANNOTATE = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Annotate"
21+
EXCHANGE_SCAN_FEEDBACK = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Scan Feedback"
2222

23-
QUEUE_SCAN_POLLING_LEGACY = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Polling Scans"
24-
QUEUE_SCAN_WAIT_LEGACY = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Awaited Scans"
23+
QUEUE_SCAN_POLLING_LEGACY = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Polling Scans"
24+
QUEUE_SCAN_WAIT_LEGACY = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}Awaited Scans"
2525

2626

27-
QUEUE_ANNOTATE_PR = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}PR Annotating"
28-
QUEUE_FEEDBACK_PR = f"{BaseWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}PR Feedback"
27+
QUEUE_ANNOTATE_PR = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}PR Annotating"
28+
QUEUE_FEEDBACK_PR = f"{CxOneFlowAbstractWorkflowService.ELEMENT_PREFIX}{PR_ELEMENT_PREFIX}PR Feedback"
2929

30-
ROUTEKEY_POLL_BINDING_LEGACY = f"{BaseWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.AWAIT}.*.*"
30+
ROUTEKEY_POLL_BINDING_LEGACY = f"{CxOneFlowAbstractWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.AWAIT}.*.*"
3131

3232

33-
ROUTEKEY_FEEDBACK_PR = f"{BaseWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.FEEDBACK}.{FeedbackWorkflow.PR}.*"
34-
ROUTEKEY_ANNOTATE_PR = f"{BaseWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.ANNOTATE}.{FeedbackWorkflow.PR}.*"
33+
ROUTEKEY_FEEDBACK_PR = f"{CxOneFlowAbstractWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.FEEDBACK}.{FeedbackWorkflow.PR}.*"
34+
ROUTEKEY_ANNOTATE_PR = f"{CxOneFlowAbstractWorkflowService.TOPIC_PREFIX}{PR_TOPIC_PREFIX}{ScanStates.ANNOTATE}.{FeedbackWorkflow.PR}.*"
3535

3636

3737
@staticmethod
3838
def make_topic(state : ScanStates, workflow : FeedbackWorkflow, moniker : str):
39-
return f"{BaseWorkflowService.TOPIC_PREFIX}{PRFeedbackService.PR_TOPIC_PREFIX}{state}.{workflow}.{moniker}"
39+
return f"{CxOneFlowAbstractWorkflowService.TOPIC_PREFIX}{PRFeedbackService.PR_TOPIC_PREFIX}{state}.{workflow}.{moniker}"
4040

4141
@staticmethod
4242
def log():

0 commit comments

Comments
 (0)