Stream injected compute events
Pass compute events through a stream instead of injecting them into the
compute event log.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jan 29, 2025
1 parent aea5c7a commit 65e5dc1
Showing 5 changed files with 101 additions and 81 deletions.
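At a high level, the change replaces side-channel calls into a shared logger with ordinary dataflow edges: each producer returns a stream of compute events, and the logging dataflow concatenates those streams with the events replayed from the event queue. Below is a minimal sketch of that pattern using plain timely operators; the integer streams are hypothetical stand-ins for the compute event streams.

use timely::dataflow::operators::{Concatenate, Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // Stand-ins for events that were previously injected via `logger.log(...)`:
        // each producer now hands back a stream instead.
        let timely_events = (0..3).to_stream(scope);
        let differential_events = (3..6).to_stream(scope);
        // Stand-in for the events replayed from the compute event queue.
        let replayed_events = (6..9).to_stream(scope);

        // The logging dataflow merges all sources into a single event stream.
        let all_events =
            scope.concatenate([timely_events, differential_events, replayed_events]);
        all_events.inspect(|event| println!("compute event: {event}"));
    });
}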
19 changes: 14 additions & 5 deletions src/compute/src/logging.rs
@@ -21,7 +21,7 @@ use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::container::ContainerBuilder;
use ::timely::container::{CapacityContainerBuilder, ContainerBuilder};
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::channels::pushers::buffer::Session;
use ::timely::dataflow::channels::pushers::{Counter, Tee};
@@ -36,13 +36,26 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::containers::ColumnBuilder;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// An update of value `D` at a time and with a diff.
pub(super) type Update<D> = (D, Timestamp, Diff);
/// A pusher for containers `C`.
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
/// An output session for the specified container builder.
pub(super) type OutputSession<'a, CB> =
Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;
/// An output session for columnar containers of updates `D`, using a column builder.
pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder<D>>;

/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
Expand Down Expand Up @@ -216,10 +229,6 @@ struct LogCollection {
token: Rc<dyn Any>,
}

pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
pub(super) type OutputSession<'a, CB> =
Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;

/// A single-purpose function to consolidate and pack updates for log collection.
///
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
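The aliases hoisted to the top of `logging.rs` abbreviate the pusher and session types that every demux output would otherwise have to spell out. A self-contained sketch of the same shape, with `Timestamp` and `Diff` reduced to plain stand-ins for the `mz_repr` types:

use timely::container::{CapacityContainerBuilder, ContainerBuilder};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};

// Stand-ins for mz_repr::Timestamp and mz_repr::Diff.
type Timestamp = u64;
type Diff = i64;

/// An update of value `D` at a time and with a diff.
type Update<D> = (D, Timestamp, Diff);
/// A pusher for containers `C`, counting records for progress tracking.
type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
/// An output session for the specified container builder.
type OutputSession<'a, CB> =
    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
/// The vector-based variant used by most demux outputs.
type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;

fn main() {
    // Type-level plumbing only: a demux output of updates `D` is then
    // written as `OutputSessionVec<'_, Update<D>>`.
}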
56 changes: 24 additions & 32 deletions src/compute/src/logging/compute.rs
@@ -24,22 +24,20 @@ use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
use mz_timely_util::replay::MzReplay;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::{Concatenate, Enter, Operator};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::scheduling::Scheduler;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrange;
use crate::logging::{
ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
PermutedRowPacker, SharedLoggingState, Update,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;
Expand Down Expand Up @@ -299,6 +297,7 @@ pub(super) fn construct<A: Scheduler + 'static, S: Scope<Timestamp = Timestamp>>
scheduler: A,
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
compute_event_stream: impl IntoIterator<Item = StreamCore<S, Column<(Duration, ComputeEvent)>>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
@@ -319,6 +318,12 @@ pub(super) fn construct<A: Scheduler + 'static, S: Scope<Timestamp = Timestamp>>
},
);

let logs = compute_event_stream
.into_iter()
.map(|stream| stream.enter(scope))
.chain(std::iter::once(logs));
let logs = scope.concatenate(logs);

// Build a demux operator that splits the replayed event stream up into the separate
// logging streams.
let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
Expand Down Expand Up @@ -663,34 +668,21 @@ struct ArrangementSizeState {
count: isize,
}

/// An update of value `D` at a time and with a diff.
type Update<D> = (D, Timestamp, Diff);
/// A pusher for updates of value `D` for vector-based containers.
type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
/// A pusher for updates of value `D` for columnar containers.
type PusherColumnar<D> = Counter<Timestamp, Column<Update<D>>, Tee<Timestamp, Column<Update<D>>>>;
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
type OutputSession<'a, D> =
Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;
/// An output session for columnar containers of updates `D`, using a column builder.
type OutputSessionColumnar<'a, D> =
Session<'a, Timestamp, ColumnBuilder<Update<D>>, PusherColumnar<D>>;

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
export: OutputSession<'a, ExportDatum>,
frontier: OutputSession<'a, FrontierDatum>,
import_frontier: OutputSession<'a, ImportFrontierDatum>,
peek: OutputSession<'a, PeekDatum>,
peek_duration: OutputSession<'a, PeekDurationDatum>,
shutdown_duration: OutputSession<'a, u128>,
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
hydration_time: OutputSession<'a, HydrationTimeDatum>,
error_count: OutputSession<'a, ErrorCountDatum>,
lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>,
dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>,
export: OutputSessionVec<'a, Update<ExportDatum>>,
frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
peek: OutputSessionVec<'a, Update<PeekDatum>>,
peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
shutdown_duration: OutputSessionVec<'a, Update<u128>>,
arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
}

#[derive(Clone)]
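The compute logging dataflow runs in its own subscope, so event streams built in the parent scope must `enter` that subscope before they can be concatenated with the replayed events, which is what the `enter`/`concatenate` chain above does. A minimal sketch of the same movement, again with integer stand-ins for the event streams:

use timely::dataflow::operators::{Concatenate, Enter, Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // Stand-ins for compute event streams produced in the parent scope.
        let outer_streams = vec![(0..3).to_stream(scope), (3..6).to_stream(scope)];

        scope.scoped::<u64, _, _>("Dataflow: compute logging", |inner| {
            // Stand-in for the stream replayed from the compute event queue.
            let replayed = (6..9).to_stream(inner);

            // Parent-scope streams must enter the subscope before merging.
            let entered: Vec<_> = outer_streams
                .into_iter()
                .map(|stream| stream.enter(inner))
                .collect();

            let logs = inner.concatenate(entered.into_iter().chain(std::iter::once(replayed)));
            logs.inspect(|event| println!("event: {event}"));
        });
    });
}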
47 changes: 29 additions & 18 deletions src/compute/src/logging/differential.rs
@@ -21,24 +21,30 @@ use differential_dataflow::logging::{
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::{
columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder,
columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder,
};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::Leave;
use timely::dataflow::{Scope, Stream, StreamCore};

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
use crate::logging::{
consolidate_and_pack, DifferentialLog, EventQueue, LogCollection, LogVariant,
SharedLoggingState,
OutputSessionColumnar, SharedLoggingState,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{KeyBatcher, RowRowSpine};

pub(super) struct Return<S: Scope> {
pub collections: BTreeMap<LogVariant, LogCollection>,
pub compute_events: StreamCore<S, Column<(Duration, ComputeEvent)>>,
}

/// Constructs the logging dataflow for differential logs.
///
/// Params
@@ -50,7 +56,7 @@ pub(super) fn construct<S: Scope<Timestamp = Timestamp>>(
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
) -> Return<S> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

scope.scoped("Dataflow: differential logging", move |scope| {
@@ -82,6 +88,7 @@
let (mut batcher_size_out, batcher_size) = demux.new_output();
let (mut batcher_capacity_out, batcher_capacity) = demux.new_output();
let (mut batcher_allocations_out, batcher_allocations) = demux.new_output();
let (mut compute_events_out, compute_events) = demux.new_output();

let mut demux_state = Default::default();
demux.build(move |_capability| {
@@ -93,6 +100,7 @@
let mut batcher_size = batcher_size_out.activate();
let mut batcher_capacity = batcher_capacity_out.activate();
let mut batcher_allocations = batcher_allocations_out.activate();
let mut compute_events_out = compute_events_out.activate();

input.for_each(|cap, data| {
let mut output_buffers = DemuxOutput {
@@ -103,6 +111,7 @@
batcher_size: batcher_size.session_with_builder(&cap),
batcher_capacity: batcher_capacity.session_with_builder(&cap),
batcher_allocations: batcher_allocations.session_with_builder(&cap),
compute_events: compute_events_out.session_with_builder(&cap),
};

for (time, event) in data.drain(..) {
@@ -157,7 +166,7 @@ pub(super) fn construct<S: Scope<Timestamp = Timestamp>>(
];

// Build the output arrangements.
let mut result = BTreeMap::new();
let mut collections = BTreeMap::new();
for (variant, collection) in logs {
let variant = LogVariant::Differential(variant);
if config.index_logs.contains_key(&variant) {
@@ -171,11 +180,11 @@
trace,
token: Rc::clone(&token),
};
result.insert(variant, collection);
collections.insert(variant, collection);
}
}

result
Return { collections, compute_events: compute_events.leave() }
})
}

@@ -193,6 +202,7 @@ struct DemuxOutput<'a> {
batcher_size: OutputSession<'a, (usize, ())>,
batcher_capacity: OutputSession<'a, (usize, ())>,
batcher_allocations: OutputSession<'a, (usize, ())>,
compute_events: OutputSessionColumnar<'a, (Duration, ComputeEvent)>,
}

/// State maintained by the demux operator.
Expand Down Expand Up @@ -284,17 +294,18 @@ impl DemuxHandler<'_, '_> {
debug_assert_ne!(diff, 0);
self.output.sharing.give(((operator_id, ()), ts, diff));

if let Some(logger) = &mut self.shared_state.compute_logger {
let sharing = self.state.sharing.entry(operator_id).or_default();
*sharing = (i64::try_from(*sharing).expect("must fit") + diff)
.try_into()
.expect("under/overflow");
if *sharing == 0 {
self.state.sharing.remove(&operator_id);
logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
ArrangementHeapSizeOperatorDrop { operator_id },
));
}
let sharing = self.state.sharing.entry(operator_id).or_default();
*sharing = (i64::try_from(*sharing).expect("must fit") + diff)
.try_into()
.expect("under/overflow");
if *sharing == 0 {
self.state.sharing.remove(&operator_id);
self.output.compute_events.give(&(
self.time,
ComputeEvent::ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop {
operator_id,
}),
));
}
}

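The mechanical core of the `differential.rs` change is a new demux output that carries `ComputeEvent`s out of the operator rather than logging them from inside it. A reduced sketch of that operator shape, splitting an integer stream across two outputs; the structure (`new_output`, `activate`, `session_with_builder`, `give`) mirrors the code above, while the payloads and the even/odd routing are stand-ins:

use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        let stream = (0i64..10).to_stream(scope);

        let mut demux = OperatorBuilder::new("Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&stream, Pipeline);
        let (mut evens_out, evens) = demux.new_output::<CapacityContainerBuilder<Vec<i64>>>();
        let (mut odds_out, odds) = demux.new_output::<CapacityContainerBuilder<Vec<i64>>>();

        demux.build(move |_capability| {
            move |_frontiers| {
                let mut evens = evens_out.activate();
                let mut odds = odds_out.activate();
                input.for_each(|cap, data| {
                    // One session per output, all sharing the input's capability.
                    let mut evens = evens.session_with_builder(&cap);
                    let mut odds = odds.session_with_builder(&cap);
                    for datum in data.drain(..) {
                        if datum % 2 == 0 {
                            evens.give(datum);
                        } else {
                            odds.give(datum);
                        }
                    }
                });
            }
        });

        evens.inspect(|x| println!("even: {x}"));
        odds.inspect(|x| println!("odd: {x}"));
    });
}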
20 changes: 12 additions & 8 deletions src/compute/src/logging/initialize.rs
@@ -102,28 +102,32 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
let dataflow_index = self.worker.next_dataflow_index();
self.worker.dataflow_named("Dataflow: logging", |scope| {
let mut collections = BTreeMap::new();
collections.extend(super::timely::construct(
scope.clone(),
self.config,
self.t_event_queue.clone(),
Rc::clone(&self.shared_state),
));
let super::timely::Return {
collections: timely_collections,
compute_events: compute_events_timely,
} = super::timely::construct(scope.clone(), self.config, self.t_event_queue.clone());
collections.extend(timely_collections);
collections.extend(super::reachability::construct(
scope.clone(),
self.config,
self.r_event_queue.clone(),
));
collections.extend(super::differential::construct(
let super::differential::Return {
collections: differential_collections,
compute_events: compute_events_differential,
} = super::differential::construct(
scope.clone(),
self.config,
self.d_event_queue.clone(),
Rc::clone(&self.shared_state),
));
);
collections.extend(differential_collections);
collections.extend(super::compute::construct(
scope.clone(),
scope.parent.clone(),
self.config,
self.c_event_queue.clone(),
[compute_events_timely, compute_events_differential],
Rc::clone(&self.shared_state),
));

