Skip to content
Snippets Groups Projects
Unverified Commit f0321b7e authored by Yarikama's avatar Yarikama Committed by GitHub
Browse files

fix: fix adelete method and add delete_nodes to elasticsearch vector store (#17890)

parent 3c960345
No related branches found
No related tags found
No related merge requests found
...@@ -5,7 +5,6 @@ from logging import getLogger ...@@ -5,7 +5,6 @@ from logging import getLogger
from typing import Any, Callable, Dict, List, Literal, Optional, Union from typing import Any, Callable, Dict, List, Literal, Optional, Union
import nest_asyncio import nest_asyncio
import numpy as np
from llama_index.core.bridge.pydantic import PrivateAttr from llama_index.core.bridge.pydantic import PrivateAttr
from llama_index.core.schema import BaseNode, MetadataMode, TextNode from llama_index.core.schema import BaseNode, MetadataMode, TextNode
from llama_index.core.vector_stores.types import ( from llama_index.core.vector_stores.types import (
...@@ -79,11 +78,13 @@ def _to_elasticsearch_filter( ...@@ -79,11 +78,13 @@ def _to_elasticsearch_filter(
def _to_llama_similarities(scores: List[float]) -> List[float]: def _to_llama_similarities(scores: List[float]) -> List[float]:
if scores is None or len(scores) == 0: if not scores:
return [] return []
min_score = min(scores)
scores_to_norm: np.ndarray = np.array(scores) max_score = max(scores)
return np.exp(scores_to_norm - np.max(scores_to_norm)).tolist() if max_score == min_score:
return [1.0 if max_score > 0 else 0.0 for _ in scores]
return [(x - min_score) / (max_score - min_score) for x in scores]
def _mode_must_match_retrieval_strategy( def _mode_must_match_retrieval_strategy(
...@@ -399,9 +400,62 @@ class ElasticsearchStore(BasePydanticVectorStore): ...@@ -399,9 +400,62 @@ class ElasticsearchStore(BasePydanticVectorStore):
Exception: If AsyncElasticsearch delete_by_query fails. Exception: If AsyncElasticsearch delete_by_query fails.
""" """
await self._store.delete( await self._store.delete(
query={"term": {"metadata.ref_doc_id.keyword": ref_doc_id}}, **delete_kwargs query={"term": {"metadata.ref_doc_id": ref_doc_id}}, **delete_kwargs
)
def delete_nodes(
    self,
    node_ids: Optional[List[str]] = None,
    filters: Optional[MetadataFilters] = None,
    **delete_kwargs: Any,
) -> None:
    """
    Delete nodes from vector store using node IDs and filters.

    Synchronous wrapper around ``adelete_nodes``; runs the coroutine to
    completion on the current event loop.

    Args:
        node_ids: Optional list of node IDs to delete.
        filters: Optional metadata filters to select nodes to delete.
        delete_kwargs: Optional additional arguments to pass to delete operation.
    """
    # NOTE(review): relies on nest_asyncio (imported at module level) so that
    # run_until_complete works even inside an already-running loop.
    return asyncio.get_event_loop().run_until_complete(
        self.adelete_nodes(node_ids, filters, **delete_kwargs)
    )
async def adelete_nodes(
    self,
    node_ids: Optional[List[str]] = None,
    filters: Optional[MetadataFilters] = None,
    **delete_kwargs: Any,
) -> None:
    """
    Asynchronously delete nodes from vector store using node IDs and filters.

    Args:
        node_ids (Optional[List[str]], optional): List of node IDs. Defaults to None.
        filters (Optional[MetadataFilters], optional): Metadata filters. Defaults to None.
        delete_kwargs (Any, optional): Optional additional arguments to pass to delete operation.
    """
    if not node_ids and not filters:
        # No selection criteria given: do nothing rather than risk a broad delete.
        return

    if not filters:
        # Only IDs were supplied (node_ids is non-empty here): use the
        # direct ID-based delete instead of a query.
        await self._store.delete(ids=node_ids, **delete_kwargs)
        return

    # Build a bool/must query combining the ID list (if any) with the
    # translated metadata filters; matching nodes must satisfy all clauses.
    must_clauses: List[Dict[str, Any]] = []
    if node_ids:
        must_clauses.append({"terms": {"_id": node_ids}})

    es_filter = _to_elasticsearch_filter(filters)
    if "bool" in es_filter and "must" in es_filter["bool"]:
        # Flatten the filter's own must-clauses rather than nesting bools.
        must_clauses.extend(es_filter["bool"]["must"])
    else:
        must_clauses.append(es_filter)

    await self._store.delete(query={"bool": {"must": must_clauses}}, **delete_kwargs)
def query( def query(
self, self,
query: VectorStoreQuery, query: VectorStoreQuery,
...@@ -487,11 +541,13 @@ class ElasticsearchStore(BasePydanticVectorStore): ...@@ -487,11 +541,13 @@ class ElasticsearchStore(BasePydanticVectorStore):
top_k_nodes = [] top_k_nodes = []
top_k_ids = [] top_k_ids = []
top_k_scores = [] top_k_scores = []
for hit in hits: for hit in hits:
source = hit["_source"] source = hit["_source"]
metadata = source.get("metadata", {}) metadata = source.get("metadata", {})
text = source.get(self.text_field, None) text = source.get(self.text_field, None)
node_id = hit["_id"] node_id = hit["_id"]
score = hit["_score"]
try: try:
node = metadata_dict_to_node(metadata) node = metadata_dict_to_node(metadata)
...@@ -519,14 +575,7 @@ class ElasticsearchStore(BasePydanticVectorStore): ...@@ -519,14 +575,7 @@ class ElasticsearchStore(BasePydanticVectorStore):
) )
top_k_nodes.append(node) top_k_nodes.append(node)
top_k_ids.append(node_id) top_k_ids.append(node_id)
top_k_scores.append(hit.get("_rank", hit["_score"])) top_k_scores.append(score)
if (
isinstance(self.retrieval_strategy, AsyncDenseVectorStrategy)
and self.retrieval_strategy.hybrid
):
total_rank = sum(top_k_scores)
top_k_scores = [total_rank - rank / total_rank for rank in top_k_scores]
return VectorStoreQueryResult( return VectorStoreQueryResult(
nodes=top_k_nodes, nodes=top_k_nodes,
......
...@@ -27,7 +27,7 @@ exclude = ["**/BUILD"] ...@@ -27,7 +27,7 @@ exclude = ["**/BUILD"]
license = "MIT" license = "MIT"
name = "llama-index-vector-stores-elasticsearch" name = "llama-index-vector-stores-elasticsearch"
readme = "README.md" readme = "README.md"
version = "0.4.1" version = "0.4.2"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.9,<4.0" python = ">=3.9,<4.0"
......
...@@ -118,7 +118,7 @@ def node_embeddings() -> List[TextNode]: ...@@ -118,7 +118,7 @@ def node_embeddings() -> List[TextNode]:
text="I was taught that the way of progress was neither swift nor easy.", text="I was taught that the way of progress was neither swift nor easy.",
id_="0b31ae71-b797-4e88-8495-031371a7752e", id_="0b31ae71-b797-4e88-8495-031371a7752e",
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-3")}, relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-3")},
metadate={ metadata={
"author": "Marie Curie", "author": "Marie Curie",
}, },
embedding=[0.0, 0.0, 0.9], embedding=[0.0, 0.0, 0.9],
...@@ -130,7 +130,7 @@ def node_embeddings() -> List[TextNode]: ...@@ -130,7 +130,7 @@ def node_embeddings() -> List[TextNode]:
), ),
id_="bd2e080b-159a-4030-acc3-d98afd2ba49b", id_="bd2e080b-159a-4030-acc3-d98afd2ba49b",
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-4")}, relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-4")},
metadate={ metadata={
"author": "Albert Einstein", "author": "Albert Einstein",
}, },
embedding=[0.0, 0.0, 0.5], embedding=[0.0, 0.0, 0.5],
...@@ -142,7 +142,7 @@ def node_embeddings() -> List[TextNode]: ...@@ -142,7 +142,7 @@ def node_embeddings() -> List[TextNode]:
), ),
id_="f658de3b-8cef-4d1c-8bed-9a263c907251", id_="f658de3b-8cef-4d1c-8bed-9a263c907251",
relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-5")}, relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="text-5")},
metadate={ metadata={
"author": "Charlotte Bronte", "author": "Charlotte Bronte",
}, },
embedding=[0.0, 0.0, 0.3], embedding=[0.0, 0.0, 0.3],
...@@ -488,7 +488,7 @@ async def test_add_to_es_and_text_query_ranked_hybrid( ...@@ -488,7 +488,7 @@ async def test_add_to_es_and_text_query_ranked_hybrid(
query_str="human", query_str="human",
query_embedding=[0.0, 0.0, 0.5], query_embedding=[0.0, 0.0, 0.5],
mode=VectorStoreQueryMode.HYBRID, mode=VectorStoreQueryMode.HYBRID,
similarity_top_k=2, similarity_top_k=3,
) )
await check_top_match( await check_top_match(
es_hybrid_store, node_embeddings, use_async, query_get_1_first, node1, node2 es_hybrid_store, node_embeddings, use_async, query_get_1_first, node1, node2
...@@ -590,3 +590,60 @@ def test_metadata_filter_to_es_filter() -> None: ...@@ -590,3 +590,60 @@ def test_metadata_filter_to_es_filter() -> None:
] ]
} }
} }
@pytest.mark.asyncio()
@pytest.mark.parametrize("use_async", [True, False])
async def test_delete_nodes(
    es_store: ElasticsearchStore,
    node_embeddings: List[TextNode],
    use_async: bool,
) -> None:
    """Exercise delete_nodes/adelete_nodes: by IDs, by filters, and by both."""

    def _query_all():
        # Fetch up to 5 nodes so we can count what survived each delete.
        return es_store.query(
            VectorStoreQuery(query_embedding=[1.0, 0.0, 0.0], similarity_top_k=5)
        )

    async def _delete(**kwargs: Any) -> None:
        # Route through the async or sync API depending on the parametrization.
        if use_async:
            await es_store.adelete_nodes(**kwargs)
        else:
            es_store.delete_nodes(**kwargs)

    if use_async:
        await es_store.async_add(node_embeddings)
    else:
        es_store.add(node_embeddings)

    # 1) Delete by explicit node IDs.
    target_ids = [node_embeddings[0].node_id, node_embeddings[1].node_id]
    await _delete(node_ids=target_ids)
    result = _query_all()
    assert len(result.nodes) == 4
    assert all(node.node_id not in target_ids for node in result.nodes)

    # 2) Delete by metadata filter only.
    curie_filter = MetadataFilters(
        filters=[ExactMatchFilter(key="author", value="Marie Curie")]
    )
    await _delete(filters=curie_filter)
    result = _query_all()
    assert len(result.nodes) == 3
    assert all(node.metadata.get("author") != "Marie Curie" for node in result.nodes)

    # 3) Delete by IDs and filter combined: only nodes matching both
    # (the intersection) are removed, so exactly one node disappears.
    surviving_ids = [node.node_id for node in result.nodes[:2]]
    einstein_filter = MetadataFilters(
        filters=[ExactMatchFilter(key="author", value="Albert Einstein")]
    )
    await _delete(node_ids=surviving_ids, filters=einstein_filter)
    result = _query_all()
    assert len(result.nodes) == 2
    assert any(node.metadata.get("author") == "Charlotte Bronte" for node in result.nodes)
    assert any(
        node.metadata.get("director") == "Christopher Nolan" for node in result.nodes
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment