Skip to content

Commit

Permalink
drop inconsistency branch
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 7, 2025
1 parent 147d1f3 commit 0f5e5ab
Showing 1 changed file with 1 addition and 15 deletions.
16 changes: 1 addition & 15 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use super::join::hash_join::*;
use super::join::row::JoinRow;
use super::join::*;
use super::watermark::*;
use crate::consistency::enable_strict_consistency;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::executor::join::hash_join::CacheResult;
use crate::executor::prelude::*;
Expand Down Expand Up @@ -788,19 +787,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
Ok(watermarks_to_emit)
}

/// the data the hash table and match the coming
/// data chunk with the executor state
async fn hash_eq_match(
key: &K,
ht: &mut JoinHashMap<K, S>,
) -> StreamExecutorResult<CacheResult> {
if enable_strict_consistency() {
Ok(ht.take_state_opt(key))
} else {
ht.take_state(key).await.map(CacheResult::Hit)
}
}

fn row_concat(
row_update: impl Row,
update_start_pos: usize,
Expand Down Expand Up @@ -898,7 +884,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
let build_non_null_requirement_satisfied =
key.null_bitmap().is_subset(side_match.ht.null_matched());
if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
Self::hash_eq_match(key, &mut side_match.ht).await?
side_match.ht.take_state_opt(key)
} else {
CacheResult::NeverMatch
}
Expand Down

0 comments on commit 0f5e5ab

Please sign in to comment.