7 changes: 7 additions & 0 deletions .gitignore
@@ -167,6 +167,13 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Visual Studio Code
# A Visual Studio Code-specific template is maintained separately at
# https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to your global gitignore or merged into this file.
# The rule below ignores the entire .vscode folder.
.vscode/

# Ruff stuff:
.ruff_cache/

121 changes: 100 additions & 21 deletions veadk/integrations/ve_tos/ve_tos.py
@@ -12,15 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
import asyncio
import os
from datetime import datetime
from typing import TYPE_CHECKING, Union, List, Optional
from io import StringIO
from typing import TYPE_CHECKING, List, Optional, Union
from urllib.parse import urlparse
from veadk.utils.misc import getenv

from veadk.consts import DEFAULT_TOS_BUCKET_NAME
from veadk.utils.logger import get_logger
from veadk.utils.misc import getenv

if TYPE_CHECKING:
pass
@@ -178,7 +179,18 @@ def _set_cors_rules(self, bucket_name: str) -> bool:
return False

def _build_object_key_for_file(self, data_path: str) -> str:
"""generate TOS object key"""
"""Builds the TOS object key and URL for the given parameters.

Args:
user_id (str): User ID
app_name (str): App name
session_id (str): Session ID
data_path (str): Data path

Returns:
tuple[str, str]: Object key and TOS URL.
"""

parsed_url = urlparse(data_path)

# Generate object key
@@ -244,14 +256,33 @@ def build_tos_url(self, object_key: str, bucket_name: str = "") -> str:

# deprecated
def upload(
self, data: Union[str, bytes], bucket_name: str = "", object_key: str = ""
self,
data: Union[str, bytes],
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
):
"""Uploads data to TOS.

Args:
data (Union[str, bytes]): The data to upload, either as a file path or raw bytes.
bucket_name (str): The name of the TOS bucket to upload to.
object_key (str): The object key for the uploaded data.
metadata (dict | None, optional): Metadata to associate with the object. Defaults to None.

Returns:
A coroutine created by asyncio.to_thread that performs the upload when awaited.

Raises:
ValueError: If the data type is unsupported.
"""
if isinstance(data, str):
# data is a file path
return asyncio.to_thread(self.upload_file, data, bucket_name, object_key)
return asyncio.to_thread(
self.upload_file, data, bucket_name, object_key, metadata
)
elif isinstance(data, bytes):
# data is bytes content
return asyncio.to_thread(self.upload_bytes, data, bucket_name, object_key)
return asyncio.to_thread(
self.upload_bytes, data, bucket_name, object_key, metadata
)
else:
error_msg = f"Upload failed: data type error. Only str (file path) and bytes are supported, got {type(data)}"
logger.error(error_msg)
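
Note that the deprecated upload() does not await the work itself: it returns the coroutine produced by asyncio.to_thread(), so nothing is transferred until the caller awaits it. A minimal sketch of a call that exercises the new metadata parameter; the class name VeTOS and its no-argument constructor are assumptions based on the module name, not taken from this diff:

import asyncio

from veadk.integrations.ve_tos.ve_tos import VeTOS  # class name assumed from the module path


async def main() -> None:
    client = VeTOS()  # assumed constructor; real credentials/config may be required
    # A bytes payload dispatches to upload_bytes; the metadata dict is forwarded along with it.
    await client.upload(
        b"hello tos",
        bucket_name="my-bucket",
        object_key="demo/hello.txt",
        metadata={"source": "veadk-demo"},
    )


asyncio.run(main())
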
@@ -275,14 +306,19 @@ def _ensure_client_and_bucket(self, bucket_name: str) -> bool:
return True

def upload_text(
self, text: str, bucket_name: str = "", object_key: str = ""
self,
text: str,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Upload text content to TOS bucket

Args:
text: Text content to upload
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -292,7 +328,9 @@ def upload_text(
return
data = StringIO(text)
try:
self._client.put_object(bucket=bucket_name, key=object_key, content=data)
self._client.put_object(
bucket=bucket_name, key=object_key, content=data, meta=metadata
)
logger.debug(f"Upload success, object_key: {object_key}")
return
except Exception as e:
@@ -302,14 +340,19 @@ def upload_text(
data.close()

async def async_upload_text(
self, text: str, bucket_name: str = "", object_key: str = ""
self,
text: str,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Asynchronously upload text content to TOS bucket

Args:
text: Text content to upload
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -325,6 +368,7 @@ async def async_upload_text(
bucket=bucket_name,
key=object_key,
content=data,
meta=metadata,
)
logger.debug(f"Async upload success, object_key: {object_key}")
return
@@ -335,14 +379,19 @@ async def async_upload_text(
data.close()

def upload_bytes(
self, data: bytes, bucket_name: str = "", object_key: str = ""
self,
data: bytes,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Upload byte data to TOS bucket

Args:
data: Byte data to upload
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -351,22 +400,29 @@ def upload_bytes(
if not self._ensure_client_and_bucket(bucket_name):
return
try:
self._client.put_object(bucket=bucket_name, key=object_key, content=data)
self._client.put_object(
bucket=bucket_name, key=object_key, content=data, meta=metadata
)
logger.debug(f"Upload success, object_key: {object_key}")
return
except Exception as e:
logger.error(f"Upload failed: {e}")
return

async def async_upload_bytes(
self, data: bytes, bucket_name: str = "", object_key: str = ""
self,
data: bytes,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Asynchronously upload byte data to TOS bucket

Args:
data: Byte data to upload
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -381,6 +437,7 @@ async def async_upload_bytes(
bucket=bucket_name,
key=object_key,
content=data,
meta=metadata,
)
logger.debug(f"Async upload success, object_key: {object_key}")
return
@@ -389,14 +446,19 @@ async def async_upload_bytes(
return

def upload_file(
self, file_path: str, bucket_name: str = "", object_key: str = ""
self,
file_path: str,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Upload file to TOS bucket

Args:
file_path: Local file path
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -406,7 +468,7 @@ def upload_file(
return
try:
self._client.put_object_from_file(
bucket=bucket_name, key=object_key, file_path=file_path
bucket=bucket_name, key=object_key, file_path=file_path, meta=metadata
)
logger.debug(f"Upload success, object_key: {object_key}")
return
@@ -415,14 +477,19 @@ def upload_file(
return

async def async_upload_file(
self, file_path: str, bucket_name: str = "", object_key: str = ""
self,
file_path: str,
bucket_name: str = "",
object_key: str = "",
metadata: dict | None = None,
) -> None:
"""Asynchronously upload file to TOS bucket

Args:
file_path: Local file path
bucket_name: TOS bucket name
object_key: Object key, auto-generated if None
metadata: Metadata to associate with the object
"""
bucket_name = self._check_bucket_name(bucket_name)
if not object_key:
@@ -437,6 +504,7 @@ async def async_upload_file(
bucket=bucket_name,
key=object_key,
file_path=file_path,
meta=metadata,
)
logger.debug(f"Async upload success, object_key: {object_key}")
return
@@ -449,13 +517,15 @@ def upload_files(
file_paths: List[str],
bucket_name: str = "",
object_keys: Optional[List[str]] = None,
metadata: dict | None = None,
) -> None:
"""Upload multiple files to TOS bucket

Args:
file_paths: List of local file paths
bucket_name: TOS bucket name
object_keys: List of object keys, auto-generated if empty or length mismatch
metadata: Metadata to associate with each uploaded object
"""
bucket_name = self._check_bucket_name(bucket_name)

@@ -475,7 +545,7 @@ def upload_files(
try:
for file_path, object_key in zip(file_paths, object_keys):
# Note: upload_file method doesn't return value, we use exceptions to determine success
self.upload_file(file_path, bucket_name, object_key)
self.upload_file(file_path, bucket_name, object_key, metadata=metadata)
return
except Exception as e:
logger.error(f"Upload files failed: {str(e)}")
@@ -486,13 +556,15 @@ async def async_upload_files(
file_paths: List[str],
bucket_name: str = "",
object_keys: Optional[List[str]] = None,
metadata: dict | None = None,
) -> None:
"""Asynchronously upload multiple files to TOS bucket

Args:
file_paths: List of local file paths
bucket_name: TOS bucket name
object_keys: List of object keys, auto-generated if empty or length mismatch
metadata: Metadata to associate with each uploaded object
"""
bucket_name = self._check_bucket_name(bucket_name)

@@ -517,19 +589,23 @@ async def async_upload_files(
bucket=bucket_name,
key=object_key,
file_path=file_path,
meta=metadata,
)
logger.debug(f"Async upload success, object_key: {object_key}")
return
except Exception as e:
logger.error(f"Async upload files failed: {str(e)}")
return

def upload_directory(self, directory_path: str, bucket_name: str = "") -> None:
def upload_directory(
self, directory_path: str, bucket_name: str = "", metadata: dict | None = None
) -> None:
"""Upload entire directory to TOS bucket

Args:
directory_path: Local directory path
bucket_name: TOS bucket name
metadata: Metadata to associate with the objects
"""
bucket_name = self._check_bucket_name(bucket_name)

@@ -543,7 +619,7 @@ def _upload_dir(root_dir):
# Use relative path of file as object key
object_key = os.path.relpath(path, directory_path)
# upload_file method doesn't return value, use exceptions to determine success
self.upload_file(path, bucket_name, object_key)
self.upload_file(path, bucket_name, object_key, metadata=metadata)

try:
_upload_dir(directory_path)
@@ -554,13 +630,14 @@ def _upload_dir(root_dir):
raise

async def async_upload_directory(
self, directory_path: str, bucket_name: str = ""
self, directory_path: str, bucket_name: str = "", metadata: dict | None = None
) -> None:
"""Asynchronously upload entire directory to TOS bucket

Args:
directory_path: Local directory path
bucket_name: TOS bucket name
metadata: Metadata to associate with the objects
"""
bucket_name = self._check_bucket_name(bucket_name)

@@ -574,7 +651,9 @@ async def _aupload_dir(root_dir):
# Use relative path of file as object key
object_key = os.path.relpath(path, directory_path)
# Asynchronously upload single file
await self.async_upload_file(path, bucket_name, object_key)
await self.async_upload_file(
path, bucket_name, object_key, metadata=metadata
)

try:
await _aupload_dir(directory_path)
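Taken together, the change threads an optional metadata dict from every upload helper down to the TOS SDK's meta= argument. A minimal sketch of the synchronous path, reusing the assumed VeTOS client from the sketch above; the bucket, paths, and metadata values are illustrative:

client = VeTOS()  # assumed constructor, as above

# Single file: metadata is forwarded to put_object_from_file(..., meta=metadata).
client.upload_file(
    "reports/2024-q3.pdf",
    bucket_name="my-bucket",
    object_key="reports/2024-q3.pdf",
    metadata={"team": "data", "retention": "90d"},
)

# Whole directory: every object uploaded by upload_directory gets the same metadata dict.
client.upload_directory(
    "reports/",
    bucket_name="my-bucket",
    metadata={"team": "data"},
)
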
8 changes: 4 additions & 4 deletions veadk/knowledgebase/backends/base_backend.py
@@ -29,19 +29,19 @@ def precheck_index_naming(self) -> None:
"""

@abstractmethod
def add_from_directory(self, directory: str, **kwargs) -> bool:
def add_from_directory(self, directory: str, *args, **kwargs) -> bool:
"""Add knowledge from file path to knowledgebase"""

@abstractmethod
def add_from_files(self, files: list[str], **kwargs) -> bool:
def add_from_files(self, files: list[str], *args, **kwargs) -> bool:
"""Add knowledge (e.g, documents, strings, ...) to knowledgebase"""

@abstractmethod
def add_from_text(self, text: str | list[str], **kwargs) -> bool:
def add_from_text(self, text: str | list[str], *args, **kwargs) -> bool:
"""Add knowledge from text to knowledgebase"""

@abstractmethod
def search(self, **kwargs) -> list:
def search(self, *args, **kwargs) -> list:
"""Search knowledge from knowledgebase"""

# Optional methods for future use:
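Widening the abstract signatures from (**kwargs) to (*args, **kwargs) lets concrete backends accept extra positional arguments, for example a positional query in search, while still satisfying the base contract. A hypothetical in-memory backend, purely illustrative and not part of this PR:

class InMemoryBackend:
    """Hypothetical backend mirroring the widened interface; not taken from this PR."""

    def __init__(self) -> None:
        self._docs: list[str] = []

    def add_from_text(self, text: str | list[str], *args, **kwargs) -> bool:
        self._docs.extend([text] if isinstance(text, str) else text)
        return True

    def search(self, query: str, *args, top_k: int = 5, **kwargs) -> list:
        # A positional query is compatible with the widened abstract signature
        # search(self, *args, **kwargs).
        return [doc for doc in self._docs if query in doc][:top_k]


backend = InMemoryBackend()
backend.add_from_text(["TOS stores objects", "The knowledgebase stores text"])
backend.search("objects")  # returns ["TOS stores objects"]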