diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs
index 9cf8844c8d94..ee588aa329d0 100644
--- a/src/common/meta/src/cache.rs
+++ b/src/common/meta/src/cache.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod flow;
+
 use std::borrow::Borrow;
 use std::hash::Hash;
 use std::sync::Arc;
@@ -24,12 +26,12 @@ use crate::cache_invalidator::{CacheInvalidator, Context};
 use crate::error::{self, Error, Result};
 use crate::instruction::CacheIdent;
 
-type TokenFilter = Box<dyn Fn(Vec<CacheIdent>) -> Vec<CacheIdent> + Send + Sync>;
+pub type TokenFilter = Box<dyn Fn(Vec<CacheIdent>) -> Vec<CacheIdent> + Send + Sync>;
 
-type Invalidator<K, V> =
+pub type Invalidator<K, V> =
     Box<dyn Fn(Arc<Cache<K, V>>, Vec<CacheIdent>) -> BoxFuture<'static, Result<()>> + Send + Sync>;
 
-type Init<K, V> = Arc<dyn Fn(&K) -> BoxFuture<'_, Result<Option<V>>> + Send + Sync>;
+pub type Init<K, V> = Arc<dyn Fn(&K) -> BoxFuture<'_, Result<Option<V>>> + Send + Sync>;
 
 pub type AdvancedCacheRef<K, V> = Arc<AdvancedCache<K, V>>;
 
diff --git a/src/common/meta/src/cache/flow.rs b/src/common/meta/src/cache/flow.rs
new file mode 100644
index 000000000000..51b16d8999e6
--- /dev/null
+++ b/src/common/meta/src/cache/flow.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod table_flownode;
+pub use table_flownode::{new_table_flownode_set_cache, TableFlownodeSetCache};
diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs
new file mode 100644
index 000000000000..d94118a72f0b
--- /dev/null
+++ b/src/common/meta/src/cache/flow/table_flownode.rs
@@ -0,0 +1,200 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use futures::TryStreamExt;
+use moka::future::Cache;
+use moka::ops::compute::Op;
+use table::metadata::TableId;
+
+use crate::cache::{AdvancedCache, Init};
+use crate::error::Result;
+use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
+use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
+use crate::kv_backend::KvBackendRef;
+use crate::FlownodeId;
+
+type FlownodeSet = HashSet<FlownodeId>;
+
+/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
+pub type TableFlownodeSetCache = AdvancedCache<TableId, FlownodeSet>;
+
+/// Constructs a [TableFlownodeSetCache].
+pub fn new_table_flownode_set_cache(
+    cache: Cache<TableId, FlownodeSet>,
+    kv_backend: KvBackendRef,
+) -> TableFlownodeSetCache {
+    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
+    let init = init_factory(table_flow_manager);
+
+    AdvancedCache::new(cache, Box::new(invalidator), init, Box::new(filter))
+}
+
+fn init_factory(table_flow_manager: TableFlowManagerRef) -> Init<TableId, FlownodeSet> {
+    Arc::new(move |&table_id| {
+        let table_flow_manager = table_flow_manager.clone();
+        Box::pin(async move {
+            table_flow_manager
+                .flows(table_id)
+                .map_ok(|key| key.flownode_id())
+                .try_collect::<HashSet<_>>()
+                .await
+                .map(Some)
+        })
+    })
+}
+
+async fn invalidate_create_flow(
+    cache: &Cache<TableId, FlownodeSet>,
+    CreateFlow {
+        source_table_ids,
+        flownode_ids,
+    }: CreateFlow,
+) {
+    for table_id in source_table_ids {
+        let entry = cache.entry(table_id);
+        entry
+            .and_compute_with(
+                async |entry: Option<moka::Entry<TableId, HashSet<FlownodeId>>>| match entry {
+                    Some(entry) => {
+                        let mut set = entry.into_value();
+                        set.extend(flownode_ids.clone());
+
+                        Op::Put(set)
+                    }
+                    None => Op::Put(HashSet::from_iter(flownode_ids.clone())),
+                },
+            )
+            .await;
+    }
+}
+
+async fn invalidate_drop_flow(
+    cache: &Cache<TableId, FlownodeSet>,
+    DropFlow {
+        source_table_ids,
+        flownode_ids,
+    }: DropFlow,
+) {
+    for table_id in source_table_ids {
+        let entry = cache.entry(table_id);
+        entry
+            .and_compute_with(
+                async |entry: Option<moka::Entry<TableId, HashSet<FlownodeId>>>| match entry {
+                    Some(entry) => {
+                        let mut set = entry.into_value();
+                        for flownode_id in &flownode_ids {
+                            set.remove(flownode_id);
+                        }
+
+                        Op::Put(set)
+                    }
+                    None => {
+                        // Do nothing
+                        Op::Nop
+                    }
+                },
+            )
+            .await;
+    }
+}
+
+fn invalidator(
+    cache: Arc<Cache<TableId, FlownodeSet>>,
+    caches: Vec<CacheIdent>,
+) -> BoxFuture<'static, Result<()>> {
+    Box::pin(async move {
+        for ident in caches {
+            match ident {
+                CacheIdent::CreateFlow(create_flow) => {
+                    invalidate_create_flow(&cache, create_flow).await
+                }
+                CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(&cache, drop_flow).await,
+                _ => {}
+            }
+        }
+
+        Ok(())
+    })
+}
+
+fn filter(caches: Vec<CacheIdent>) -> Vec<CacheIdent> {
+    caches
+        .into_iter()
+        .filter(|cache| matches!(cache, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_)))
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+    use std::sync::Arc;
+
+    use moka::future::CacheBuilder;
+
+    use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
+    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
+    use crate::kv_backend::memory::MemoryKvBackend;
+
+    #[tokio::test]
+    async fn test_create_flow() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let cache = CacheBuilder::new(128).build();
+        let cache = new_table_flownode_set_cache(cache, mem_kv);
+        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
+            source_table_ids: vec![1024, 1025],
+            flownode_ids: vec![1, 2, 3, 4, 5],
+        })];
+        cache.invalidate(ident).await.unwrap();
+        let set = cache.get(&1024).await.unwrap().unwrap();
+        assert_eq!(set.len(), 5);
+        let set = cache.get(&1025).await.unwrap().unwrap();
+        assert_eq!(set.len(), 5);
+    }
+
+    #[tokio::test]
+    async fn test_drop_flow() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let cache = CacheBuilder::new(128).build();
+        let cache = new_table_flownode_set_cache(cache, mem_kv);
+        let ident = vec![
+            CacheIdent::CreateFlow(CreateFlow {
+                source_table_ids: vec![1024, 1025],
+                flownode_ids: vec![1, 2, 3, 4, 5],
+            }),
+            CacheIdent::CreateFlow(CreateFlow {
+                source_table_ids: vec![1024, 1025],
+                flownode_ids: vec![11, 12],
+            }),
+        ];
+        cache.invalidate(ident).await.unwrap();
+        let set = cache.get(&1024).await.unwrap().unwrap();
+        assert_eq!(set.len(), 7);
+        let set = cache.get(&1025).await.unwrap().unwrap();
+        assert_eq!(set.len(), 7);
+
+        let ident = vec![CacheIdent::DropFlow(DropFlow {
+            source_table_ids: vec![1024, 1025],
+            flownode_ids: vec![1, 2, 3, 4, 5],
+        })];
+        cache.invalidate(ident).await.unwrap();
+        let set = cache.get(&1024).await.unwrap().unwrap();
+        assert_eq!(set, HashSet::from([11, 12]));
+        let set = cache.get(&1025).await.unwrap().unwrap();
+        assert_eq!(set, HashSet::from([11, 12]));
+    }
+}
diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs
index fb62e7a61a25..f547040b9a5c 100644
--- a/src/common/meta/src/cache_invalidator.rs
+++ b/src/common/meta/src/cache_invalidator.rs
@@ -112,6 +112,10 @@ where
                 let key: SchemaNameKey = (&schema_name).into();
                 self.invalidate_key(&key.to_bytes()).await;
             }
+            CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
+                // TODO(weny): implements it
+                unimplemented!()
+            }
         }
     }
     Ok(())
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 7981f2449d35..79a641edc2c6 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -23,7 +23,7 @@ use table::metadata::TableId;
 
 use crate::key::schema_name::SchemaName;
 use crate::table_name::TableName;
-use crate::{ClusterId, DatanodeId};
+use crate::{ClusterId, DatanodeId, FlownodeId};
 
 #[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct RegionIdent {
@@ -152,12 +152,26 @@ pub struct UpgradeRegion {
     pub wait_for_replay_timeout: Option<Duration>,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 /// The identifier of cache.
 pub enum CacheIdent {
     TableId(TableId),
     TableName(TableName),
     SchemaName(SchemaName),
+    CreateFlow(CreateFlow),
+    DropFlow(DropFlow),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct CreateFlow {
+    pub source_table_ids: Vec<TableId>,
+    pub flownode_ids: Vec<FlownodeId>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct DropFlow {
+    pub source_table_ids: Vec<TableId>,
+    pub flownode_ids: Vec<FlownodeId>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]