fix: de-duplicate operations received in quick succession from c8y/devicecontrol/notifications #3454
Conversation
One issue that exists with this solution is an (admittedly likely very slow) memory leak caused by the mapper storing the operation IDs indefinitely. Ideally, we should delete these at some point where we know the operation is no longer marked as pending, though I'm not sure how to do this.

Although clearing this entry when the operation transitions to the successful/failed states is good enough for the majority of cases, I understand that it is not a fool-proof solution, as there is still the possibility of getting a duplicate message while these state transition messages are still in transit to the cloud (or buffered for processing, either locally or in the cloud). But since the terminal state transitions usually happen after the executing transition has already happened (which changes the PENDING status of the op in the cloud), the risk is reduced even further, although not fully eliminated.

I'm in favour of risking a duplicate operation execution in such rare cases (where the duplicate is delivered even after the terminal state transition is published), compared to the risk of that slow memory leak.
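For illustration, a minimal sketch of the trade-off described above: remembering operation IDs while in flight and dropping them on a terminal transition (the type and field names are hypothetical, not the actual mapper code):

```rust
use std::collections::HashSet;

enum OpStatus {
    Executing,
    Successful,
    Failed,
}

struct Deduplicator {
    in_flight: HashSet<String>, // stand-in for processed_ids
}

impl Deduplicator {
    /// Returns true if the operation should be processed, false if it is a duplicate.
    fn accept(&mut self, op_id: &str) -> bool {
        self.in_flight.insert(op_id.to_string())
    }

    /// Forget the operation once a terminal state transition is published.
    fn on_status(&mut self, op_id: &str, status: &OpStatus) {
        if matches!(status, OpStatus::Successful | OpStatus::Failed) {
            self.in_flight.remove(op_id);
        }
    }
}

fn main() {
    let mut dedup = Deduplicator { in_flight: HashSet::new() };
    assert!(dedup.accept("16574089"));
    assert!(!dedup.accept("16574089")); // quick redelivery is dropped
    dedup.on_status("16574089", &OpStatus::Successful);
    assert!(dedup.accept("16574089")); // a later redelivery would slip through, as noted above
}
```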
@@ -190,6 +190,8 @@ pub struct CumulocityConverter {
    supported_operations: SupportedOperations,
    pub operation_handler: OperationHandler,
    processed_ids: HashSet<String>,
Could we not re-use the active_commands set? Or did you avoid that because the entries in that set are cleared on the terminal state transition of those operations and you didn't want these entries cleared so soon?
Looking at the doc comment for active_commands, this indeed sounds like the correct place for that fix.

However, this raises another point: why do we have this issue with duplicated commands while there is already a mechanism supposed to handle that?
I think active_commands is a broken solution to the problem. We are de-duplicating c8y/devicecontrol/notifications there, but we are only tracking an active command once we receive the relevant tedge-topic command message, which will happen a short while later. I think this is what leaves open the window for an operation to be duplicated.

Assuming I've understood correctly, that would indicate that the problem could simply be solved by moving the active_commands insertion to where I'm currently inserting to processed_ids, and deleting the processed_ids stuff?
> Assuming I've understood correctly, that would indicate that the problem could simply be solved by moving the active_commands insertion to where I'm currently inserting to processed_ids, and deleting the processed_ids stuff?

One point is sure: one should keep only a single de-duplication mechanism. What you propose makes sense: it's better to remove duplicates before any processing.
> why do we have this issue with duplicated commands while there is already a mechanism supposed to handle that?

For the cases where the duplicate messages are delivered after a restart, it is the lack of persistence of this set. But for duplicate messages delivered while the mapper is still live, this should have been sufficient.
I've now modified active_commands to insert immediately post-conversion, rather than when we receive our outgoing message. As a result, I've deleted processed_ids.
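Roughly the shape of the change being described, as a self-contained sketch (the field name is from the diff, but the function and topic strings are illustrative, not an excerpt from the PR):

```rust
use std::collections::HashSet;

struct Converter {
    active_commands: HashSet<String>,
}

impl Converter {
    /// Old behaviour (simplified): the id was only recorded once the converter saw its
    /// own outgoing te/... command message again, leaving a window for duplicates.
    /// New behaviour (simplified): record the id as soon as the c8y operation is converted.
    fn convert_devicecontrol_notification(&mut self, op_id: &str) -> Option<String> {
        if !self.active_commands.insert(op_id.to_string()) {
            // Duplicate delivery of an operation we are already handling: ignore it.
            return None;
        }
        Some(format!("te/device/main///cmd/restart/c8y-mapper-{op_id}"))
    }
}

fn main() {
    let mut converter = Converter { active_commands: HashSet::new() };
    assert!(converter.convert_devicecontrol_notification("16574089").is_some());
    assert!(converter.convert_devicecontrol_notification("16574089").is_none());
}
```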
I don't see how this can happen, as a device is not expected to receive an operation that it hasn't declared as a supported operation via the operation capability registration (this is my expectation of C8Y).

Operations can be created via the API, which doesn't first check whether the operation is supported by the device or not. So in cases of automation, it is not so uncommon for operations to be sent to devices as-is: the automation will assume that an unsupported operation will be rejected by the agent (and this is deemed cheaper than a backend service first checking whether each device supports the intended operation). This is fairly common in large device fleets (> 200K).
let original = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!(
    {"delivery":{"log":[],"time":"2025-03-05T08:49:24.986Z","status":"PENDING"},"agentId":"1916574062","creationTime":"2025-03-05T08:49:24.967Z","deviceId":"1916574062","id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}}
).to_string());
let redelivery = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!(
    {"delivery":{"log":[{"time":"2025-03-05T08:49:24.986Z","status":"PENDING"},{"time":"2025-03-05T08:49:25.000Z","status":"SEND"},{"time":"2025-03-05T08:49:25.162Z","status":"DELIVERED"}],"time":"2025-03-05T08:49:25.707Z","status":"PENDING"},"agentId":"1916574062","creationTime":"2025-03-05T08:49:24.967Z","deviceId":"1916574062","id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}}
).to_string());
It would be good to make it obvious that original and redelivery only differ on the delivery field.
I've now removed some of the extraneous fields and changed things so we send the original message twice, since the converter doesn't care about the delivery field.
}

#[tokio::test]
async fn custom_operations_are_not_deduplicated_before_registration() {
I struggle to understand what is checked by this test and how. The CumulocityConverter.supported_operations field is patched before the first request, then restored before the second. Okay, but why?
This test checks what happens if we receive a custom operation that is unrecognised, and later get redelivered the custom operation after it is registered with the mapper. If the converter naïvely assumes that the operation is active after we first receive it, the de-duplication mechanism will ignore the redelivery. But since we haven't yet processed this message, we should process such a redelivery. This is obviously dependent on something sending a 500 message once the operation is registered, but that could be the case for some custom operation handling service.

The patching of supported_operations was intended as an easy way of making the registered operations clear, since I'm not trying to test how we update supported_operations in this case.
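A self-contained sketch of the scenario this test is guarding against (the struct and method below are simplified stand-ins, not the real converter):

```rust
use std::collections::{HashMap, HashSet};

struct Converter {
    supported_operations: HashMap<String, String>, // operation name -> template
    active_commands: HashSet<String>,
}

impl Converter {
    fn handle(&mut self, op_name: &str, op_id: &str) -> Option<String> {
        // An unrecognised custom operation must not be recorded as "active", otherwise a
        // redelivery after the operation is registered would be silently dropped.
        let template = self.supported_operations.get(op_name)?;
        if !self.active_commands.insert(op_id.to_string()) {
            return None; // duplicate of an operation we already converted
        }
        Some(format!("exec {template} for {op_id}"))
    }
}

fn main() {
    let mut c = Converter { supported_operations: HashMap::new(), active_commands: HashSet::new() };
    assert!(c.handle("c8y_DoSomething", "42").is_none()); // before registration: ignored
    c.supported_operations.insert("c8y_DoSomething".into(), "do-something.sh".into());
    assert!(c.handle("c8y_DoSomething", "42").is_some()); // redelivery after registration: processed
}
```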
This is clearer. Might be good to add this response as a comment to the test.
Technically this is fine. But seeing the detailed discussion above, it felt like this case could have been better represented as an integration test in tests.rs, where we can better simulate the dynamic custom operation registration during the execution of the test.
I think I want the opposite of that, @albinsuresh. The point of this being a unit test is that I don't also want to test the operation registration logic at the same time. I've added a comment to explain what it is I'm trying to test.
Changes look fine. Some minor suggestions on the tests.
assert_ne!(
    converter
        .parse_json_custom_operation_topic(&original)
        .await
        .unwrap(),
    vec![],
    "Initial operation delivery produces outgoing message"
);
A slightly stricter check that validates at least the cmd topic would have been better than this non-empty output check, to avoid false positives such as a converted error message sent to te/errors, instead of the expected cmd message, passing this assertion.
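For example, a stricter assertion along these lines could check the topic rather than just non-emptiness (the helper and the message representation are assumptions for illustration, not the test's actual types):

```rust
// Check that at least one outgoing message targets a te/.../cmd/... topic,
// rather than, say, te/errors.
fn assert_contains_cmd_message(messages: &[(String, String)]) {
    assert!(
        messages
            .iter()
            .any(|(topic, _payload)| topic.starts_with("te/") && topic.contains("/cmd/")),
        "expected at least one command message on a te/.../cmd/... topic, got: {messages:?}"
    );
}

fn main() {
    let converted = vec![(
        "te/device/main///cmd/restart/c8y-mapper-16574089".to_string(),
        "{}".to_string(),
    )];
    assert_contains_cmd_message(&converted);
}
```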
converter.supported_operations = after_registration;

assert_ne!(
Same comment as above, regarding a stricter check.
Two forgotten dbg! to be removed, and some questions.
@@ -1399,6 +1410,21 @@ impl CumulocityConverter {
    &mut self,
    message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
    if dbg!(self.active_commands_last_cleared.elapsed(&*self.clock)) > Duration::from_secs(3600)
Just a matter of taste: I would prefer an elapsed or elapsed_since method on the clock rather than on the instant:

- if dbg!(self.active_commands_last_cleared.elapsed(&*self.clock)) > Duration::from_secs(3600)
+ if self.clock.elapsed_since(&self.active_commands_last_cleared) > Duration::from_secs(3600)

PS: the dbg! is to be removed.
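A minimal sketch of the suggested API, assuming a simple Clock trait (the real clock abstraction in the codebase may differ):

```rust
use std::time::{Duration, Instant};

trait Clock {
    fn now(&self) -> Instant;

    // The clock owns the notion of "how long ago", so call sites read naturally
    // and tests can still substitute a fake clock.
    fn elapsed_since(&self, earlier: &Instant) -> Duration {
        self.now().duration_since(*earlier)
    }
}

struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> Instant {
        Instant::now()
    }
}

fn main() {
    let clock = WallClock;
    let last_cleared = clock.now();
    if clock.elapsed_since(&last_cleared) > Duration::from_secs(3600) {
        // time to evict stale entries
    }
}
```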
Approved. Thank you.
The changes look much simpler now. I've got one concern though, regarding the premature eviction from the cache, before the operation really completes.
@@ -1403,6 +1406,20 @@ impl CumulocityConverter {
    &mut self,
    message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
    if self.active_commands_last_cleared.elapsed() > Duration::from_secs(3600) {
I was actually thinking that you'd use the timer actor from the mapper to get timeout notification messages at the desired intervals. But it was smart to place the eviction logic here.
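A rough sketch of this lazy, traffic-driven eviction (names, the retention window, and the data structure are assumptions; the real change may differ in detail):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Converter {
    // command id -> when it was first seen; keeping a timestamp per entry is one way
    // to expire entries without a timer actor
    active_commands: HashMap<String, Instant>,
    active_commands_last_cleared: Instant,
}

impl Converter {
    fn parse_notification(&mut self, op_id: &str) -> bool {
        // Lazy eviction, piggy-backing on incoming traffic instead of a timer actor:
        // at most once an hour, drop entries that have been around for over 12 hours.
        if self.active_commands_last_cleared.elapsed() > Duration::from_secs(3600) {
            self.active_commands
                .retain(|_, seen| seen.elapsed() < Duration::from_secs(12 * 3600));
            self.active_commands_last_cleared = Instant::now();
        }
        // Returns true only if this is the first time we see this operation id.
        self.active_commands
            .insert(op_id.to_string(), Instant::now())
            .is_none()
    }
}

fn main() {
    let mut c = Converter {
        active_commands: HashMap::new(),
        active_commands_last_cleared: Instant::now(),
    };
    assert!(c.parse_notification("16574089"));
    assert!(!c.parse_notification("16574089")); // duplicate within the window is dropped
}
```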
LGTM.
Force-pushed from e50c720 to 288ab5e
I re-confirm my approval.
Redid time-based expiry logic
Signed-off-by: James Rhodes <[email protected]>
…for active_commands
Signed-off-by: James Rhodes <[email protected]>
Force-pushed from 288ab5e to 0f2d597
Approved
Proposed changes

Fixes the handling of in-progress operations by the Cumulocity converter so that the de-duplication mechanism is applied immediately. Since it takes a small amount of time between triggering the operation and the converter receiving the te/... topic message (which was when the operation was marked as "active" in the converter), there was a race that could lead to duplicate messages from Cumulocity both being processed by the converter.

Specifically, this PR changes the converter to mark the operation as "active" as it is initially handled by the converter. This has two advantages: firstly, it fixes the aforementioned race condition, and additionally it means that legacy custom operations based on SmartREST are also de-duplicated. Since such an operation doesn't have an associated te/... topic, these "active" operations expire after 12 hours. For workflow-based/built-in operations, the "active" operation will be discarded by the mapper only when it is marked as complete on the relevant te/... topic. The de-duplication will work across mapper restarts (assuming MQTT broker persistence is appropriately configured), as it did before.

Types of changes
Paste Link to the issue

Checklist
- just prepare-dev (once)
- cargo fmt as mentioned in CODING_GUIDELINES
- cargo clippy as mentioned in CODING_GUIDELINES

Further comments