Skip to content
Snippets Groups Projects
Commit fd69cf59 authored by siddicky's avatar siddicky
Browse files

Implement batch upsert to handle large data payloads

parent ce824593
No related branches found
No related tags found
No related merge requests found
......@@ -110,21 +110,33 @@ class PineconeIndex(BaseIndex):
self.host = self.client.describe_index(self.index_name)["host"]
return index
def _batch_upsert(self, batch: List[dict]):
"""Helper method for upserting a single batch of records."""
if self.index is not None:
self.index.upsert(vectors=batch)
else:
raise ValueError("Index is None, could not upsert.")
def add(
self, embeddings: List[List[float]], routes: List[str], utterances: List[str]
self,
embeddings: List[List[float]],
routes: List[str],
utterances: List[str],
batch_size: int = 100,
):
"""Add vectors to Pinecone in batches."""
if self.index is None:
self.dimensions = self.dimensions or len(embeddings[0])
# we set force_create to True as we MUST have an index to add data
self.index = self._init_index(force_create=True)
vectors_to_upsert = []
for vector, route, utterance in zip(embeddings, routes, utterances):
record = PineconeRecord(values=vector, route=route, utterance=utterance)
vectors_to_upsert.append(record.to_dict())
if self.index is not None:
self.index.upsert(vectors=vectors_to_upsert)
else:
raise ValueError("Index is None could not upsert.")
vectors_to_upsert = [
PineconeRecord(values=vector, route=route, utterance=utterance).to_dict()
for vector, route, utterance in zip(embeddings, routes, utterances)
]
for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i : i + batch_size]
self._batch_upsert(batch)
def _get_route_ids(self, route_name: str):
clean_route = clean_route_name(route_name)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment