Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import base, external
55 changes: 55 additions & 0 deletions src/langbot/pkg/api/http/controller/groups/knowledge/external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import quart
from ... import group


@group.group_class('external_knowledge_base', '/api/v1/knowledge/external-bases')
class ExternalKnowledgeBaseRouterGroup(group.RouterGroup):
    """HTTP routes for listing, creating, reading, updating, deleting and
    querying external knowledge bases.

    All work is delegated to ``self.ap.knowledge_service``; this class only
    validates the request shape and wraps results in the standard responses.
    """

    async def initialize(self) -> None:
        """Register the collection, single-item and retrieve routes."""

        @self.route('', methods=['POST', 'GET'])
        async def handle_external_knowledge_bases() -> quart.Response:
            """GET lists all external KBs; POST creates one from the JSON body."""
            if quart.request.method == 'GET':
                external_kbs = await self.ap.knowledge_service.get_external_knowledge_bases()
                return self.success(data={'bases': external_kbs})

            elif quart.request.method == 'POST':
                json_data = await quart.request.json
                # The body is None when the request carries no JSON; reject it
                # here instead of letting the service fail on it.
                if not isinstance(json_data, dict):
                    return self.http_status(400, -1, 'request body must be a JSON object')
                kb_uuid = await self.ap.knowledge_service.create_external_knowledge_base(json_data)
                return self.success(data={'uuid': kb_uuid})

            return self.http_status(405, -1, 'Method not allowed')

        @self.route(
            '/<kb_uuid>',
            methods=['GET', 'DELETE', 'PUT'],
        )
        async def handle_specific_external_knowledge_base(kb_uuid: str) -> quart.Response:
            """GET, PUT or DELETE the external KB identified by ``kb_uuid``."""
            if quart.request.method == 'GET':
                external_kb = await self.ap.knowledge_service.get_external_knowledge_base(kb_uuid)

                if external_kb is None:
                    return self.http_status(404, -1, 'external knowledge base not found')

                return self.success(
                    data={
                        'base': external_kb,
                    }
                )

            elif quart.request.method == 'PUT':
                json_data = await quart.request.json
                if not isinstance(json_data, dict):
                    return self.http_status(400, -1, 'request body must be a JSON object')
                await self.ap.knowledge_service.update_external_knowledge_base(kb_uuid, json_data)
                return self.success({})

            elif quart.request.method == 'DELETE':
                await self.ap.knowledge_service.delete_external_knowledge_base(kb_uuid)
                return self.success({})

            # Same fallback as the collection handler, so the view never
            # returns None for an unexpected method.
            return self.http_status(405, -1, 'Method not allowed')

        @self.route(
            '/<kb_uuid>/retrieve',
            methods=['POST'],
        )
        async def retrieve_external_knowledge_base(kb_uuid: str) -> quart.Response:
            """Run a retrieval query against the KB; expects ``{"query": ...}``."""
            json_data = await quart.request.json
            if not isinstance(json_data, dict):
                return self.http_status(400, -1, 'request body must be a JSON object')

            query = json_data.get('query')
            if not isinstance(query, str) or not query:
                return self.http_status(400, -1, 'query is required')

            results = await self.ap.knowledge_service.retrieve_knowledge_base(kb_uuid, query)
            return self.success(data={'results': results})
82 changes: 79 additions & 3 deletions src/langbot/pkg/api/http/service/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,26 @@ async def store_file(self, kb_uuid: str, file_id: str) -> int:
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
# Only internal KBs support file storage
if runtime_kb.get_type() != 'internal':
raise Exception('Only internal knowledge bases support file storage')
return await runtime_kb.store_file(file_id)

async def retrieve_knowledge_base(self, kb_uuid: str, query: str) -> list[dict]:
    """Retrieve entries from a knowledge base.

    Looks up the runtime knowledge base by UUID, picks the ``top_k`` that is
    configured for it, runs the retrieval and returns each result as a dict
    (via ``model_dump()``).

    Args:
        kb_uuid: UUID of the knowledge base to query.
        query: The query text.

    Returns:
        One dict per retrieved result.

    Raises:
        Exception: If no runtime knowledge base exists for ``kb_uuid``.
    """
    runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
    if runtime_kb is None:
        raise Exception('Knowledge base not found')

    # Internal and external KBs keep their configured top_k on different
    # entity attributes, so the lookup has to depend on the KB type.
    kb_type = runtime_kb.get_type()
    if kb_type == 'internal':
        top_k = runtime_kb.knowledge_base_entity.top_k
    elif kb_type == 'external':
        top_k = runtime_kb.external_kb_entity.top_k
    else:
        top_k = 5  # default fallback

    return [result.model_dump() for result in await runtime_kb.retrieve(query, top_k)]

async def get_files_by_knowledge_base(self, kb_uuid: str) -> list[dict]:
"""获取知识库文件"""
Expand All @@ -95,6 +105,9 @@ async def delete_file(self, kb_uuid: str, file_id: str) -> None:
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
# Only internal KBs support file deletion
if runtime_kb.get_type() != 'internal':
raise Exception('Only internal knowledge bases support file deletion')
await runtime_kb.delete_file(file_id)

async def delete_knowledge_base(self, kb_uuid: str) -> None:
Expand All @@ -118,3 +131,66 @@ async def delete_knowledge_base(self, kb_uuid: str) -> None:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.delete(persistence_rag.File).where(persistence_rag.File.uuid == file.uuid)
)

# External Knowledge Base methods
async def get_external_knowledge_bases(self) -> list[dict]:
    """Return every stored external knowledge base as a serialized dict."""
    model = persistence_rag.ExternalKnowledgeBase
    query_result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(model))

    serialized = []
    for row in query_result.all():
        serialized.append(self.ap.persistence_mgr.serialize_model(model, row))
    return serialized

async def get_external_knowledge_base(self, kb_uuid: str) -> dict | None:
    """Return the external knowledge base with ``kb_uuid`` as a serialized
    dict, or ``None`` when no row matches."""
    model = persistence_rag.ExternalKnowledgeBase
    select_stmt = sqlalchemy.select(model).where(model.uuid == kb_uuid)

    row = (await self.ap.persistence_mgr.execute_async(select_stmt)).first()
    if row is not None:
        return self.ap.persistence_mgr.serialize_model(model, row)
    return None

async def create_external_knowledge_base(self, kb_data: dict) -> str:
    """Insert a new external knowledge base and load it into the RAG manager.

    A fresh UUID is generated and written into ``kb_data`` (the caller's dict
    is modified in place) before the row is inserted.

    Returns:
        The UUID assigned to the new knowledge base.
    """
    new_uuid = str(uuid.uuid4())
    kb_data['uuid'] = new_uuid

    insert_stmt = sqlalchemy.insert(persistence_rag.ExternalKnowledgeBase).values(kb_data)
    await self.ap.persistence_mgr.execute_async(insert_stmt)

    # Re-read the stored row so the loader receives the serialized record.
    stored_kb = await self.get_external_knowledge_base(new_uuid)
    await self.ap.rag_mgr.load_external_knowledge_base(stored_kb)

    return new_uuid

async def update_external_knowledge_base(self, kb_uuid: str, kb_data: dict) -> None:
    """Update a stored external knowledge base and reload it in the RAG manager.

    Any ``uuid`` key in ``kb_data`` is removed first (in place), so the
    identifier itself is never overwritten by the update.
    """
    kb_data.pop('uuid', None)

    model = persistence_rag.ExternalKnowledgeBase
    update_stmt = sqlalchemy.update(model).values(kb_data).where(model.uuid == kb_uuid)
    await self.ap.persistence_mgr.execute_async(update_stmt)

    # Swap the runtime instance: remove the old one, then load the stored row again.
    await self.ap.rag_mgr.remove_knowledge_base_from_runtime(kb_uuid)
    refreshed_kb = await self.get_external_knowledge_base(kb_uuid)
    await self.ap.rag_mgr.load_external_knowledge_base(refreshed_kb)

async def delete_external_knowledge_base(self, kb_uuid: str) -> None:
    """Delete an external knowledge base.

    The RAG manager is asked to delete the knowledge base first, then the
    persisted row with the same UUID is removed.
    """
    await self.ap.rag_mgr.delete_knowledge_base(kb_uuid)

    model = persistence_rag.ExternalKnowledgeBase
    delete_stmt = sqlalchemy.delete(model).where(model.uuid == kb_uuid)
    await self.ap.persistence_mgr.execute_async(delete_stmt)
11 changes: 11 additions & 0 deletions src/langbot/pkg/entity/persistence/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ class Chunk(Base):
text = sqlalchemy.Column(sqlalchemy.Text)


class ExternalKnowledgeBase(Base):
    """Persistence model for the ``external_knowledge_bases`` table.

    Each row describes one knowledge base that is queried over HTTP rather
    than stored locally.
    """

    __tablename__ = 'external_knowledge_bases'
    # Primary key of the knowledge base.
    uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True, unique=True)
    # Display name; indexed.
    name = sqlalchemy.Column(sqlalchemy.String, index=True)
    # Free-form description text.
    description = sqlalchemy.Column(sqlalchemy.Text)
    # URL of the external retrieval API; required.
    api_url = sqlalchemy.Column(sqlalchemy.String, nullable=False)
    # Optional API key for the external API.
    api_key = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    # Creation timestamp; defaults to the database's now().
    created_at = sqlalchemy.Column(sqlalchemy.DateTime, default=sqlalchemy.func.now())
    # Number of results to request per retrieval; defaults to 5.
    top_k = sqlalchemy.Column(sqlalchemy.Integer, default=5)


# class Vector(Base):
# __tablename__ = 'knowledge_base_vectors'
# uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True, unique=True)
Expand Down
10 changes: 9 additions & 1 deletion src/langbot/pkg/provider/runners/localagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ async def run(
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
continue

result = await kb.retrieve(user_message_text, kb.knowledge_base_entity.top_k)
# Get top_k based on KB type
if kb.get_type() == 'internal':
top_k = kb.knowledge_base_entity.top_k
elif kb.get_type() == 'external':
top_k = kb.external_kb_entity.top_k
else:
top_k = 5 # default fallback

result = await kb.retrieve(user_message_text, top_k)

if result:
all_results.extend(result)
Expand Down
55 changes: 55 additions & 0 deletions src/langbot/pkg/rag/knowledge/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Base classes and interfaces for knowledge bases"""
from __future__ import annotations

import abc
import typing

from langbot.pkg.core import app
from langbot.pkg.entity.rag import retriever as retriever_entities


class KnowledgeBaseInterface(metaclass=abc.ABCMeta):
    """Common contract that every knowledge base implementation must satisfy."""

    ap: app.Application

    def __init__(self, ap: app.Application):
        # Keep the application handle so implementations can reach shared services.
        self.ap = ap

    @abc.abstractmethod
    async def initialize(self):
        """Prepare the knowledge base for use."""

    @abc.abstractmethod
    async def retrieve(self, query: str, top_k: int) -> list[retriever_entities.RetrieveResultEntry]:
        """Look up the entries most relevant to a query.

        Args:
            query: Text to search for.
            top_k: How many of the best results to return.

        Returns:
            The matching retrieve result entries.
        """

    @abc.abstractmethod
    def get_uuid(self) -> str:
        """Return this knowledge base's UUID."""

    @abc.abstractmethod
    def get_name(self) -> str:
        """Return this knowledge base's name."""

    @abc.abstractmethod
    def get_type(self) -> str:
        """Return the knowledge base kind (internal/external)."""

    @abc.abstractmethod
    async def dispose(self):
        """Release any resources held by the knowledge base."""
137 changes: 137 additions & 0 deletions src/langbot/pkg/rag/knowledge/external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""External knowledge base implementation"""
from __future__ import annotations

import aiohttp
import typing

from langbot.pkg.core import app
from langbot.pkg.entity.persistence import rag as persistence_rag
from langbot.pkg.entity.rag import retriever as retriever_entities
from .base import KnowledgeBaseInterface


class ExternalKnowledgeBase(KnowledgeBaseInterface):
    """External knowledge base that queries via HTTP API"""

    external_kb_entity: persistence_rag.ExternalKnowledgeBase

    def __init__(self, ap: app.Application, external_kb_entity: persistence_rag.ExternalKnowledgeBase):
        super().__init__(ap)
        self.external_kb_entity = external_kb_entity

    async def initialize(self):
        """Initialize the external knowledge base (nothing to set up)."""

    async def retrieve(self, query: str, top_k: int) -> list[retriever_entities.RetrieveResultEntry]:
        """Retrieve documents from external knowledge base via HTTP API

        The API should follow this format:
        POST {api_url}
        Content-Type: application/json
        Authorization: Bearer {api_key} (if api_key is provided)

        Request body:
        {
            "query": "user query text",
            "top_k": 5
        }

        Response format:
        {
            "records": [
                {
                    "content": "document text content",
                    "score": 0.95,
                    "title": "optional document title",
                    "metadata": {}
                }
            ]
        }

        Args:
            query: The query string sent to the external API.
            top_k: Number of results requested from the external API.

        Returns:
            One result entry per usable record. An empty list is returned on
            a non-200 response or on any error; errors are logged, not raised.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=30)

            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.external_kb_entity.api_url,
                    json={'query': query, 'top_k': top_k},
                    headers=self._build_headers(),
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        self.ap.logger.error(
                            f'External KB API error: status={response.status}, body={error_text}'
                        )
                        return []

                    response_data = await response.json()

            return self._parse_records(response_data)

        except aiohttp.ClientError as e:
            self.ap.logger.error(f'External KB HTTP error: {e}')
            return []
        except Exception as e:
            # Retrieval is best-effort: any other failure is logged and
            # reported to the caller as "no results".
            self.ap.logger.error(f'External KB retrieval error: {e}')
            return []

    def _build_headers(self) -> dict:
        """Build the request headers, adding a bearer token when an API key is set."""
        headers = {'Content-Type': 'application/json'}
        if self.external_kb_entity.api_key:
            headers['Authorization'] = f'Bearer {self.external_kb_entity.api_key}'
        return headers

    def _parse_records(self, response_data) -> list[retriever_entities.RetrieveResultEntry]:
        """Convert the decoded response body into result entries, skipping unusable records."""
        records = response_data.get('records') if isinstance(response_data, dict) else None
        if not isinstance(records, list):
            return []

        # A malformed record is skipped on its own so it cannot discard the
        # well-formed records returned alongside it.
        return [self._to_result_entry(record) for record in records if isinstance(record, dict)]

    def _to_result_entry(self, record: dict) -> retriever_entities.RetrieveResultEntry:
        """Build one result entry from a single response record."""
        # Explicit nulls are treated the same as missing fields.
        content = record.get('content') or ''
        title = record.get('title') or ''
        score = record.get('score')
        if score is None:
            score = 0.0
        metadata = record.get('metadata')
        if not isinstance(metadata, dict):
            metadata = {}

        result_metadata = {
            'text': content,
            'score': score,
            'source': 'external_kb',
            'kb_uuid': self.external_kb_entity.uuid,
            'kb_name': self.external_kb_entity.name,
        }

        if title:
            result_metadata['title'] = title

        # Merge additional metadata last; keys supplied by the external API
        # take precedence over the ones built above.
        result_metadata.update(metadata)

        return retriever_entities.RetrieveResultEntry(score=score, metadata=result_metadata)

    def get_uuid(self) -> str:
        """Get the UUID of the external knowledge base"""
        return self.external_kb_entity.uuid

    def get_name(self) -> str:
        """Get the name of the external knowledge base"""
        return self.external_kb_entity.name

    def get_type(self) -> str:
        """Get the type of knowledge base"""
        return 'external'

    async def dispose(self):
        """Clean up resources - no cleanup needed for external KB"""
Loading
Loading