Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ID Mismatch Error in VectorDB During Evaluation #1033 #1056

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
12 changes: 9 additions & 3 deletions autorag/nodes/retrieval/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,30 @@ def cast_queries(queries: Union[str, List[str]]) -> List[str]:


def evenly_distribute_passages(
    ids: List[List[str]],
    scores: List[List[float]],
    contents: List[List[str]],
    top_k: int,
) -> Tuple[List[str], List[float], List[str]]:
    """
    Flatten per-query retrieval results into one result of at most ``top_k`` passages,
    taking an (almost) equal share from every query's list.

    Each query contributes ``top_k // len(ids)`` passages; the first
    ``top_k % len(ids)`` queries contribute one extra passage so the total
    reaches ``top_k`` (assuming each inner list is long enough).

    :param ids: Retrieved passage ids, one inner list per query.
    :param scores: Retrieval scores, parallel to ``ids``.
    :param contents: Retrieved passage contents, parallel to ``ids``.
    :param top_k: Total number of passages to keep across all queries.
    :return: A tuple of (ids, scores, contents), each flattened to at most
        ``top_k`` entries.
    """
    # Validate all three parallel inputs, not just ids/scores, so a mismatched
    # contents list fails loudly here instead of corrupting results downstream.
    assert (
        len(ids) == len(scores) == len(contents)
    ), "ids, scores and contents must have same length."
    query_cnt = len(ids)
    avg_len = top_k // query_cnt
    remainder = top_k % query_cnt

    new_ids: List[str] = []
    new_scores: List[float] = []
    new_contents: List[str] = []
    for i in range(query_cnt):
        # The first `remainder` queries each give one extra passage.
        take = avg_len + 1 if i < remainder else avg_len
        new_ids.extend(ids[i][:take])
        new_scores.extend(scores[i][:take])
        new_contents.extend(contents[i][:take])

    return new_ids, new_scores, new_contents


def get_bm25_pkl_name(bm25_tokenizer: str):
Expand Down
5 changes: 4 additions & 1 deletion autorag/nodes/retrieval/bm25.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,11 @@ async def bm25_pure(
id_result.append(ids)
score_result.append(sorted_scores[:top_k])

# dummy contents
dummy_contents = [["" for _ in _id_result] for _id_result in id_result]

# make a total result to top_k
id_result, score_result = evenly_distribute_passages(id_result, score_result, top_k)
id_result, score_result, content_result = evenly_distribute_passages(id_result, score_result, dummy_contents, top_k)
# sort id_result and score_result by score
result = [
(_id, score)
Expand Down
38 changes: 25 additions & 13 deletions autorag/nodes/retrieval/vectordb.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@ def __del__(self):
def pure(self, previous_result: pd.DataFrame, *args, **kwargs):
queries = self.cast_to_run(previous_result)
pure_params = pop_params(self._pure, kwargs)
ids, scores = self._pure(queries, **pure_params)
contents = fetch_contents(self.corpus_df, ids)
ids, scores, contents = self._pure(queries, **pure_params)

ids = [[_ for _ in _id] for _id in ids]
scores = [[_ for _ in score] for score in scores]

# TODO: Refactor to a single logic that can handle all situations.
if pure_params.get("ids", None) is not None:
contents = []
contents = fetch_contents(self.corpus_df, ids)
else:
contents = [[_ for _ in content] for content in contents]

return contents, ids, scores

def _pure(
Expand All @@ -79,7 +89,7 @@ def _pure(
top_k: int,
embedding_batch: int = 128,
ids: Optional[List[List[str]]] = None,
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
"""
VectorDB retrieval function.
You have to get a chroma collection that is already ingested.
Expand Down Expand Up @@ -113,7 +123,8 @@ def _pure(
)
id_result = list(map(lambda x: x[0], results))
score_result = list(map(lambda x: x[1], results))
return id_result, score_result
content_result = list(map(lambda x: x[2], results))
return id_result, score_result, content_result

def __get_ids_scores(self, queries, ids, embedding_batch: int):
# truncate queries and embedding execution here.
Expand Down Expand Up @@ -162,12 +173,12 @@ async def run_fetch(ids):
content_embeddings,
)
)
return ids, score_result
return ids, score_result, queries


async def vectordb_pure(
    queries: List[str], top_k: int, vectordb: BaseVectorStore
) -> Tuple[List[str], List[float], List[str]]:
    """
    Async VectorDB retrieval function.
    Its usage is for async retrieval of vector_db row by row.

    :param queries: A list of queries (multiple variants of one question).
    :param top_k: Total number of passages to retrieve.
    :param vectordb: The vector store instance.
    :return: A tuple of (ids, scores, contents) of the retrieved passages,
        sorted by score in descending order.
    """
    id_result, score_result, content_result = await vectordb.query(
        queries=queries, top_k=top_k
    )

    # Distribute passages evenly across the queries, then rank by score.
    id_result, score_result, content_result = evenly_distribute_passages(
        id_result, score_result, content_result, top_k
    )
    triples = sorted(
        zip(score_result, id_result, content_result),
        key=lambda triple: triple[0],
        reverse=True,
    )
    if not triples:
        # Guard: zip(*[]) would raise an unrelated ValueError on unpacking.
        return [], [], []
    score_result, id_result, content_result = zip(*triples)
    return list(id_result), list(score_result), list(content_result)



async def filter_exist_ids(
Expand Down
2 changes: 1 addition & 1 deletion autorag/vectordb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def add(
@abstractmethod
async def query(
self, queries: List[str], top_k: int, **kwargs
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
pass

@abstractmethod
Expand Down
11 changes: 7 additions & 4 deletions autorag/vectordb/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ async def add(self, ids: List[str], texts: List[str]):
texts = self.truncated_inputs(texts)
text_embeddings = await self.embedding.aget_text_embedding_batch(texts)
if isinstance(self.collection, AsyncCollection):
await self.collection.add(ids=ids, embeddings=text_embeddings)
await self.collection.add(
ids=ids, embeddings=text_embeddings, documents=texts
)
else:
self.collection.add(ids=ids, embeddings=text_embeddings)
self.collection.add(ids=ids, embeddings=text_embeddings, documents=texts)

async def fetch(self, ids: List[str]) -> List[List[float]]:
if isinstance(self.collection, AsyncCollection):
Expand All @@ -92,7 +94,7 @@ async def is_exist(self, ids: List[str]) -> List[bool]:

async def query(
self, queries: List[str], top_k: int, **kwargs
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
queries = self.truncated_inputs(queries)
query_embeddings: List[
List[float]
Expand All @@ -107,8 +109,9 @@ async def query(
)
ids = query_result["ids"]
scores = query_result["distances"]
contents = query_result["documents"]
scores = apply_recursive(lambda x: 1 - x, scores)
return ids, scores
return ids, scores, contents

async def delete(self, ids: List[str]):
if isinstance(self.collection, AsyncCollection):
Expand Down
16 changes: 9 additions & 7 deletions autorag/vectordb/couchbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
username: str = "",
password: str = "",
ingest_batch: int = 100,
text_key: Optional[str] = "text",
text_key: Optional[str] = "content",
embedding_key: Optional[str] = "embedding",
scoped_index: bool = True,
):
Expand Down Expand Up @@ -123,7 +123,7 @@ async def is_exist(self, ids: List[str]) -> List[bool]:

async def query(
self, queries: List[str], top_k: int, **kwargs
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
import couchbase.search as search
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch
Expand All @@ -133,7 +133,7 @@ async def query(
List[float]
] = await self.embedding.aget_text_embedding_batch(queries)

ids, scores = [], []
ids, scores, texts = [], [], []
for query_embedding in query_embeddings:
# Create Search Request
search_req = search.SearchRequest.create(
Expand All @@ -151,27 +151,29 @@ async def query(
search_iter = self.scope.search(
self.index_name,
search_req,
SearchOptions(limit=top_k),
SearchOptions(limit=top_k, fields=[self.text_key]),
)

else:
search_iter = self.cluster.search(
self.index_name,
search_req,
SearchOptions(limit=top_k),
SearchOptions(limit=top_k, fields=[self.text_key]),
)

# Parse the search results
# search_iter.rows() can only be iterated once.
id_list, score_list = [], []
id_list, score_list, text_list = [], [], []
for result in search_iter.rows():
id_list.append(result.id)
score_list.append(result.score)
text_list.append(result.fields[self.text_key])

ids.append(id_list)
scores.append(score_list)
texts.append(text_list)

return ids, scores
return ids, scores, texts

async def delete(self, ids: List[str]):
self.collection.remove_multi(ids)
Expand Down
13 changes: 9 additions & 4 deletions autorag/vectordb/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ def __init__(
field = FieldSchema(
name="vector", dtype=DataType.FLOAT_VECTOR, dim=dimension
)
schema = CollectionSchema(fields=[pk, field])
content = FieldSchema(
name="content", dtype=DataType.VARCHAR, max_length=65535
)
schema = CollectionSchema(fields=[pk, field, content])

self.collection = Collection(name=self.collection_name, schema=schema)
index_params = {
Expand All @@ -90,7 +93,7 @@ async def add(self, ids: List[str], texts: List[str]):

# make data for insertion
data = list(
map(lambda _id, vector: {"id": _id, "vector": vector}, ids, text_embeddings)
map(lambda _id, vector, text: {"id": _id, "vector": vector, "content":text}, ids, text_embeddings, texts)
)

# Insert data into the collection
Expand All @@ -103,7 +106,7 @@ async def add(self, ids: List[str], texts: List[str]):

async def query(
self, queries: List[str], top_k: int, **kwargs
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
queries = self.truncated_inputs(queries)
query_embeddings: List[
List[float]
Expand All @@ -117,18 +120,20 @@ async def query(
limit=top_k,
anns_field="vector",
param={"metric_type": self.similarity_metric.upper()},
output_fields=["vector","content"],
timeout=self.timeout,
**kwargs,
)

# Extract IDs and distances
ids = [[str(hit.id) for hit in result] for result in results]
distances = [[hit.distance for hit in result] for result in results]
contents = [[str(hit.fields["content"]) for hit in result] for result in results]

if self.similarity_metric in ["l2"]:
distances = apply_recursive(lambda x: -x, distances)

return ids, distances
return ids, distances, contents

async def fetch(self, ids: List[str]) -> List[List[float]]:
try:
Expand Down
16 changes: 12 additions & 4 deletions autorag/vectordb/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(
cloud: Optional[str] = "aws",
region: Optional[str] = "us-east-1",
api_key: Optional[str] = None,
text_key: Optional[str] = "content",
deletion_protection: Optional[str] = "disabled", # "enabled" or "disabled"
namespace: Optional[str] = "default",
ingest_batch: int = 200,
Expand All @@ -31,6 +32,7 @@ def __init__(
self.index_name = index_name
self.namespace = namespace
self.ingest_batch = ingest_batch
self.text_key = text_key

self.client = Pinecone_client(api_key=api_key)

Expand Down Expand Up @@ -58,7 +60,11 @@ async def add(self, ids: List[str], texts: List[str]):
List[float]
] = await self.embedding.aget_text_embedding_batch(texts)

vector_tuples = list(zip(ids, text_embeddings))
metadatas = [{} for _ in texts]
for metadata, text in zip(metadatas, texts):
metadata[self.text_key] = text

vector_tuples = list(zip(ids, text_embeddings, metadatas))
batch_vectors = make_batch(vector_tuples, self.ingest_batch)

async_res = [
Expand Down Expand Up @@ -87,28 +93,30 @@ async def is_exist(self, ids: List[str]) -> List[bool]:

async def query(
self, queries: List[str], top_k: int, **kwargs
) -> Tuple[List[List[str]], List[List[float]]]:
) -> Tuple[List[List[str]], List[List[float]], List[List[str]]]:
queries = self.truncated_inputs(queries)
query_embeddings: List[
List[float]
] = await self.embedding.aget_text_embedding_batch(queries)

ids, scores = [], []
ids, scores, texts = [], []
for query_embedding in query_embeddings:
response = self.index.query(
vector=query_embedding,
top_k=top_k,
include_values=True,
include_metadata=True,
namespace=self.namespace,
)

ids.append([o.id for o in response.matches])
scores.append([o.score for o in response.matches])
scores.append([o.metadata[self.text_key] for o in response.matches])

if self.similarity_metric in ["l2"]:
scores = apply_recursive(lambda x: -x, scores)

return ids, scores
return ids, scores, texts

async def delete(self, ids: List[str]):
# Delete entries by IDs
Expand Down
Loading
Loading