diff --git a/packages/baseai/package.json b/packages/baseai/package.json
index f31fb54a..1d9e00a6 100644
--- a/packages/baseai/package.json
+++ b/packages/baseai/package.json
@@ -113,4 +113,4 @@
 		"langbase.com",
 		"generative AI"
 	]
-}
\ No newline at end of file
+}
diff --git a/packages/baseai/src/deploy/document.ts b/packages/baseai/src/deploy/document.ts
index 484adbca..9c7b112c 100644
--- a/packages/baseai/src/deploy/document.ts
+++ b/packages/baseai/src/deploy/document.ts
@@ -7,7 +7,7 @@ import {
 	handleError,
 	handleInvalidConfig,
 	listMemoryDocuments,
-	uploadDocumentsToMemory,
+	uploadDocumentsToMemory
 } from '.';
 import path from 'path';
 import fs from 'fs/promises';
@@ -19,7 +19,10 @@ import {
 } from '@/utils/memory/load-memory-files';
 import type { MemoryI } from 'types/memory';
 import { compareDocumentLists } from '@/utils/memory/compare-docs-list';
-import { retrieveAuthentication, type Account } from '@/utils/retrieve-credentials';
+import {
+	retrieveAuthentication,
+	type Account
+} from '@/utils/retrieve-credentials';
 
 type Spinner = ReturnType<typeof p.spinner>;
 
@@ -114,11 +117,18 @@
 		process.exit(1);
 	}
 
+	// Fetch the existing documents
+	const prodDocs = await listMemoryDocuments({
+		account,
+		memoryName
+	});
+
 	await handleSingleDocDeploy({
 		memory: memoryObject,
 		account,
 		document,
-		overwrite
+		overwrite,
+		prodDocs
 	});
 
 	spinner.stop(
@@ -139,23 +149,19 @@ export async function handleSingleDocDeploy({
 	memory,
 	account,
 	document,
-	overwrite
+	overwrite,
+	prodDocs
 }: {
 	memory: MemoryI;
 	account: Account;
 	document: MemoryDocumentI;
 	overwrite: boolean;
+	prodDocs: string[];
 }) {
 	p.log.info(
 		`Checking "${memory.name}" memory for document "${document.name}".`
 	);
 
-	// Fetch the existing documents
-	const prodDocs = await listMemoryDocuments({
-		account,
-		memoryName: memory.name
-	});
-
 	// If overwrite is present, deploy.
 	if (overwrite) {
 		await uploadDocumentsToMemory({
@@ -163,9 +169,9 @@
 			documents: [document],
 			name: memory.name
 		});
-		p.log.success(
-			`Document "${document.name}" uploaded to memory "${memory.name}".`
-		);
+		// p.log.success(
+		// 	`Document "${document.name}" uploaded to memory "${memory.name}".`
+		// );
 		return;
 	}
 
@@ -185,9 +191,9 @@
 			documents: [document],
 			name: memory.name
 		});
-		p.log.success(
-			`Document "${document.name}" uploaded to memory "${memory.name}".`
-		);
+		// p.log.success(
+		// 	`Document "${document.name}" uploaded to memory "${memory.name}".`
+		// );
 		return;
 	}
 
diff --git a/packages/baseai/src/deploy/index.ts b/packages/baseai/src/deploy/index.ts
index 8e2a64ba..718aca20 100644
--- a/packages/baseai/src/deploy/index.ts
+++ b/packages/baseai/src/deploy/index.ts
@@ -577,12 +577,6 @@ export async function upsertMemory({
 	p.log.info(
 		`Memory "${memory.name}" already exists. Updating changed documents.`
 	);
-	await handleGitSyncMemoryDeploy({
-		memory,
-		account,
-		documents,
-		overwrite
-	});
 
 	if (docsToDelete?.length > 0) {
 		await deleteDocumentsFromMemory({
@@ -592,6 +586,13 @@
 		});
 	}
 
+	await handleGitSyncMemoryDeploy({
+		memory,
+		account,
+		documents,
+		overwrite
+	});
+
 	await updateDeployedCommitHash(memory.name);
 
 	p.log.info(
@@ -643,24 +644,43 @@ export async function uploadDocumentsToMemory({
 	name: string;
 	account: Account;
 }) {
-	for (const doc of documents) {
-		try {
-			p.log.message(`Uploading document: ${doc.name} ....`);
-			await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
-			const signedUrl = await getSignedUploadUrl({
-				documentName: doc.name,
-				memoryName: name,
-				account,
-				meta: doc.meta
-			});
+	const BATCH_SIZE = 5; // Number of concurrent uploads
+	const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests
+
+	// Process documents in batches to avoid rate limiting
+	for (let i = 0; i < documents.length; i += BATCH_SIZE) {
+		const batch = documents.slice(i, i + BATCH_SIZE);
+
+		const batchUploadPromises = batch.map(async (doc, index) => {
+			try {
+				// Stagger requests within batch
+				await new Promise(resolve =>
+					setTimeout(resolve, index * RATE_LIMIT_DELAY)
+				);
 
-			const uploadResponse = await uploadDocument(signedUrl, doc.blob);
-			dlog(`Upload response status: ${uploadResponse.status}`);
+				// p.log.message(`Uploading document: ${doc.name} ....`);
+				const signedUrl = await getSignedUploadUrl({
+					documentName: doc.name,
+					memoryName: name,
+					account,
+					meta: doc.meta
+				});
 
-			p.log.message(`Uploaded document: ${doc.name}`);
-		} catch (error) {
-			throw error;
-		}
+				const uploadResponse = await uploadDocument(
+					signedUrl,
+					doc.blob
+				);
+				dlog(`Upload response status: ${uploadResponse.status}`);
+
+				p.log.message(`Uploaded document: ${doc.name}`);
+			} catch (error: any) {
+				throw new Error(
+					`Failed to upload ${doc.name}: ${error.message ?? error}`
+				);
+			}
+		});
+
+		await Promise.all(batchUploadPromises);
 	}
 }
@@ -673,25 +693,37 @@ export async function deleteDocumentsFromMemory({
 	name: string;
 	account: Account;
 }) {
-	p.log.info(`Deleting documents from memory: ${name}`);
+	const BATCH_SIZE = 5; // Number of concurrent deletions
+	const RATE_LIMIT_DELAY = 1500; // 1.5 second delay between requests
 
-	for (const doc of documents) {
-		try {
-			p.log.message(`Deleting document: ${doc} ....`);
-			await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
+	p.log.info(`Deleting ${documents.length} documents from memory: ${name}`);
 
-			const deleteResponse = await deleteDocument({
-				documentName: doc,
-				memoryName: name,
-				account
-			});
+	for (let i = 0; i < documents.length; i += BATCH_SIZE) {
+		const batch = documents.slice(i, i + BATCH_SIZE);
+		const batchPromises = batch.map(async (doc, index) => {
+			try {
+				await new Promise(resolve =>
+					setTimeout(resolve, index * RATE_LIMIT_DELAY)
+				);
 
-			dlog(`Delete response status: ${deleteResponse.status}`);
+				// p.log.message(`Deleting document: ${doc}`);
+				const deleteResponse = await deleteDocument({
+					documentName: doc,
+					memoryName: name,
+					account
+				});
 
-			p.log.message(`Deleted document: ${doc}`);
-		} catch (error) {
-			throw error;
-		}
+				dlog(`Delete response status: ${deleteResponse.status}`);
+				p.log.message(`Deleted document: ${doc}`);
+				return deleteResponse;
+			} catch (error: any) {
+				throw new Error(
+					`Failed to delete ${doc}: ${error.message ?? error}`
+				);
+			}
+		});
+
+		await Promise.all(batchPromises);
 	}
 	p.log.info(`Deleted documents from memory: ${name}`);
 }
@@ -1091,14 +1123,32 @@ export async function handleGitSyncMemoryDeploy({
 	documents: MemoryDocumentI[];
 	overwrite: boolean;
 }) {
-	for (const doc in documents) {
-		await new Promise(resolve => setTimeout(resolve, 800)); // To avoid rate limiting
-		await handleSingleDocDeploy({
-			memory,
-			account,
-			document: documents[doc],
-			overwrite: true // TODO: Implement overwrite for git-sync memories
+	const BATCH_SIZE = 5;
+	const RATE_LIMIT_DELAY = 1500;
+
+	// Fetch existing documents once
+	const prodDocs = await listMemoryDocuments({
+		account,
+		memoryName: memory.name
+	});
+
+	// Process in batches
+	for (let i = 0; i < documents.length; i += BATCH_SIZE) {
+		const batch = documents.slice(i, i + BATCH_SIZE);
+		const batchPromises = batch.map(async (doc, index) => {
+			await new Promise(resolve =>
+				setTimeout(resolve, index * RATE_LIMIT_DELAY)
+			);
+			return handleSingleDocDeploy({
+				memory,
+				account,
+				document: doc,
+				overwrite: true,
+				prodDocs
+			});
 		});
+
+		await Promise.all(batchPromises);
 	}
 }
diff --git a/packages/baseai/src/utils/memory/load-memory-files.ts b/packages/baseai/src/utils/memory/load-memory-files.ts
index e643309e..9570f863 100644
--- a/packages/baseai/src/utils/memory/load-memory-files.ts
+++ b/packages/baseai/src/utils/memory/load-memory-files.ts
@@ -65,8 +65,6 @@ export const loadMemoryFilesFromCustomDir = async ({
 		process.exit(1);
 	}
 
-	console.log('Reading documents in memory...');
-
 	// Get all files that match the glob patterns and are tracked by git
 	let allFiles: string[];
 	try {
diff --git a/packages/core/package.json b/packages/core/package.json
index 108327e2..61eb0758 100644
--- a/packages/core/package.json
+++ b/packages/core/package.json
@@ -119,4 +119,4 @@
 		"langbase.com",
 		"generative AI"
 	]
-}
\ No newline at end of file
+}
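
Note for reviewers: the three rewritten loops (uploadDocumentsToMemory, deleteDocumentsFromMemory, handleGitSyncMemoryDeploy) all apply the same batch-and-stagger pattern, and the patch likewise hoists the listMemoryDocuments call so production documents are fetched once per deploy rather than once per document. A minimal TypeScript sketch of that shared shape, assuming only standard Promise and setTimeout APIs — processInBatches, task, batchSize, and delayMs are illustrative names, not identifiers from this patch:

async function processInBatches<T, R>(
	items: T[],
	task: (item: T) => Promise<R>,
	batchSize = 5,
	delayMs = 1500
): Promise<R[]> {
	const results: R[] = [];
	for (let i = 0; i < items.length; i += batchSize) {
		const batch = items.slice(i, i + batchSize);
		const batchResults = await Promise.all(
			batch.map(async (item, index) => {
				// Stagger start times within the batch to stay under rate
				// limits; the last of 5 items starts 4 * delayMs after the first.
				await new Promise(resolve =>
					setTimeout(resolve, index * delayMs)
				);
				return task(item);
			})
		);
		results.push(...batchResults);
	}
	return results;
}

Two properties of this shape are worth flagging: Promise.all is fail-fast, so the first rejection (the new `throw new Error(...)` paths) aborts the whole deploy rather than collecting per-document failures (Promise.allSettled would do the latter), and staggering by index * RATE_LIMIT_DELAY still caps request starts at roughly one per delay interval, so the gain over the old serial 800 ms loop comes from overlapping the upload I/O, not from removing the delay.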