Skip to content

Commit 9b038aa

Browse files
committed
Improve parsing of fetch response
1 parent 53082f1 commit 9b038aa

2 files changed

Lines changed: 266 additions & 47 deletions

File tree

pinecone/grpc/utils.py

Lines changed: 104 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -55,38 +55,49 @@ def parse_sparse_values(sparse_values: dict | None) -> SparseValues:
5555
def parse_fetch_response(
5656
response: Message, initial_metadata: dict[str, str] | None = None
5757
) -> FetchResponse:
58-
json_response = json_format.MessageToDict(response)
58+
"""Parse a FetchResponse protobuf message directly without MessageToDict conversion.
59+
60+
This optimized version directly accesses protobuf fields for better performance.
61+
"""
62+
# Extract response info from initial metadata
63+
from pinecone.utils.response_info import extract_response_info
5964

65+
metadata = initial_metadata or {}
66+
response_info = extract_response_info(metadata)
67+
68+
# Directly access protobuf fields instead of converting entire message to dict
6069
vd = {}
61-
vectors = json_response.get("vectors", {})
62-
namespace = json_response.get("namespace", "")
70+
# namespace is a required string field, so it will always have a value (default empty string)
71+
namespace = response.namespace
6372

64-
for id, vec in vectors.items():
65-
# Convert to Vector dataclass
66-
sparse_vals = vec.get("sparseValues")
73+
# Iterate over vectors map directly
74+
for vec_id, vec in response.vectors.items():
75+
# Convert vector.values (RepeatedScalarFieldContainer) to list
76+
values = list(vec.values) if vec.values else []
77+
78+
# Handle sparse_values if present (check if field is set and not empty)
6779
parsed_sparse = None
68-
if sparse_vals:
80+
if vec.HasField("sparse_values") and vec.sparse_values:
6981
from pinecone.db_data.dataclasses import SparseValues
7082

7183
parsed_sparse = SparseValues(
72-
indices=sparse_vals.get("indices", []), values=sparse_vals.get("values", [])
84+
indices=list(vec.sparse_values.indices), values=list(vec.sparse_values.values)
7385
)
74-
vd[id] = Vector(
75-
id=vec["id"],
76-
values=vec.get("values") or [],
77-
sparse_values=parsed_sparse,
78-
metadata=vec.get("metadata", None),
79-
)
8086

81-
# Extract response info from initial metadata
82-
from pinecone.utils.response_info import extract_response_info
87+
# Convert metadata Struct to dict only when needed
88+
metadata_dict = None
89+
if vec.HasField("metadata") and vec.metadata:
90+
metadata_dict = json_format.MessageToDict(vec.metadata)
8391

84-
metadata = initial_metadata or {}
85-
response_info = extract_response_info(metadata)
92+
vd[vec_id] = Vector(
93+
id=vec.id, values=values, sparse_values=parsed_sparse, metadata=metadata_dict
94+
)
8695

96+
# Parse usage if present (usage is optional, so check HasField)
8797
usage = None
88-
if json_response.get("usage"):
89-
usage = parse_usage(json_response.get("usage", {}))
98+
if response.HasField("usage") and response.usage:
99+
usage = parse_usage({"readUnits": response.usage.read_units})
100+
90101
fetch_response = FetchResponse(
91102
vectors=vd, namespace=namespace, usage=usage, _response_info=response_info
92103
)
@@ -204,40 +215,86 @@ def parse_query_response(
204215
_check_type: bool = False,
205216
initial_metadata: dict[str, str] | None = None,
206217
) -> QueryResponse:
207-
if isinstance(response, Message):
208-
json_response = json_format.MessageToDict(response)
209-
else:
210-
json_response = response
211-
212-
matches = []
213-
for item in json_response.get("matches", []):
214-
sc = ScoredVector(
215-
id=item["id"],
216-
score=item.get("score", 0.0),
217-
values=item.get("values", []),
218-
sparse_values=parse_sparse_values(item.get("sparseValues")),
219-
metadata=item.get("metadata", None),
220-
_check_type=_check_type,
221-
)
222-
matches.append(sc)
223-
224-
# Due to OpenAPI model classes / actual parsing cost, we want to avoid
225-
# creating empty `Usage` objects and then passing them into QueryResponse
226-
# when they are not actually present in the response from the server.
227-
args = {"namespace": json_response.get("namespace", ""), "matches": matches}
228-
usage = json_response.get("usage")
229-
if usage:
230-
args["usage"] = parse_usage(usage)
218+
"""Parse a QueryResponse protobuf message directly without MessageToDict conversion.
231219
220+
This optimized version directly accesses protobuf fields for better performance.
221+
For dict responses (REST API), falls back to the original dict-based parsing.
222+
"""
232223
# Extract response info from initial metadata
233-
# For gRPC, LSN headers are in initial_metadata
234224
from pinecone.utils.response_info import extract_response_info
235225

236226
metadata = initial_metadata or {}
237227
response_info = extract_response_info(metadata)
238228

239-
query_response = QueryResponse(**args, _response_info=response_info)
240-
return query_response
229+
if isinstance(response, Message):
230+
# Optimized path: directly access protobuf fields
231+
matches = []
232+
# namespace is a required string field, so it will always have a value (default empty string)
233+
namespace = response.namespace
234+
235+
# Iterate over matches directly
236+
for match in response.matches:
237+
# Convert match.values (RepeatedScalarFieldContainer) to list
238+
values = list(match.values) if match.values else []
239+
240+
# Handle sparse_values if present (check if field is set and not empty)
241+
parsed_sparse = None
242+
if match.HasField("sparse_values") and match.sparse_values:
243+
parsed_sparse = SparseValues(
244+
indices=list(match.sparse_values.indices),
245+
values=list(match.sparse_values.values),
246+
)
247+
248+
# Convert metadata Struct to dict only when needed
249+
metadata_dict = None
250+
if match.HasField("metadata") and match.metadata:
251+
metadata_dict = json_format.MessageToDict(match.metadata)
252+
253+
sc = ScoredVector(
254+
id=match.id,
255+
score=match.score,
256+
values=values,
257+
sparse_values=parsed_sparse,
258+
metadata=metadata_dict,
259+
_check_type=_check_type,
260+
)
261+
matches.append(sc)
262+
263+
# Parse usage if present (usage is optional, so check HasField)
264+
usage = None
265+
if response.HasField("usage") and response.usage:
266+
usage = parse_usage({"readUnits": response.usage.read_units})
267+
268+
query_response = QueryResponse(
269+
namespace=namespace, matches=matches, usage=usage, _response_info=response_info
270+
)
271+
return query_response
272+
else:
273+
# Fallback for dict responses (REST API)
274+
json_response = response
275+
276+
matches = []
277+
for item in json_response.get("matches", []):
278+
sc = ScoredVector(
279+
id=item["id"],
280+
score=item.get("score", 0.0),
281+
values=item.get("values", []),
282+
sparse_values=parse_sparse_values(item.get("sparseValues")),
283+
metadata=item.get("metadata", None),
284+
_check_type=_check_type,
285+
)
286+
matches.append(sc)
287+
288+
# Due to OpenAPI model classes / actual parsing cost, we want to avoid
289+
# creating empty `Usage` objects and then passing them into QueryResponse
290+
# when they are not actually present in the response from the server.
291+
args = {"namespace": json_response.get("namespace", ""), "matches": matches}
292+
usage = json_response.get("usage")
293+
if usage:
294+
args["usage"] = parse_usage(usage)
295+
296+
query_response = QueryResponse(**args, _response_info=response_info)
297+
return query_response
241298

242299

243300
def parse_stats_response(response: dict) -> "DescribeIndexStatsResponse":
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
"""Performance benchmarks for gRPC response parsing functions.
2+
3+
These tests measure the performance of parse_fetch_response and parse_query_response
4+
to establish baselines and verify optimizations.
5+
"""
6+
7+
import random
8+
import pytest
9+
from google.protobuf import struct_pb2
10+
11+
from pinecone.core.grpc.protos.db_data_2025_10_pb2 import (
12+
FetchResponse,
13+
QueryResponse,
14+
Vector,
15+
ScoredVector,
16+
SparseValues,
17+
Usage,
18+
)
19+
from pinecone.grpc.utils import parse_fetch_response, parse_query_response
20+
21+
22+
def create_vector(id: str, dimension: int, include_sparse: bool = False) -> Vector:
    """Build a Vector protobuf message populated with random dense values.

    When *include_sparse* is True, also attach sparse values covering roughly
    10% of the dimension. A small random metadata Struct is always attached.
    """
    dense = [random.random() for _ in range(dimension)]

    # Optionally generate sparse values (~10% of dimension as non-zero entries).
    sparse = None
    if include_sparse:
        nnz = max(1, dimension // 10)
        idx = sorted(random.sample(range(dimension), nnz))
        sparse = SparseValues(indices=idx, values=[random.random() for _ in range(nnz)])

    # Attach a small random metadata Struct so parsing exercises the Struct path.
    meta = struct_pb2.Struct()
    meta.update({"category": f"cat_{random.randint(1, 10)}", "score": random.random()})

    kwargs = {"id": id, "values": dense, "metadata": meta}
    if sparse is not None:
        kwargs["sparse_values"] = sparse
    return Vector(**kwargs)
46+
47+
48+
def create_scored_vector(id: str, dimension: int, include_sparse: bool = False) -> ScoredVector:
    """Build a ScoredVector protobuf message with random values and score.

    When *include_sparse* is True, also attach sparse values covering roughly
    10% of the dimension. A small random metadata Struct is always attached.
    """
    dense = [random.random() for _ in range(dimension)]

    # Optionally generate sparse values (~10% of dimension as non-zero entries).
    sparse = None
    if include_sparse:
        nnz = max(1, dimension // 10)
        idx = sorted(random.sample(range(dimension), nnz))
        sparse = SparseValues(indices=idx, values=[random.random() for _ in range(nnz)])

    # Attach a small random metadata Struct so parsing exercises the Struct path.
    meta = struct_pb2.Struct()
    meta.update({"category": f"cat_{random.randint(1, 10)}", "score": random.random()})

    kwargs = {"id": id, "score": random.random(), "values": dense, "metadata": meta}
    if sparse is not None:
        kwargs["sparse_values"] = sparse
    return ScoredVector(**kwargs)
78+
79+
80+
def create_fetch_response(
    num_vectors: int, dimension: int, include_sparse: bool = False
) -> FetchResponse:
    """Build a FetchResponse containing *num_vectors* randomly generated vectors.

    The vectors map is keyed "vec_0" .. "vec_{n-1}", and usage.read_units is
    set to the vector count so usage parsing is exercised too.
    """
    vector_map = {
        vid: create_vector(vid, dimension, include_sparse)
        for vid in (f"vec_{i}" for i in range(num_vectors))
    }
    return FetchResponse(
        vectors=vector_map, namespace="test_namespace", usage=Usage(read_units=num_vectors)
    )
92+
93+
94+
def create_query_response(
    num_matches: int, dimension: int, include_sparse: bool = False
) -> QueryResponse:
    """Build a QueryResponse containing *num_matches* randomly generated matches.

    Matches are named "match_0" .. "match_{n-1}", and usage.read_units is set
    to the match count so usage parsing is exercised too.
    """
    scored = []
    for i in range(num_matches):
        scored.append(create_scored_vector(f"match_{i}", dimension, include_sparse))
    return QueryResponse(
        matches=scored, namespace="test_namespace", usage=Usage(read_units=num_matches)
    )
105+
106+
107+
class TestFetchResponseParsingPerf:
    """Performance benchmarks for parse_fetch_response."""

    # Full 3x3 matrix of vector counts and dimensions; the comprehension yields
    # the same (count, dim) tuples, in the same order, as an explicit listing.
    @pytest.mark.parametrize(
        "num_vectors,dimension",
        [(n, d) for n in (10, 100, 1000) for d in (128, 512, 1024)],
    )
    def test_parse_fetch_response_dense(self, benchmark, num_vectors, dimension):
        """Benchmark parse_fetch_response on dense-only vectors."""
        fetch_msg = create_fetch_response(num_vectors, dimension, include_sparse=False)
        benchmark(parse_fetch_response, fetch_msg, None)

    @pytest.mark.parametrize("num_vectors,dimension", [(n, 128) for n in (10, 100, 1000)])
    def test_parse_fetch_response_sparse(self, benchmark, num_vectors, dimension):
        """Benchmark parse_fetch_response on vectors that also carry sparse values."""
        fetch_msg = create_fetch_response(num_vectors, dimension, include_sparse=True)
        benchmark(parse_fetch_response, fetch_msg, None)
134+
135+
136+
class TestQueryResponseParsingPerf:
    """Performance benchmarks for parse_query_response."""

    # Full 3x3 matrix of match counts and dimensions; the comprehension yields
    # the same (count, dim) tuples, in the same order, as an explicit listing.
    @pytest.mark.parametrize(
        "num_matches,dimension",
        [(n, d) for n in (10, 100, 1000) for d in (128, 512, 1024)],
    )
    def test_parse_query_response_dense(self, benchmark, num_matches, dimension):
        """Benchmark parse_query_response on dense-only matches."""
        query_msg = create_query_response(num_matches, dimension, include_sparse=False)
        benchmark(parse_query_response, query_msg, False, None)

    @pytest.mark.parametrize("num_matches,dimension", [(n, 128) for n in (10, 100, 1000)])
    def test_parse_query_response_sparse(self, benchmark, num_matches, dimension):
        """Benchmark parse_query_response on matches that also carry sparse values."""
        query_msg = create_query_response(num_matches, dimension, include_sparse=True)
        benchmark(parse_query_response, query_msg, False, None)

0 commit comments

Comments
 (0)