Skip to content

Commit

Permalink
ref(rust): Add --python-max-queue-depth option
Browse files Browse the repository at this point in the history
The Python message processor is not using all cores. Until now we
thought that was because check_for_results blocks too often, and tried
to fix that in #4703 (reverted in
#4757). But perhaps all we have to
do is raise MessageRejected less often?
  • Loading branch information
untitaker committed Nov 15, 2023
1 parent fa3cf88 commit c821f74
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 8 deletions.
4 changes: 4 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub fn consumer(
skip_write: bool,
concurrency: usize,
use_rust_processor: bool,
python_max_queue_depth: Option<usize>
) {
py.allow_threads(|| {
consumer_impl(
Expand All @@ -39,6 +40,7 @@ pub fn consumer(
skip_write,
concurrency,
use_rust_processor,
python_max_queue_depth,
)
});
}
Expand All @@ -50,6 +52,7 @@ pub fn consumer_impl(
skip_write: bool,
concurrency: usize,
use_rust_processor: bool,
python_max_queue_depth: Option<usize>,
) {
setup_logging();

Expand Down Expand Up @@ -135,6 +138,7 @@ pub fn consumer_impl(
max_batch_time,
skip_write,
concurrency,
python_max_queue_depth,
use_rust_processor,
)),
);
Expand Down
4 changes: 4 additions & 0 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct ConsumerStrategyFactory {
max_batch_time: Duration,
skip_write: bool,
concurrency: usize,
python_max_queue_depth: Option<usize>,
use_rust_processor: bool,
}

Expand All @@ -35,6 +36,7 @@ impl ConsumerStrategyFactory {
max_batch_time: Duration,
skip_write: bool,
concurrency: usize,
python_max_queue_depth: Option<usize>,
use_rust_processor: bool,
) -> Self {
Self {
Expand All @@ -44,6 +46,7 @@ impl ConsumerStrategyFactory {
max_batch_time,
skip_write,
concurrency,
python_max_queue_depth,
use_rust_processor,
}
}
Expand Down Expand Up @@ -140,6 +143,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
PythonTransformStep::new(
self.storage_config.message_processor.clone(),
self.concurrency,
self.python_max_queue_depth,
next_step,
)
.unwrap(),
Expand Down
12 changes: 4 additions & 8 deletions rust_snuba/src/strategies/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ pub struct PythonTransformStep {
handles: VecDeque<TaskHandle>,
message_carried_over: Option<Message<BytesInsertBatch>>,
processing_pool: Option<procspawn::Pool>,
max_queue_depth: usize,
}

impl PythonTransformStep {
pub fn new<N>(
processor_config: MessageProcessorConfig,
processes: usize,
max_queue_depth: Option<usize>,
next_step: N,
) -> Result<Self, Error>
where
Expand Down Expand Up @@ -73,6 +75,7 @@ impl PythonTransformStep {
handles: VecDeque::new(),
message_carried_over: None,
processing_pool,
max_queue_depth: max_queue_depth.unwrap_or(processes)
})
}

Expand Down Expand Up @@ -131,17 +134,10 @@ impl ProcessingStrategy<KafkaPayload> for PythonTransformStep {
}

fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
self.check_for_results();

// if there are a lot of "queued" messages (=messages waiting for a free process), let's
// not enqueue more.
//
// this threshold was chosen arbitrarily with no performance measuring, and we may also
// check for queued_count() > 0 instead, but the rough idea of comparing with
// active_count() is that we allow for one pending message per process, so that procspawn
// is able to keep CPU saturation high.
if let Some(ref processing_pool) = self.processing_pool {
if processing_pool.queued_count() > processing_pool.active_count() {
if processing_pool.queued_count() > self.max_queue_depth {
log::debug!("python strategy provides backpressure");
return Err(SubmitError::MessageRejected(MessageRejected { message }));
}
Expand Down
8 changes: 8 additions & 0 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@
default=None,
help="Kafka group instance id. passing a value here will run kafka with static membership.",
)
@click.option(
"--python-max-queue-depth",
type=int,
default=None,
    help="How many messages may be queued up in front of the Python message processor before backpressure kicks in. Defaults to the number of processes.",
)
def rust_consumer(
*,
storage_names: Sequence[str],
Expand All @@ -125,6 +131,7 @@ def rust_consumer(
concurrency: Optional[int],
use_rust_processor: bool,
group_instance_id: Optional[str],
python_max_queue_depth: Optional[int],
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand Down Expand Up @@ -165,4 +172,5 @@ def rust_consumer(
skip_write,
concurrency_override or concurrency or 1,
use_rust_processor,
python_max_queue_depth,
)

0 comments on commit c821f74

Please sign in to comment.