11 changes: 10 additions & 1 deletion lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py
@@ -333,8 +333,17 @@ def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]:
and hit["_source"]["parent_id"] is not None
and hit["_source"]["parent_id"] not in parent_ids
):
results.append(hit)
# Only add a child doc whose parent_id has not been seen yet.
parent_ids.add(hit["_source"]["parent_id"])
results.append(hit)
elif ("parent_id" not in hit["_source"] or hit["_source"]["parent_id"] is None) and hit[
"_id"
] not in parent_ids:
# Add a parent doc if it's a match.
parent_id = hit["_id"]
Contributor: do we actually use _id? I thought all our indexing is done by _doc_id

Author: I think we always pass document.doc_id as _id when we call OpenSearch.
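A minimal sketch of that convention, assuming an opensearch-py bulk write of the kind the writer issues; the client setup, index name, and sample document are illustrative, not sycamore's actual writer code:

```python
from opensearchpy import OpenSearch, helpers

# Hypothetical client; endpoint and auth are placeholders.
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

def to_bulk_action(document: dict) -> dict:
    # The document's doc_id becomes the OpenSearch _id, so querying by
    # _id and querying by doc_id address the same record.
    return {
        "_index": "demo-index",
        "_id": document["doc_id"],
        "_source": document,
    }

docs = [{"doc_id": "1", "text_representation": "here is an animal that meows"}]
helpers.bulk(client, [to_bulk_action(d) for d in docs])
```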

parent_ids.add(parent_id)
hit["_source"]["parent_id"] = parent_id
Collaborator: why do we need to add the loopback?

results.append(hit)

page += 1
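Taken together, the branches above implement a one-hit-per-parent selection: the first child seen for an unseen parent_id is kept, and a parent doc that matches directly is kept with its parent_id pointed at its own _id. A simplified standalone sketch of that logic (the function name is ours; the hit structure mirrors the code above):

```python
from typing import Any, Dict, List

def select_one_hit_per_parent(hits: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    parent_ids: set = set()
    results: List[Dict[str, Any]] = []
    for hit in hits:
        parent_id = hit["_source"].get("parent_id")
        if parent_id is not None:
            # Child doc: keep only the first child seen per parent.
            if parent_id not in parent_ids:
                parent_ids.add(parent_id)
                results.append(hit)
        elif hit["_id"] not in parent_ids:
            # Parent doc matched directly: tag it with its own _id
            # so downstream reconstruction handles it uniformly.
            parent_ids.add(hit["_id"])
            hit["_source"]["parent_id"] = hit["_id"]
            results.append(hit)
    return results
```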

@@ -78,19 +78,6 @@ def get_doc_count(os_client, index_name: str, query: Optional[Dict[str, Any]] =
return res["count"]


"""
class MockLLM(LLM):
def __init__(self):
super().__init__(model_name="mock_model")

def generate(self, *, prompt: RenderedPrompt, llm_kwargs: Optional[dict] = None) -> str:
return str(uuid.uuid4())

def is_chat_mode(self):
return True
"""


class TestOpenSearchRead:
INDEX_SETTINGS = {
"body": {
@@ -575,6 +562,139 @@ def test_parallel_query_with_pit(self, setup_index_large, os_client):
if doc.parent_id is not None:
assert doc.parent_id == expected_docs[doc.doc_id]["parent_id"]

def test_parallel_query_on_property_with_pit(self, setup_index, os_client):
context = sycamore.init(exec_mode=ExecMode.RAY)
key = "property1"
hidden = str(uuid.uuid4())
query = {"query": {"match": {f"properties.{key}": hidden}}}
# make sure we read from pickle files -- this part won't be written into opensearch.
dicts = [
{
"doc_id": "1",
"properties": {key: hidden},
"elements": [
{"properties": {"_element_index": 1}, "text_representation": "here is an animal that meows"},
],
},
{
"doc_id": "2",
"elements": [
{"id": 7, "properties": {"_element_index": 7}, "text_representation": "this is a cat"},
{
"id": 1,
"properties": {"_element_index": 1},
"text_representation": "here is an animal that moos",
},
],
},
{
"doc_id": "3",
"elements": [
{"properties": {"_element_index": 1}, "text_representation": "here is an animal that moos"},
],
},
{
"doc_id": "4",
"elements": [
{"id": 1, "properties": {"_element_index": 1}},
],
},
{
"doc_id": "5",
"elements": [
{
"properties": {"_element_index": 1},
"text_representation": "the number of pages in this document are 253",
}
],
},
{
"doc_id": "6",
"elements": [
{"id": 1, "properties": {"_element_index": 1}},
],
},
]
docs = [Document(item) for item in dicts]

original_docs = (
context.read.document(docs)
# .materialize(path={"root": cache_dir, "name": doc_to_name})
.explode()
.write.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
index_settings=TestOpenSearchRead.INDEX_SETTINGS,
execute=False,
)
.take_all()
)

os_client.indices.refresh(setup_index)

expected_count = len(original_docs)
actual_count = get_doc_count(os_client, setup_index)
# refresh should have made all ingested docs immediately available for search
assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}"

t0 = time.time()
retrieved_docs = context.read.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
query=query,
reconstruct_document=True,
).take_all()
t1 = time.time()

print(f"Retrieved {len(retrieved_docs)} documents in {t1 - t0} seconds")
expected_docs = self.get_ids(os_client, setup_index, True, query)
assert len(retrieved_docs) == len(expected_docs)
assert "1" == retrieved_docs[0].doc_id
assert hidden == retrieved_docs[0].properties[key]

def test_parallel_query_on_extracted_property_with_pit(self, setup_index, os_client):

path = str(TEST_DIR / "resources/data/pdfs/Ray.pdf")
context = sycamore.init(exec_mode=ExecMode.RAY)
llm = OpenAI(OpenAIModels.GPT_4O_MINI)
extractor = OpenAIEntityExtractor("title", llm=llm)
original_docs = (
context.read.binary(path, binary_format="pdf")
.partition(ArynPartitioner(aryn_api_key=ARYN_API_KEY))
.extract_entity(extractor)
# .materialize(path={"root": materialized_dir, "name": doc_to_name})
.explode()
.write.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
index_settings=TestOpenSearchRead.INDEX_SETTINGS,
execute=False,
)
.take_all()
)

os_client.indices.refresh(setup_index)

expected_count = len(original_docs)
actual_count = get_doc_count(os_client, setup_index)
# refresh should have made all ingested docs immediately available for search
assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}"

query = {"query": {"match": {"properties.title": "ray"}}}

t0 = time.time()
retrieved_docs = context.read.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
query=query,
reconstruct_document=True,
).take_all()
t1 = time.time()

print(f"Retrieved {len(retrieved_docs)} documents in {t1 - t0} seconds")
expected_docs = self.get_ids(os_client, setup_index, True, query)
assert len(retrieved_docs) == len(expected_docs)

@staticmethod
def get_ids(
os_client, index_name, parents_only: bool = False, query: Optional[Dict[str, Any]] = None
73 changes: 38 additions & 35 deletions lib/sycamore/sycamore/tests/unit/test_materialize.py
@@ -703,47 +703,50 @@ def failing_map(doc):
assert retry_counter.x == 23 # 2 successful, 21 unsuccessful

def test_mrr_path_handling(self):
from unittest.mock import patch
from pyarrow.fs import S3FileSystem, LocalFileSystem
from sycamore.docset import DocSet

"""Test MaterializeReadReliability path handling for both local and S3 paths"""
ctx = sycamore.init(exec_mode=ExecMode.LOCAL)
mrr = MaterializeReadReliability(max_batch=3)
mrr._refresh_seen_files = lambda: None
mrr.seen = set()
ctx.rewrite_rules.append(mrr)

# Test various path formats
test_cases = [
# Local paths
{"path": "/tmp/local/path", "expected_fs": "LocalFileSystem"},
{"path": Path("/tmp/local/path2"), "expected_fs": "LocalFileSystem"},
{"path": {"root": "/tmp/local/path3"}, "expected_fs": "LocalFileSystem"},
{"path": {"root": Path("/tmp/local/path4")}, "expected_fs": "LocalFileSystem"},
# S3 paths
{"path": "s3://test-example/path", "should_execute": True, "expected_fs": "S3FileSystem"},
{"path": {"root": "s3://test-example/a/path"}, "should_execute": True, "expected_fs": "S3FileSystem"},
]

MaterializeReadReliability.execute_reliably = lambda context, plan, mrr, **kwargs: None
for case in test_cases:
# Create a dummy materialize plan
plan = Materialize(None, ctx, path=case["path"])

# Test should_execute_reliably

MaterializeReadReliability.maybe_execute_reliably(DocSet(context=ctx, plan=plan))

# Verify the path was properly initialized in mrr_instance
assert hasattr(mrr, "path"), f"mrr_instance missing path attribute for {case['path']}"
assert hasattr(mrr, "fs"), f"mrr_instance missing fs attribute for {case['path']}"

# Verify correct filesystem type
if case["expected_fs"] == "S3FileSystem":
assert isinstance(
mrr.fs, S3FileSystem
), f"Expected S3FileSystem for path {case['path']}, got {type(mrr.fs)}"
else:
assert isinstance(
mrr.fs, LocalFileSystem
), f"Expected LocalFileSystem for path {case['path']}, got {type(mrr.fs)}"
# Use patch instead of modifying class
with patch.object(MaterializeReadReliability, "execute_reliably", return_value=None):

# Test various path formats
test_cases = [
# Local paths
{"path": "/tmp/local/path", "expected_fs": "LocalFileSystem"},
{"path": Path("/tmp/local/path2"), "expected_fs": "LocalFileSystem"},
{"path": {"root": "/tmp/local/path3"}, "expected_fs": "LocalFileSystem"},
{"path": {"root": Path("/tmp/local/path4")}, "expected_fs": "LocalFileSystem"},
# S3 paths
{"path": "s3://test-example/path", "should_execute": True, "expected_fs": "S3FileSystem"},
{"path": {"root": "s3://test-example/a/path"}, "should_execute": True, "expected_fs": "S3FileSystem"},
]

for case in test_cases:
# Create a dummy materialize plan
plan = Materialize(None, ctx, path=case["path"])

# Exercise maybe_execute_reliably
MaterializeReadReliability.maybe_execute_reliably(DocSet(context=ctx, plan=plan))

# Verify the path was properly initialized in mrr_instance
assert hasattr(mrr, "path"), f"mrr_instance missing path attribute for {case['path']}"
assert hasattr(mrr, "fs"), f"mrr_instance missing fs attribute for {case['path']}"

# Verify correct filesystem type
if case["expected_fs"] == "S3FileSystem":
assert isinstance(
mrr.fs, S3FileSystem
), f"Expected S3FileSystem for path {case['path']}, got {type(mrr.fs)}"
else:
assert isinstance(
mrr.fs, LocalFileSystem
), f"Expected LocalFileSystem for path {case['path']}, got {type(mrr.fs)}"