Moving queue size and making node flume queue bigger #724
base: main
Conversation
After double checking, it seems that we're back to the original problem. I think we should replace our current approach. I have opened #726 to temporarily ignore this issue. I have double checked that this does not actually create a memory leak, as `DropToken`s are reported back to the origin nodes, which cleans up the shared memory.
```rust
}
let event = loop {
    if !self.queue.is_empty() {
        let event = self.queue.remove(0);
```
This call copies all items in the queue to move them one slot left. It would be better to make `self.queue` a `VecDeque` instead of a `Vec` and use the more efficient `push_back`/`pop_front` operations.
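A minimal sketch of the suggested swap (the wrapper struct and method names here are illustrative, not the PR's code):

```rust
use std::collections::VecDeque;

// A VecDeque pops from the front in O(1), whereas Vec::remove(0)
// shifts every remaining element one slot to the left (O(n)).
struct EventQueue<T> {
    queue: VecDeque<T>,
}

impl<T> EventQueue<T> {
    fn push(&mut self, event: T) {
        self.queue.push_back(event); // newest events go to the back
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front() // O(1), unlike Vec::remove(0)
    }
}
```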
```rust
            break;
        }
    } else {
        match select(Delay::new(Duration::from_nanos(1000)), self.receiver.next()).await {
```
We could use the `flume::Receiver::try_recv` function here, then we don't need the timeout dance.
We're using `self.receiver`, which is a `RecvStream`. There is no such function on it. Replacing it with a plain receiver would require significant refactoring.
I can prepare a PR against this PR if you like.
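For illustration, a rough sketch of what such a refactoring could look like if the node held a plain `flume::Receiver` instead of the `RecvStream` (the function shape and names are assumptions, not this PR's code): drain everything already buffered with `try_recv`, and only await `recv_async` when nothing is pending, so no timeout is needed.

```rust
use std::collections::VecDeque;

use flume::TryRecvError;

// Drain all already-buffered events without awaiting, then block only
// when the local queue is empty. Returns None once the channel is
// disconnected and fully drained.
async fn next_event<T>(receiver: &flume::Receiver<T>, queue: &mut VecDeque<T>) -> Option<T> {
    loop {
        // Non-blocking drain: pull every pending event into the local queue.
        loop {
            match receiver.try_recv() {
                Ok(event) => queue.push_back(event),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => return queue.pop_front(),
            }
        }
        if let Some(event) = queue.pop_front() {
            return Some(event);
        }
        // Nothing buffered: await the next event, no timeout dance required.
        match receiver.recv_async().await {
            Ok(event) => queue.push_back(event),
            Err(_) => return None,
        }
    }
}
```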
```rust
match &event {
    EventItem::TimeoutError(_) => break Some(event),
    EventItem::FatalError(_) => break Some(event),
    EventItem::NodeEvent {
        event: node_event,
        ack_channel: _,
    } => match node_event {
        NodeEvent::Stop => break Some(event),
        NodeEvent::Reload { operator_id: _ } => break Some(event),
        NodeEvent::InputClosed { id: _ } => break Some(event),
        NodeEvent::AllInputsClosed => break Some(event),
        NodeEvent::Input {
            id: current_event_id,
            metadata: _,
            data: _,
        } => {
            if self
                .queue
                .iter()
                .filter(|queue_event| {
                    if let EventItem::NodeEvent {
                        event:
                            NodeEvent::Input {
                                id,
                                data: _,
                                metadata: _,
                            },
                        ack_channel: _,
                    } = queue_event
                    {
                        id == current_event_id
                    } else {
                        false
                    }
                })
                .count()
                > self
                    .input_config
                    .get(current_event_id)
                    .map(|input| input.queue_size.unwrap_or(1))
                    .unwrap_or(1)
                    - 1
            {
                continue;
            } else {
                break Some(event);
            }
        }
    },
}
```
Could we reuse the code from the `drop_oldest_inputs` function in `binaries/daemon/src/node_communication/mod.rs` here? That function has the advantage that it does the cleanup for all input IDs in a single traversal of the list. For readability, I would also prefer to keep the cleanup code in a separate function.

We should also do the cleanup directly after receiving the events. Then we only need to traverse the queue once per batch of events, instead of once per event.

In addition to reducing the number of queue traversals, this approach has the advantage that we drop events earlier, thereby freeing memory and allowing the sender to clean up/reuse the shared memory.
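For reference, a sketch of what a cleanup covering all input IDs at once can look like (simplified stand-in types; this is not the actual `drop_oldest_inputs` implementation): one counting pass plus one `retain` pass over the queue, instead of one traversal per incoming event.

```rust
use std::collections::{HashMap, VecDeque};

// Simplified stand-ins for the daemon's event types, just for this sketch.
#[allow(dead_code)]
enum NodeEvent {
    Input { id: String },
    Stop,
}
enum EventItem {
    NodeEvent { event: NodeEvent },
}

/// Drop the oldest `Input` events that exceed each input's configured
/// queue size, handling every input ID in the same traversal.
fn drop_oldest_inputs(queue: &mut VecDeque<EventItem>, queue_sizes: &HashMap<String, usize>) {
    // Pass 1: count buffered `Input` events per input ID.
    let mut counts: HashMap<String, usize> = HashMap::new();
    for event in queue.iter() {
        let EventItem::NodeEvent { event: NodeEvent::Input { id } } = event else { continue };
        *counts.entry(id.clone()).or_default() += 1;
    }
    // Pass 2: walk front (oldest) to back; while an input is over its
    // limit, drop its oldest events so only the newest ones remain.
    queue.retain(|event| {
        if let EventItem::NodeEvent { event: NodeEvent::Input { id } } = event {
            let limit = queue_sizes.get(id).copied().unwrap_or(1);
            let count = counts.get_mut(id).expect("counted above");
            if *count > limit {
                *count -= 1; // older than the newest `limit` events: drop it
                return false;
            }
        }
        true
    });
}
```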
I understand. I have rewritten `drop_oldest_inputs` with a scheduler in #728 so that we go through the queue only once and create fairness between inputs. This new scheduler should free memory as soon as possible.
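To illustrate the fairness idea only (a hypothetical sketch, not the actual #728 scheduler): buffer events per input and serve them round-robin, so one busy input cannot starve the others.

```rust
use std::collections::{HashMap, VecDeque};

// Round-robin over per-input queues: each input gets a turn before any
// input gets a second one.
struct FairScheduler<T> {
    queues: HashMap<String, VecDeque<T>>,
    order: Vec<String>, // round-robin order of known input IDs
    next: usize,        // index of the next input to serve
}

impl<T> FairScheduler<T> {
    fn push(&mut self, id: String, event: T) {
        if !self.queues.contains_key(&id) {
            self.order.push(id.clone());
        }
        self.queues.entry(id).or_default().push_back(event);
    }

    fn pop(&mut self) -> Option<T> {
        // Try each input once, starting after the last one served.
        for _ in 0..self.order.len() {
            let id = &self.order[self.next];
            self.next = (self.next + 1) % self.order.len();
            if let Some(event) = self.queues.get_mut(id).and_then(|q| q.pop_front()) {
                return Some(event);
            }
        }
        None
    }
}
```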
Moving queue size to the node fixes long-latency issues for Python nodes and makes it possible to set the right `queue_size`. I also changed the flume queue length within nodes to make this possible.
This seems to fix some of the graceful-stop issues in Python, but further investigation is required.