From b941d3bd58ca00ffae475e8d469f76bc6f13f315 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 13 Jan 2025 12:08:00 -0800 Subject: [PATCH 1/4] Fix OpenSearch tests that require pre-loaded index --- .../opensearch/test_opensearch_read.py | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py index 8109d2180..d577e8cf0 100644 --- a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py +++ b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py @@ -39,9 +39,32 @@ def setup_index(os_client): @pytest.fixture(scope="class") def setup_index_large(os_client): + index_name = "test_opensearch_read_2" + os_client.indices.delete(index_name, ignore_unavailable=True) + os_client.indices.create(index_name, **TestOpenSearchRead.INDEX_SETTINGS) - yield "test_opensearch_read_large" + path = str(TEST_DIR / "resources/data/pdfs/Ray.pdf") + context = sycamore.init(exec_mode=ExecMode.RAY) + ds = ( + context.read.binary(path, binary_format="pdf") + .partition(partitioner=UnstructuredPdfPartitioner()) + .explode() + .write.opensearch( + os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, + index_name=index_name, + index_settings=TestOpenSearchRead.INDEX_SETTINGS, + execute=False, + ) + .take_all() + ) + + os_client.indices.refresh(index_name) + + yield index_name + + # Delete after + os_client.indices.delete(index_name, ignore_unavailable=True) def get_doc_count(os_client, index_name: str, query: Optional[Dict[str, Any]] = None) -> int: res = os_client.count(index=index_name) @@ -466,7 +489,7 @@ def get_ids( return docs - def test_pagination(self, setup_index_large, os_client): + def _test_pagination(self, setup_index_large, os_client): res = os_client.create_pit(index=setup_index_large, keep_alive="10m") pit_id = res["pit_id"] bodies = [] @@ -514,7 +537,7 @@ def search_slice(body, os_client) -> list[dict]: expected_docs = self.get_ids(os_client, setup_index_large) assert len(all_hits) == len(expected_docs) - def test_bulk_load(self, setup_index_large, os_client): + def _test_bulk_load(self, setup_index_large, os_client): # Only run this to populate a test index. return @@ -553,7 +576,7 @@ def test_bulk_load(self, setup_index_large, os_client): print(f"Current count: {doc_count}") - def test_cat(self, setup_index_large, os_client): + def _test_cat(self, setup_index_large, os_client): response = os_client.cat.shards(index=setup_index_large, format="json") print(response) doc_count = 0 From 383bdef3173b9a68c9aebbe900865dde4b7fd068 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 13 Jan 2025 13:03:45 -0800 Subject: [PATCH 2/4] Fix lint --- .../integration/connectors/opensearch/test_opensearch_read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py index d577e8cf0..b20346b6c 100644 --- a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py +++ b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py @@ -46,7 +46,7 @@ def setup_index_large(os_client): path = str(TEST_DIR / "resources/data/pdfs/Ray.pdf") context = sycamore.init(exec_mode=ExecMode.RAY) - ds = ( + ( context.read.binary(path, binary_format="pdf") .partition(partitioner=UnstructuredPdfPartitioner()) .explode() From 873601484b69b1284f5216b73f0427f9ffc72970 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 13 Jan 2025 15:13:05 -0800 Subject: [PATCH 3/4] Fix all fields in to-doc --- .../opensearch/opensearch_reader.py | 2 -- .../opensearch/test_opensearch_read.py | 35 ++++++++++--------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py index e4fb80f8c..62da581ae 100644 --- a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py +++ b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py @@ -375,13 +375,11 @@ def _to_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: size = 1000 page = 0 - query_params = {"_source_includes": ["doc_id", "parent_id", "properties"]} while True: res = os_client.search( body=slice_query, size=size, from_=page * size, - **query_params, ) hits = res["hits"]["hits"] if hits is None or len(hits) == 0: diff --git a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py index b20346b6c..70c691bbd 100644 --- a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py +++ b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py @@ -114,32 +114,32 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode): .explode() .write.opensearch( os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, - index_name=TestOpenSearchRead.INDEX, + index_name=setup_index, index_settings=TestOpenSearchRead.INDEX_SETTINGS, execute=False, ) .take_all() ) - os_client.indices.refresh(TestOpenSearchRead.INDEX) + os_client.indices.refresh(setup_index) expected_count = len(original_docs) - actual_count = get_doc_count(os_client, TestOpenSearchRead.INDEX) + actual_count = get_doc_count(os_client, setup_index) # refresh should have made all ingested docs immediately available for search assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}" retrieved_docs = context.read.opensearch( - os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=TestOpenSearchRead.INDEX + os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=setup_index ) target_doc_id = original_docs[-1].doc_id if original_docs[-1].doc_id else "" query = {"query": {"term": {"_id": target_doc_id}}} query_docs = context.read.opensearch( - os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=TestOpenSearchRead.INDEX, query=query + os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=setup_index, query=query ) retrieved_docs_reconstructed = context.read.opensearch( os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, - index_name=TestOpenSearchRead.INDEX, + index_name=setup_index, reconstruct_document=True, ) original_materialized = sorted(original_docs, key=lambda d: d.doc_id) @@ -148,7 +148,6 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode): query_materialized = query_docs.take_all() retrieved_materialized_reconstructed = sorted(retrieved_docs_reconstructed.take_all(), key=lambda d: d.doc_id) - os_client.indices.delete(TestOpenSearchRead.INDEX) assert len(query_materialized) == 1 # exactly one doc should be returned compare_connector_docs(original_materialized, retrieved_materialized) @@ -159,6 +158,8 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode): for i in range(len(doc.elements) - 1): assert doc.elements[i].element_index < doc.elements[i + 1].element_index + os_client.indices.delete(setup_index, ignore_unavailable=True) + def _test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client, cache_dir): """ Validates data is readable from OpenSearch, and that we can rebuild processed Sycamore documents. @@ -169,11 +170,11 @@ def _test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client, def doc_reconstructor(index_name: str, doc_id: str) -> Document: import pickle - data = pickle.load(open(f"{cache_dir}/{TestOpenSearchRead.INDEX}-{doc_id}", "rb")) + data = pickle.load(open(f"{cache_dir}/{setup_index}-{doc_id}", "rb")) return Document(**data) def doc_to_name(doc: Document, bin: bytes) -> str: - return f"{TestOpenSearchRead.INDEX}-{doc.doc_id}" + return f"{setup_index}-{doc.doc_id}" context = sycamore.init(exec_mode=EXEC_LOCAL) hidden = str(uuid.uuid4()) @@ -233,25 +234,25 @@ def doc_to_name(doc: Document, bin: bytes) -> str: .explode() .write.opensearch( os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, - index_name=TestOpenSearchRead.INDEX, + index_name=setup_index, index_settings=TestOpenSearchRead.INDEX_SETTINGS, execute=False, ) .take_all() ) - os_client.indices.refresh(TestOpenSearchRead.INDEX) + os_client.indices.refresh(setup_index) expected_count = len(original_docs) - actual_count = get_doc_count(os_client, TestOpenSearchRead.INDEX) + actual_count = get_doc_count(os_client, setup_index) # refresh should have made all ingested docs immediately available for search assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}" retrieved_docs_reconstructed = context.read.opensearch( os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, - index_name=TestOpenSearchRead.INDEX, + index_name=setup_index, reconstruct_document=True, - doc_reconstructor=DocumentReconstructor(TestOpenSearchRead.INDEX, doc_reconstructor), + doc_reconstructor=DocumentReconstructor(setup_index, doc_reconstructor), ).take_all() assert len(retrieved_docs_reconstructed) == 6 @@ -260,9 +261,9 @@ def doc_to_name(doc: Document, bin: bytes) -> str: assert docs == retrieved_sorted # Clean slate between Execution Modes - os_client.indices.delete(TestOpenSearchRead.INDEX) - os_client.indices.create(TestOpenSearchRead.INDEX, **TestOpenSearchRead.INDEX_SETTINGS) - os_client.indices.refresh(TestOpenSearchRead.INDEX) + os_client.indices.delete(setup_index) + os_client.indices.create(setup_index, **TestOpenSearchRead.INDEX_SETTINGS) + os_client.indices.refresh(setup_index) def test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client): with tempfile.TemporaryDirectory() as cache_dir: From f5b5e0c173171183985adc271924e785c2a76ec9 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 13 Jan 2025 15:22:25 -0800 Subject: [PATCH 4/4] Fix lint --- .../opensearch/test_opensearch_read.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py index 70c691bbd..fc20585c3 100644 --- a/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py +++ b/lib/sycamore/sycamore/tests/integration/connectors/opensearch/test_opensearch_read.py @@ -48,15 +48,15 @@ def setup_index_large(os_client): ( context.read.binary(path, binary_format="pdf") - .partition(partitioner=UnstructuredPdfPartitioner()) - .explode() - .write.opensearch( - os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, - index_name=index_name, - index_settings=TestOpenSearchRead.INDEX_SETTINGS, - execute=False, - ) - .take_all() + .partition(partitioner=UnstructuredPdfPartitioner()) + .explode() + .write.opensearch( + os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, + index_name=index_name, + index_settings=TestOpenSearchRead.INDEX_SETTINGS, + execute=False, + ) + .take_all() ) os_client.indices.refresh(index_name) @@ -66,6 +66,7 @@ def setup_index_large(os_client): # Delete after os_client.indices.delete(index_name, ignore_unavailable=True) + def get_doc_count(os_client, index_name: str, query: Optional[Dict[str, Any]] = None) -> int: res = os_client.count(index=index_name) return res["count"]