-
Notifications
You must be signed in to change notification settings - Fork 10
Improved command data structure & handler #529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d897644
1e005c0
dc5777f
c5f4251
2d30deb
0b35c50
14b34ed
cf3bf1c
a81cfa5
d179445
964ad49
48d6df9
fba6c2c
7d4c053
346d4e2
f811a61
fa94194
b7a25ac
dcfe12d
88676cd
5d481a4
e240d03
5a7e6d1
4901d34
9884fee
34d1a32
2ac1c58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,12 +10,12 @@ | |
| from aikido_zen.storage.users import Users | ||
| from aikido_zen.storage.hostnames import Hostnames | ||
| from ..realtime.start_polling_for_changes import start_polling_for_changes | ||
| from ...helpers.get_current_unixtime_ms import get_unixtime_ms | ||
| from ...storage.ai_statistics import AIStatistics | ||
| from ...storage.firewall_lists import FirewallLists | ||
| from ...storage.statistics import Statistics | ||
|
|
||
| # Import functions : | ||
| from .on_detected_attack import on_detected_attack | ||
| from .get_manager_info import get_manager_info | ||
| from .update_service_config import update_service_config | ||
| from .on_start import on_start | ||
|
|
@@ -58,7 +58,7 @@ def __init__(self, block, api, token, serverless): | |
|
|
||
| def start(self, event_scheduler): | ||
| """Send out start event and add heartbeats""" | ||
| res = self.on_start() | ||
| res = on_start(self) | ||
| if res.get("error", None) == "invalid_token": | ||
| logger.info( | ||
| "Token was invalid, not starting heartbeats and realtime polling." | ||
|
|
@@ -82,26 +82,37 @@ def report_initial_stats(self): | |
| if should_report_initial_stats: | ||
| self.send_heartbeat() | ||
|
|
||
| def on_detected_attack(self, attack, context, blocked, stack): | ||
| """This will send something to the API when an attack is detected""" | ||
| return on_detected_attack(self, attack, context, blocked, stack) | ||
|
|
||
| def on_start(self): | ||
| """This will send out an Event signalling the start to the server""" | ||
| return on_start(self) | ||
|
|
||
| def send_heartbeat(self): | ||
| """This will send a heartbeat to the server""" | ||
| return send_heartbeat(self) | ||
|
|
||
| def get_manager_info(self): | ||
| """This returns info about the connection_manager""" | ||
| return get_manager_info(self) | ||
|
|
||
| def update_service_config(self, res): | ||
| """Update configuration based on the server's response""" | ||
| return update_service_config(self, res) | ||
|
|
||
| def update_firewall_lists(self): | ||
| """Will update service config with blocklist of IP addresses""" | ||
| return update_firewall_lists(self) | ||
|
|
||
| def report_api_event(self, event): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Method Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
| if not self.token: | ||
| return {"success": False, "error": "invalid_token"} | ||
| try: | ||
| payload = { | ||
| "time": get_unixtime_ms(), | ||
| "agent": get_manager_info(self), | ||
| } | ||
| payload.update(event) # Merge default fields with event fields | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. report_api_event merges the incoming 'event' dict via payload.update(event) without copying, causing unsafe access to shared mutable state across threads Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
|
|
||
| result = self.api.report(self.token, payload, self.timeout_in_sec) | ||
| if not result.get("success", True): | ||
| logger.error( | ||
| "CloudConnectionManager: Reporting to api failed, error=%s", | ||
| result.get("error", "unknown"), | ||
| ) | ||
| return result | ||
| except Exception as e: | ||
| logger.debug(e) | ||
| logger.error( | ||
| "CloudConnectionManager: Reporting to api failed, unexpected error (see debug logs)" | ||
| ) | ||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,26 +5,16 @@ | |
|
|
||
|
|
||
| def on_start(connection_manager): | ||
| """ | ||
| This will send out an Event signalling the start to the server | ||
| """ | ||
| if not connection_manager.token: | ||
| return | ||
| res = connection_manager.api.report( | ||
| connection_manager.token, | ||
| { | ||
| "type": "started", | ||
| "time": get_unixtime_ms(), | ||
| "agent": connection_manager.get_manager_info(), | ||
| }, | ||
| connection_manager.timeout_in_sec, | ||
| ) | ||
| event = {"type": "started"} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function on_start no longer fails fast when connection_manager.token is missing; the prior early-return was removed, reducing clarity of control flow Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
| res = connection_manager.report_api_event(event) | ||
|
|
||
| if not res.get("success", True): | ||
| # Update config time even in failure : | ||
| connection_manager.conf.last_updated_at = get_unixtime_ms() | ||
| logger.error("Failed to communicate with Aikido Server : %s", res["error"]) | ||
| connection_manager.conf.last_updated_at = ( | ||
| get_unixtime_ms() | ||
| ) # Update config time even in failure | ||
| else: | ||
| connection_manager.update_service_config(res) | ||
| connection_manager.update_firewall_lists() | ||
| logger.info("Established connection with Aikido Server") | ||
|
|
||
| return res | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,6 @@ | |
|
|
||
| from aikido_zen.background_process.packages import PackagesStore | ||
| from aikido_zen.helpers.logging import logger | ||
| from aikido_zen.helpers.get_current_unixtime_ms import get_unixtime_ms | ||
|
|
||
|
|
||
| def send_heartbeat(connection_manager): | ||
|
|
@@ -26,20 +25,16 @@ def send_heartbeat(connection_manager): | |
| connection_manager.ai_stats.clear() | ||
| PackagesStore.clear() | ||
|
|
||
| res = connection_manager.api.report( | ||
| connection_manager.token, | ||
| res = connection_manager.report_api_event( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed outbound heartbeat reporting: switched to connection_manager.report_api_event and removed 'time' and 'agent' from the heartbeat payload which may break the server-side API contract. Details🔧 How do I fix it? More info - Comment |
||
| { | ||
| "type": "heartbeat", | ||
| "time": get_unixtime_ms(), | ||
| "agent": connection_manager.get_manager_info(), | ||
| "stats": stats, | ||
| "ai": ai_stats, | ||
| "hostnames": outgoing_domains, | ||
| "packages": packages, | ||
| "routes": routes, | ||
| "users": users, | ||
| "middlewareInstalled": connection_manager.middleware_installed, | ||
| }, | ||
| connection_manager.timeout_in_sec, | ||
| } | ||
| ) | ||
| connection_manager.update_service_config(res) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,34 +1,34 @@ | ||
| """Commands __init__.py file""" | ||
|
|
||
| from aikido_zen.helpers.logging import logger | ||
| from .attack import process_attack | ||
| from aikido_zen.helpers.ipc.command_types import CommandContext | ||
| from .put_event import PutEventCommand | ||
| from .check_firewall_lists import process_check_firewall_lists | ||
| from .read_property import process_read_property | ||
| from .should_ratelimit import process_should_ratelimit | ||
| from .ping import process_ping | ||
| from .sync_data import process_sync_data | ||
|
|
||
| commands_map = { | ||
| # This maps to a tuple : (function, returns_data?) | ||
| # Commands that don't return data : | ||
| "ATTACK": (process_attack, False), | ||
| # Commands that return data : | ||
| "SYNC_DATA": (process_sync_data, True), | ||
| "READ_PROPERTY": (process_read_property, True), | ||
| "SHOULD_RATELIMIT": (process_should_ratelimit, True), | ||
| "PING": (process_ping, True), | ||
| "CHECK_FIREWALL_LISTS": (process_check_firewall_lists, True), | ||
| "SYNC_DATA": process_sync_data, | ||
| "READ_PROPERTY": process_read_property, | ||
| "SHOULD_RATELIMIT": process_should_ratelimit, | ||
| "PING": process_ping, | ||
| "CHECK_FIREWALL_LISTS": process_check_firewall_lists, | ||
| } | ||
|
|
||
| modern_commands = [PutEventCommand] | ||
|
|
||
|
|
||
| def process_incoming_command(connection_manager, obj, conn, queue): | ||
| """Processes an incoming command""" | ||
| action = obj[0] | ||
| data = obj[1] | ||
| if action in commands_map: | ||
| func, returns_data = commands_map[action] | ||
| if returns_data: | ||
| return conn.send(func(connection_manager, data, queue)) | ||
| func(connection_manager, data, queue) | ||
| else: | ||
| logger.debug("Command : `%s` not found, aborting", action) | ||
| inbound_identifier = obj[0] | ||
| inbound_request = obj[1] | ||
| if inbound_identifier in commands_map: | ||
| func = commands_map[inbound_identifier] | ||
| return conn.send(func(connection_manager, inbound_request)) | ||
|
|
||
| for cmd in modern_commands: | ||
| if cmd.identifier() == inbound_identifier: | ||
| cmd.run(CommandContext(connection_manager, queue, conn), inbound_request) | ||
| return None | ||
|
|
||
| logger.debug("Command : `%s` not found - did not execute", inbound_identifier) | ||
| return None |
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced explicit unpacking and on_detected_attack call with direct report_api_event(self.queue.get()) without documenting the new expected queue payload or rationale
Details
✨ AI Reasoning
1) The send_to_connection_manager method changed how queue items are processed: previously the code unpacked a 4-tuple and called connection_manager.on_detected_attack; the updated code now forwards queue.get() directly to connection_manager.report_api_event.
2) This is a meaningful behavioural change that affects the expected queue payload format and routing logic; the method has a docstring but no commentary about the new expected item shape or rationale for the changed dispatch, which reduces clarity for maintainers.
3) The change occurred in this diff (replaced explicit unpack-and-call with direct pass-through), so documentation was effectively reduced/worsened by this PR.
🔧 How do I fix it?
Add docstrings or comments explaining what the function does, its parameters, and return values.
More info - Comment
@AikidoSec feedback: [FEEDBACK]to get better review comments in the future.