diff --git a/app/src/main/config.ts b/app/src/main/config.ts
index 8a3296717..630451408 100644
--- a/app/src/main/config.ts
+++ b/app/src/main/config.ts
@@ -79,6 +79,9 @@ export const getUserConfig = (path?: string) => {
     storedConfig.settings = {
       search_engine: 'google',
       embedding_model: 'multilingual_small',
+      embedding_batch_size: 64,
+      embedding_max_threads: 4,
+      embedding_max_connections: 8,
       tabs_orientation: 'vertical',
       app_style: 'light',
       use_semantic_search: false,
@@ -340,6 +343,20 @@ export const getUserConfig = (path?: string) => {
     changedConfig = true
   }
 
+  // Embedding performance settings
+  if (storedConfig.settings.embedding_batch_size === undefined) {
+    storedConfig.settings.embedding_batch_size = 64
+    changedConfig = true
+  }
+  if (storedConfig.settings.embedding_max_threads === undefined) {
+    storedConfig.settings.embedding_max_threads = 4
+    changedConfig = true
+  }
+  if (storedConfig.settings.embedding_max_connections === undefined) {
+    storedConfig.settings.embedding_max_connections = 8
+    changedConfig = true
+  }
+
   if (changedConfig) {
     setUserConfig(storedConfig as UserConfig)
   }
diff --git a/app/src/main/index.ts b/app/src/main/index.ts
index 90d9c2ffd..1ccb24a54 100644
--- a/app/src/main/index.ts
+++ b/app/src/main/index.ts
@@ -146,7 +146,10 @@
   surfBackendManager = new SurfBackendServerManager(backendServerPath, [
     backendRootPath,
     'false',
-    isDev ? CONFIG.embeddingModelMode : userConfig.settings?.embedding_model
+    isDev ? CONFIG.embeddingModelMode : userConfig.settings?.embedding_model,
+    String(userConfig.settings?.embedding_batch_size || 64),
+    String(userConfig.settings?.embedding_max_threads || 4),
+    String(userConfig.settings?.embedding_max_connections || 8)
   ])
 
   surfBackendManager
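
A note on the wiring above: the renderer serializes the three new settings as positional string arguments for the backend process, applying the `|| 64`, `|| 4`, `|| 8` fallbacks before launch. The sketch below shows the resulting argv contract from the Rust side; `parse_or_default` is a hypothetical helper for illustration only, since the actual backend (see `main.rs` further down) treats a malformed value as a fatal startup error rather than defaulting:

```rust
// Hypothetical lenient parser mirroring the renderer-side `|| default` fallbacks.
// The real backend exits on a malformed argument instead of substituting a default.
fn parse_or_default(args: &[String], index: usize, default: usize) -> usize {
    args.get(index)
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}

fn main() {
    let args: Vec<String> = std::env::args().collect();
    // argv layout matches SurfBackendServerManager:
    // [root_path, llm_mode, model_mode, batch_size, max_threads, max_connections]
    let batch_size = parse_or_default(&args, 4, 64);
    let max_threads = parse_or_default(&args, 5, 4);
    let max_connections = parse_or_default(&args, 6, 8);
    println!("batch_size={batch_size} max_threads={max_threads} max_connections={max_connections}");
}
```
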
diff --git a/packages/backend-server/Cargo.toml b/packages/backend-server/Cargo.toml
index 0aa44c1a5..a63bdf382 100644
--- a/packages/backend-server/Cargo.toml
+++ b/packages/backend-server/Cargo.toml
@@ -19,6 +19,7 @@ uds_windows = "1.1.0"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
 fastembed = { git = "https://github.com/deta/fastembed-rs", tag = "v3.14.1-patch.1", features = ["ort-download-binaries", "online"] }
+rayon = "1.10.0"
 
 [dev-dependencies]
 serial_test = "3.2.0"
diff --git a/packages/backend-server/src/embeddings/chunking.rs b/packages/backend-server/src/embeddings/chunking.rs
index 29c5b0da3..806eca64a 100644
--- a/packages/backend-server/src/embeddings/chunking.rs
+++ b/packages/backend-server/src/embeddings/chunking.rs
@@ -72,7 +72,7 @@ mod tests {
 
     #[test]
     fn test_sanity_chunker() {
-        let chunker = ContentChunker::new(100, 1);
+        let chunker = ContentChunker::new(2500, 1);
         let content = "Within endurance running comes two different types of respiration. The more prominent side that runners experience more frequently is aerobic respiration. This occurs when oxygen is present, and the body can utilize oxygen to help generate energy and muscle activity. On the other side, anaerobic respiration occurs when the body is deprived of oxygen, and this is common towards the final stretch of races when there is a drive to speed up to a greater intensity. Overall, both types of respiration are used by endurance runners quite often but are very different from each other. \n Among mammals, humans are well adapted for running significant distances, particularly so among primates. The capacity for endurance running is also found in migratory ungulates and a limited number of terrestrial carnivores, such as bears, dogs, wolves, and hyenas.";
diff --git a/packages/backend-server/src/embeddings/model.rs b/packages/backend-server/src/embeddings/model.rs
index d332e9a8b..754128378 100644
--- a/packages/backend-server/src/embeddings/model.rs
+++ b/packages/backend-server/src/embeddings/model.rs
@@ -32,6 +32,7 @@ pub struct EmbeddingModel {
     model_name: fastembed::EmbeddingModel,
     model: TextEmbedding,
     chunker: ContentChunker,
+    batch_size: usize,
 }
 
 fn new_fastembed_model(
@@ -50,15 +51,16 @@
 }
 
 impl EmbeddingModel {
-    pub fn new_remote(cache_dir: &Path, mode: EmbeddingModelMode) -> BackendResult<Self> {
+    pub fn new_remote(cache_dir: &Path, mode: EmbeddingModelMode, batch_size: usize) -> BackendResult<Self> {
         let model_name: fastembed::EmbeddingModel = mode.into();
         let model = new_fastembed_model(cache_dir, model_name.clone(), false)?;
-        let chunker = ContentChunker::new(2000, 1);
+        let chunker = ContentChunker::new(2500, 1);
 
         Ok(Self {
             model_name,
             model,
             chunker,
+            batch_size,
         })
     }
 
@@ -66,9 +68,9 @@
         TextEmbedding::get_model_info(&self.model_name).dim
     }
 
-    #[instrument(level = "debug", skip(self, sentences), fields(count = sentences.len()))]
+    #[instrument(level = "debug", skip(self, sentences), fields(count = sentences.len(), batch_size = self.batch_size))]
     pub fn encode(&self, sentences: &[String]) -> BackendResult<Vec<Vec<f32>>> {
-        self.model.embed(sentences.to_vec(), Some(1)).map_err(|e| {
+        self.model.embed(sentences.to_vec(), Some(self.batch_size)).map_err(|e| {
             error!("Failed to encode {} sentences: {}", sentences.len(), e);
             BackendError::GenericError(format!("Error encoding sentences: {}", e))
         })
diff --git a/packages/backend-server/src/main.rs b/packages/backend-server/src/main.rs
index 640e1a1dc..d696b4ec9 100644
--- a/packages/backend-server/src/main.rs
+++ b/packages/backend-server/src/main.rs
@@ -44,9 +44,9 @@ fn main() {
         .ok();
 
     let args: Vec<String> = std::env::args().collect();
-    if args.len() != 4 {
+    if args.len() != 7 {
         eprintln!(
-            "Usage: {} <backend_root_path> <local_llm_mode> <embedding_model_mode>",
+            "Usage: {} <backend_root_path> <local_llm_mode> <embedding_model_mode> <batch_size> <max_threads> <max_connections>",
             args[0]
         );
         std::process::exit(1);
@@ -74,10 +74,31 @@
             std::process::exit(1);
         }
     };
+    let batch_size: usize = match args[4].parse() {
+        Ok(size) => size,
+        Err(e) => {
+            eprintln!("Bad batch_size: {:#?}, error: {:#?}", args[4], e);
+            std::process::exit(1);
+        }
+    };
+    let max_threads: usize = match args[5].parse() {
+        Ok(threads) => threads,
+        Err(e) => {
+            eprintln!("Bad max_threads: {:#?}, error: {:#?}", args[5], e);
+            std::process::exit(1);
+        }
+    };
+    let max_connections: usize = match args[6].parse() {
+        Ok(connections) => connections,
+        Err(e) => {
+            eprintln!("Bad max_connections: {:#?}, error: {:#?}", args[6], e);
+            std::process::exit(1);
+        }
+    };
 
     info!(
-        "started with socket_path: {:#?}, local_llm_mode: {:#?}",
-        socket_path, local_llm_mode
+        "started with socket_path: {:#?}, local_llm_mode: {:#?}, batch_size: {}, max_threads: {}, max_connections: {}",
+        socket_path, local_llm_mode, batch_size, max_threads, max_connections
     );
     let server = LocalAIServer::new(
         &socket_path,
@@ -85,6 +106,9 @@
         &model_cache_dir,
         local_llm_mode,
         embedding_model_mode,
+        batch_size,
+        max_threads,
+        max_connections,
     )
     .expect("failed to create new server");
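
Context for the `Some(1)` to `Some(self.batch_size)` change above: fastembed's `embed` processes its input in groups of `batch_size`, so the old value meant one ONNX forward pass per chunk. The sketch below illustrates only that grouping arithmetic; the per-sentence embedding is faked and `encode_in_batches` is not the fastembed API:

```rust
// Illustrative only: a batch size turns N inputs into ceil(N / batch_size)
// model invocations. The vec![...] "embedding" is a placeholder value.
fn encode_in_batches(sentences: &[String], batch_size: usize) -> Vec<Vec<f32>> {
    sentences
        .chunks(batch_size) // one forward pass per group of `batch_size` inputs
        .flat_map(|batch| batch.iter().map(|s| vec![s.len() as f32]))
        .collect()
}

fn main() {
    let sentences: Vec<String> = (0..10).map(|i| format!("sentence {i}")).collect();
    // batch_size = 4 means three passes, over 4 + 4 + 2 sentences
    let embeddings = encode_in_batches(&sentences, 4);
    assert_eq!(embeddings.len(), 10);
}
```
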
@@ -22,6 +22,7 @@ pub struct LocalAIServer {
     index_path: String,
     embedding_model: Arc<EmbeddingModel>,
     listener: UnixListener,
+    max_connections: usize,
 }
 
 impl LocalAIServer {
@@ -32,6 +33,9 @@
         model_cache_dir: &Path,
         local_llm: bool,
         embedding_model_mode: EmbeddingModelMode,
+        batch_size: usize,
+        max_threads: usize,
+        max_connections: usize,
     ) -> BackendResult<Self> {
         if socket_path.exists() {
             fs::remove_file(socket_path)?;
@@ -45,9 +49,16 @@
             ));
         }
 
+        // Configure the rayon thread pool used for parallel processing
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(max_threads)
+            .build_global()
+            .map_err(|e| BackendError::GenericError(format!("Failed to configure thread pool: {}", e)))?;
+
         let embedding_model = Arc::new(EmbeddingModel::new_remote(
             model_cache_dir,
             embedding_model_mode,
+            batch_size,
         )?);
 
         Ok(Self {
@@ -55,6 +66,7 @@
             index_path: index_path.to_string_lossy().to_string(),
             embedding_model,
             listener,
+            max_connections,
         })
     }
@@ -117,7 +129,7 @@
     }
 
     pub fn listen(&self) {
-        info!(socket_path = ?self.socket_path, "server starting");
+        info!(socket_path = ?self.socket_path, max_connections = self.max_connections, "server starting");
 
         let (tx, rx) = mpsc::channel();
         let index_path = self.index_path.clone();
@@ -127,17 +139,34 @@
             Self::handle_main_thread_messages(rx, &index_path, &embedding_dim)
         });
 
+        // Create a channel-based semaphore to limit concurrent connections;
+        // the receiver is shared behind a mutex since mpsc receivers cannot be cloned
+        let (permit_tx, permit_rx) = mpsc::sync_channel(self.max_connections);
+        let permit_rx = Arc::new(std::sync::Mutex::new(permit_rx));
+        // Fill the channel with permits
+        for _ in 0..self.max_connections {
+            let _ = permit_tx.send(());
+        }
+
         info!("listening for incoming connections");
         for stream in self.listener.incoming() {
             match stream {
                 Ok(stream) => {
                     let embedding_model = Arc::clone(&self.embedding_model);
                     let tx = tx.clone();
+                    let permit_tx_clone = permit_tx.clone();
+                    let permit_rx_clone = Arc::clone(&permit_rx);
 
                     std::thread::spawn(move || {
+                        // Acquire a permit before processing (blocks if none available)
+                        let _permit = permit_rx_clone.lock().unwrap().recv();
+
                         if let Err(e) = handle_client(tx, &embedding_model, stream) {
                             error!(?e, "client handler error");
                         }
+
+                        // Return the permit when done
+                        let _ = permit_tx_clone.send(());
                     });
                 }
                 Err(e) => {
diff --git a/packages/backend/src/worker/handlers/resource.rs b/packages/backend/src/worker/handlers/resource.rs
index cd15596bd..3ab03ea65 100644
--- a/packages/backend/src/worker/handlers/resource.rs
+++ b/packages/backend/src/worker/handlers/resource.rs
@@ -556,7 +556,12 @@
         // NOTE: for Note content type for performance reasons we do not generate the embeddings
         // right away as updates are too frequent but instead do it lazily only when we need it
         // we therefore add a tag to the resource indicating that the resource needs post processing
-        if content_type == ResourceTextContentType::Note {
+        // Use lazy embeddings for Notes, PDFs, Documents, and Articles to reduce CPU load
+        if content_type == ResourceTextContentType::Note
+            || content_type == ResourceTextContentType::PDF
+            || content_type == ResourceTextContentType::Document
+            || content_type == ResourceTextContentType::Article
+        {
             let generate_embeddings_tag = ResourceTag::new_generate_lazy_embeddings(&resource_id);
             Database::create_resource_tag_tx(&mut tx, &generate_embeddings_tag)?;
             tx.commit()?;
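
The connection limit in `listen` is a counting semaphore built from a bounded channel: `max_connections` permits are pre-filled, each handler thread blocks on `recv()` until it holds a permit, and sends it back when finished. Because `std::sync::mpsc::Receiver` is neither `Clone` nor `Sync`, the receiver is shared behind `Arc<Mutex<...>>` rather than cloned per thread. A self-contained sketch of the same pattern, using only std:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let max_connections = 2;

    // Pre-fill a bounded channel with one permit per allowed concurrent connection.
    let (permit_tx, permit_rx) = mpsc::sync_channel::<()>(max_connections);
    for _ in 0..max_connections {
        permit_tx.send(()).unwrap();
    }
    // Receivers cannot be cloned, so all workers share one behind a mutex.
    let permit_rx = Arc::new(Mutex::new(permit_rx));

    let handles: Vec<_> = (0..5)
        .map(|i| {
            let permit_tx = permit_tx.clone();
            let permit_rx = Arc::clone(&permit_rx);
            thread::spawn(move || {
                // Blocks until a permit is free, capping concurrency at 2.
                let _permit = permit_rx.lock().unwrap().recv().unwrap();
                println!("handling client {i}");
                thread::sleep(Duration::from_millis(100)); // simulated work
                permit_tx.send(()).unwrap(); // return the permit
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```

Since the permit is taken inside the spawned thread, the accept loop itself never blocks; the cap bounds concurrent request handling, not thread creation.
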
diff --git a/packages/types/src/config.types.ts b/packages/types/src/config.types.ts
index fbd4c41e8..67d9651fa 100644
--- a/packages/types/src/config.types.ts
+++ b/packages/types/src/config.types.ts
@@ -9,6 +9,9 @@ export type UserConfig = {
 export type UserSettings = {
   embedding_model: 'english_small' | 'english_large' | 'multilingual_small' | 'multilingual_large'
+  embedding_batch_size: number
+  embedding_max_threads: number
+  embedding_max_connections: number
   tabs_orientation: 'vertical' | 'horizontal'
   app_style: 'light' | 'dark' // Note: intentionally app_style, as "app_theme" may refer to full themes in the future
   use_semantic_search: boolean
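
One runtime note on the new `rayon` dependency: `build_global()` configures the single process-wide pool shared by all of rayon's parallel iterators, and it succeeds only once per process, which is why `LocalAIServer::new` runs it before any parallel work starts. A standalone sketch of that startup pattern, assuming the rayon 1.10 API pulled in above:

```rust
use rayon::prelude::*;

fn main() {
    // Must run before any other rayon usage; a second call returns Err
    // instead of resizing the pool.
    rayon::ThreadPoolBuilder::new()
        .num_threads(4) // corresponds to the embedding_max_threads setting
        .build_global()
        .expect("rayon global pool was already initialized");

    // Every parallel iterator in the process is now capped at 4 worker threads.
    let total: u64 = (0..1_000u64).into_par_iter().map(|n| n * n).sum();
    println!("sum of squares: {total}");
}
```
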