
[FIX] Refactored indexing, extraction and retrieval #172

Open
wants to merge 21 commits into main
Conversation

@harini-venkataraman (Contributor) commented Mar 6, 2025

What

  • Separated indexing and extraction into distinct APIs in the prompt service
  • Refactored the SDK to accommodate the new APIs
  • Added subquestion retrieval using the LlamaIndex query engine
  • Refactored the retriever service
  • Skipped vector DB usage when the chunk size is zero
  • Added integration tests for the indexing and extraction APIs

...

Why

Prerequisite for the cell type merge
...

How

Refactored the existing code
...

Relevant Docs

Related Issues or PRs

Zipstack/unstract#1172
Zipstack/unstract#1149
...

Dependencies Versions / Env Variables

Notes on Testing

Added integration tests
...

Screenshots

...

Checklist

I have read and understood the Contribution Guidelines.

        )
        raise IndexingError(str(e)) from e

    def _delete_existing_nodes_on_reindex(self, vector_db, doc_id, doc_id_found):

The function can be renamed to delete_nodes. That way it can be reused elsewhere if there is a need to delete nodes...


Also, please add types for the arguments wherever possible.
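Combining both suggestions, a minimal sketch of a renamed, typed helper — the `VectorDB` protocol below is a stand-in for the SDK's actual class, not its real definition:

```python
from typing import Protocol


class VectorDB(Protocol):
    """Minimal stand-in for the SDK's vector DB wrapper."""

    def delete(self, ref_doc_id: str) -> None: ...


def delete_nodes(vector_db: VectorDB, doc_id: str) -> None:
    """Delete all nodes associated with doc_id, reusable outside reindexing."""
    vector_db.delete(ref_doc_id=doc_id)
```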

@CLAassistant commented Mar 12, 2025

CLA assistant check
All committers have signed the CLA.

@harini-venkataraman harini-venkataraman changed the title REFACTOR : Indexing API [FIX] Refactored indexing, extraction and retrieval Mar 12, 2025
@harini-venkataraman harini-venkataraman marked this pull request as ready for review March 12, 2025 14:26
            return documents
        except Exception as e:
            self.tool.stream_log(
                f"Error deleting nodes for {doc_id}: {e}",

Wrong error message.

        self._capture_metrics = capture_metrics
        self._metrics = {}

    def extract(self):
@gaya3-zipstack commented Mar 12, 2025

Where are we using this function?


class SimpleRetriever(BaseRetriever):
    def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int):
        self.vector_db = vector_db

Can't we merely call the parent's constructor here? Better yet, we should be able to remove the `__init__` entirely, since the parent covers it, right?
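A minimal sketch of the suggestion, assuming `BaseRetriever` already stores these attributes (the base class shown here is illustrative, not the SDK's real one):

```python
class BaseRetriever:
    """Illustrative stand-in for the SDK's base retriever."""

    def __init__(self, vector_db, prompt: str, doc_id: str, top_k: int):
        self.vector_db = vector_db
        self.prompt = prompt
        self.doc_id = doc_id
        self.top_k = top_k


class SimpleRetriever(BaseRetriever):
    # No __init__ needed: the parent constructor covers every attribute
    pass
```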

class RetrievalError(SdkError):
    """Custom exception raised for errors during retrieval from VectorDB."""

    DEFAULT_MESSAGE = (

If this message is going all the way back to the user, then I think we need to shape it up a bit. The user may not connect what the query param here refers to. Instead, we should rephrase it in terms of parameters the user has control over.
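A hypothetical rephrasing along those lines, referencing things the user configures rather than internal query parameters — the wording and the `SdkError` stub here are illustrative, not the SDK's actual text:

```python
class SdkError(Exception):
    """Stand-in for the SDK's base error class."""


class RetrievalError(SdkError):
    """Custom exception raised for errors during retrieval from VectorDB."""

    DEFAULT_MESSAGE = (
        "Unable to retrieve context for the given prompt. Please verify the "
        "configured vector DB adapter and consider re-indexing the document."
    )

    def __init__(self, message: str = DEFAULT_MESSAGE) -> None:
        super().__init__(message)
```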

class ProcessingOptions:
    reindex: bool = False
    enable_highlight: bool = False
    usage_kwargs: dict[Any, Any] = field(default_factory=dict)

What does usage_kwargs carry here? Wondering why we keep it as an attribute of ProcessingOptions.


@log_elapsed(operation="INDEX")
@capture_metrics
def perform_indexing(

Wondering if we should retain the old function names like index or index_document. That would keep things familiar for people reading the code.


Also applicable in other places where relevant.

@chandrasekharan-zipstack left a comment

Left some comments - my main question is why we have both such helper methods for indexing in the SDK and an API call to the prompt-service

Comment on lines +13 to +14
        # TODO: Inherit from StreamMixin and avoid using BaseTool
        self.tool = tool

@harini-venkataraman do we do this only for logging? If that's the case, can we make stream_log() a static method and remove this dependency?
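One possible shape for that, sketched with assumed names — `StreamMixin.stream_log` and its output format are hypothetical, not the SDK's actual API:

```python
import sys


class StreamMixin:
    """Sketch: a logging mixin so helpers need no BaseTool reference."""

    @staticmethod
    def stream_log(message: str, level: str = "INFO") -> None:
        # Write log lines to stderr directly instead of going via the tool
        print(f"[{level}] {message}", file=sys.stderr)
```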

                self.tool.stream_log(f"No nodes found for {doc_id}")
        except Exception as e:
            self.tool.stream_log(
                f"Error querying {instance_identifiers.vector_db_instance_id}: {e},"

NIT: Logging an adapter's UUID is not useful to the user currently.

  1. Either show the UUID in the adapter listing page as well
  2. Or always use the adapter name when logging to the user

        except Exception as e:
            self.tool.stream_log(
                f"Error querying {instance_identifiers.vector_db_instance_id}: {e},"
                " proceeding to index",

I don't think we should assume that we will always index after calling this function. Ideally, this log should be the caller's responsibility.

Comment on lines +146 to +149
        except Exception as e:
            self.tool.stream_log(
                f"Unexpected error during indexing check: {e}", level=LogLevel.ERROR
            )

NIT: I don't think this is needed here. Say an actual error does happen:

  1. We lose context on the error since it's suppressed instead of propagated
  2. We might display some Pythonic error in the user's logs this way

It might be better to propagate the error up the call stack, ensure we log a trace, and respond to the user with something meaningful.
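A sketch of propagating with context instead of suppressing — the `_query_vector_db` helper and the error class shown here are stand-ins, not the SDK's actual code:

```python
class IndexingError(Exception):
    """Stand-in for the SDK's indexing error."""


def _query_vector_db(doc_id: str) -> bool:
    # Stand-in for the real indexing check; raises to demonstrate propagation
    raise RuntimeError("connection refused")


def check_index(doc_id: str) -> bool:
    try:
        return _query_vector_db(doc_id)
    except Exception as e:
        # Preserve the original traceback and surface a meaningful message
        raise IndexingError(f"Indexing check failed for '{doc_id}': {e}") from e
```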

            )
            raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e

    def _prepare_documents(self, doc_id, full_text) -> list:

NIT: Please add typing for these args

Comment on lines +44 to +51
        # UN-1288 For Pinecone, we are seeing an inconsistent case where
        # query with doc_id fails even though indexing just happened.
        # This causes the following retrieve to return no text.
        # To rule out any lag on the Pinecone vector DB write,
        # the following sleep is added
        # Note: This will not fix the issue. Since this issue is inconsistent
        # and not reproducible easily, this is just a safety net.
        time.sleep(2)

NIT: This should only apply to Pinecone then - why do we do it for all vector DBs?
cc: @gaya3-zipstack
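One way to scope the safety net, assuming the adapter name is available at this point — the function name and the string comparison are assumptions for illustration:

```python
import time


def wait_for_write_consistency(adapter_name: str, delay: float = 2.0) -> None:
    """Safety-net sleep applied only to Pinecone (see UN-1288)."""
    if adapter_name.lower() == "pinecone":
        time.sleep(delay)
```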


        return doc_id_found

    @log_elapsed(operation="INDEX")

Suggested change
@log_elapsed(operation="INDEX")

NIT: Remove this since you've already added it for the index API

        ]
        # Convert raw text to llama index usage Document
        documents = self._prepare_documents(doc_id, full_text)
        self._delete_existing_nodes_on_reindex(vector_db, doc_id, doc_id_found)

Why do we accept doc_id_found in this function? I feel we should get rid of it, and the caller should ensure we skip calling this function in the first place.

Comment on lines +50 to +61
    @log_elapsed(operation="INDEX")
    def index(
        self, payload: dict[str, Any], params: Optional[dict[str, str]] = None
    ) -> dict[str, Any]:
        url_path = "index"
        if self.is_public_call:
            url_path = "index-public"
        return self._post_call(
            url_path=url_path,
            payload=payload,
            params=params,
        )

@harini-venkataraman if we have an API to the prompt-service to take care of this, why do we have helper methods in the SDK? Shouldn't they be in prompt-service?

Comment on lines +6 to +10
class InstanceIdentifiers:
    embedding_instance_id: str
    vector_db_instance_id: str
    x2text_instance_id: str
    llm_instance_id: str

@harini-venkataraman I notice that we pass this object and vector_db / embedding adapter to functions. Can't we make use of the adapters alone everywhere? By making use of such UUIDs we force ourselves to use multiple DB queries every now and then
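A sketch of the alternative: resolve the adapters once and carry the objects instead of UUIDs, avoiding repeated DB lookups — the class and field names here are illustrative:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class InstanceAdapters:
    """Resolved adapter objects, fetched once up front instead of per-UUID lookups."""

    embedding: Any
    vector_db: Any
    x2text: Any
    llm: Any
```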

4 participants