@@ -375,13 +375,11 @@ def _to_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]:
size = 1000
page = 0

query_params = {"_source_includes": ["doc_id", "parent_id", "properties"]}
while True:
res = os_client.search(
body=slice_query,
size=size,
from_=page * size,
**query_params,
)
hits = res["hits"]["hits"]
if hits is None or len(hits) == 0:
@@ -39,8 +39,32 @@ def setup_index(os_client):

@pytest.fixture(scope="class")
def setup_index_large(os_client):
index_name = "test_opensearch_read_2"
os_client.indices.delete(index_name, ignore_unavailable=True)
os_client.indices.create(index_name, **TestOpenSearchRead.INDEX_SETTINGS)

+ path = str(TEST_DIR / "resources/data/pdfs/Ray.pdf")
+ context = sycamore.init(exec_mode=ExecMode.RAY)

+ (
+     context.read.binary(path, binary_format="pdf")
+     .partition(partitioner=UnstructuredPdfPartitioner())
+     .explode()
+     .write.opensearch(
+         os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
+         index_name=index_name,
+         index_settings=TestOpenSearchRead.INDEX_SETTINGS,
+         execute=False,
+     )
+     .take_all()
+ )

+ os_client.indices.refresh(index_name)

yield "test_opensearch_read_large"
yield index_name

+ # Delete after
+ os_client.indices.delete(index_name, ignore_unavailable=True)


def get_doc_count(os_client, index_name: str, query: Optional[Dict[str, Any]] = None) -> int:
@@ -91,32 +115,32 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode):
.explode()
.write.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
- index_name=TestOpenSearchRead.INDEX,
+ index_name=setup_index,
index_settings=TestOpenSearchRead.INDEX_SETTINGS,
execute=False,
)
.take_all()
)

- os_client.indices.refresh(TestOpenSearchRead.INDEX)
+ os_client.indices.refresh(setup_index)

expected_count = len(original_docs)
- actual_count = get_doc_count(os_client, TestOpenSearchRead.INDEX)
+ actual_count = get_doc_count(os_client, setup_index)
# refresh should have made all ingested docs immediately available for search
assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}"

retrieved_docs = context.read.opensearch(
- os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=TestOpenSearchRead.INDEX
+ os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=setup_index
)
target_doc_id = original_docs[-1].doc_id if original_docs[-1].doc_id else ""
query = {"query": {"term": {"_id": target_doc_id}}}
query_docs = context.read.opensearch(
- os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=TestOpenSearchRead.INDEX, query=query
+ os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS, index_name=setup_index, query=query
)

retrieved_docs_reconstructed = context.read.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
- index_name=TestOpenSearchRead.INDEX,
+ index_name=setup_index,
reconstruct_document=True,
)
original_materialized = sorted(original_docs, key=lambda d: d.doc_id)
@@ -125,7 +149,6 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode):
query_materialized = query_docs.take_all()
retrieved_materialized_reconstructed = sorted(retrieved_docs_reconstructed.take_all(), key=lambda d: d.doc_id)

- os_client.indices.delete(TestOpenSearchRead.INDEX)
assert len(query_materialized) == 1 # exactly one doc should be returned
compare_connector_docs(original_materialized, retrieved_materialized)

@@ -136,6 +159,8 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode):
for i in range(len(doc.elements) - 1):
assert doc.elements[i].element_index < doc.elements[i + 1].element_index

+ os_client.indices.delete(setup_index, ignore_unavailable=True)

def _test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client, cache_dir):
"""
Validates data is readable from OpenSearch, and that we can rebuild processed Sycamore documents.
@@ -146,11 +171,11 @@ def _test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client,
def doc_reconstructor(index_name: str, doc_id: str) -> Document:
import pickle

- data = pickle.load(open(f"{cache_dir}/{TestOpenSearchRead.INDEX}-{doc_id}", "rb"))
+ data = pickle.load(open(f"{cache_dir}/{setup_index}-{doc_id}", "rb"))
return Document(**data)

def doc_to_name(doc: Document, bin: bytes) -> str:
return f"{TestOpenSearchRead.INDEX}-{doc.doc_id}"
return f"{setup_index}-{doc.doc_id}"

context = sycamore.init(exec_mode=EXEC_LOCAL)
hidden = str(uuid.uuid4())
@@ -210,25 +235,25 @@ def doc_to_name(doc: Document, bin: bytes) -> str:
.explode()
.write.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
- index_name=TestOpenSearchRead.INDEX,
+ index_name=setup_index,
index_settings=TestOpenSearchRead.INDEX_SETTINGS,
execute=False,
)
.take_all()
)

- os_client.indices.refresh(TestOpenSearchRead.INDEX)
+ os_client.indices.refresh(setup_index)

expected_count = len(original_docs)
- actual_count = get_doc_count(os_client, TestOpenSearchRead.INDEX)
+ actual_count = get_doc_count(os_client, setup_index)
# refresh should have made all ingested docs immediately available for search
assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}"

retrieved_docs_reconstructed = context.read.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
- index_name=TestOpenSearchRead.INDEX,
+ index_name=setup_index,
reconstruct_document=True,
- doc_reconstructor=DocumentReconstructor(TestOpenSearchRead.INDEX, doc_reconstructor),
+ doc_reconstructor=DocumentReconstructor(setup_index, doc_reconstructor),
).take_all()

assert len(retrieved_docs_reconstructed) == 6
@@ -237,9 +262,9 @@ def doc_to_name(doc: Document, bin: bytes) -> str:
assert docs == retrieved_sorted

# Clean slate between Execution Modes
- os_client.indices.delete(TestOpenSearchRead.INDEX)
- os_client.indices.create(TestOpenSearchRead.INDEX, **TestOpenSearchRead.INDEX_SETTINGS)
- os_client.indices.refresh(TestOpenSearchRead.INDEX)
+ os_client.indices.delete(setup_index)
+ os_client.indices.create(setup_index, **TestOpenSearchRead.INDEX_SETTINGS)
+ os_client.indices.refresh(setup_index)

def test_ingest_and_read_via_docid_reconstructor(self, setup_index, os_client):
with tempfile.TemporaryDirectory() as cache_dir:
@@ -466,7 +491,7 @@ def get_ids(

return docs

- def test_pagination(self, setup_index_large, os_client):
+ def _test_pagination(self, setup_index_large, os_client):
Collaborator commented:

why disable these?
Also FYI, you can add a @pytest.mark.skip decorator, which I think is a little clearer.
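A minimal sketch of that suggestion, assuming the intent is to keep the tests visible but skipped by default (the reason string here is illustrative):

```python
import pytest


class TestOpenSearchRead:
    @pytest.mark.skip(reason="exercises a large pre-built index; run manually")
    def test_pagination(self, setup_index_large, os_client):
        ...
```

Unlike the leading-underscore rename, a skipped test still appears in the pytest summary (as `s`), so it is harder to forget it was disabled.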

res = os_client.create_pit(index=setup_index_large, keep_alive="10m")
pit_id = res["pit_id"]
bodies = []
@@ -514,7 +539,7 @@ def search_slice(body, os_client) -> list[dict]:
expected_docs = self.get_ids(os_client, setup_index_large)
assert len(all_hits) == len(expected_docs)
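For reference, the PIT-plus-slice scan this test exercises can be consolidated as below. This is a minimal sketch, assuming an opensearch-py client: create_pit and the size/from_ paging mirror code shown in this PR, while the helper name, the match_all query, the slice count, the page size, and the delete_pit cleanup are illustrative assumptions.

```python
def scan_with_pit(os_client, index_name: str, num_slices: int = 2, page_size: int = 1000) -> list[dict]:
    # Open a point-in-time so every slice reads a consistent snapshot of the index.
    pit_id = os_client.create_pit(index=index_name, keep_alive="10m")["pit_id"]
    all_hits: list[dict] = []
    try:
        for slice_id in range(num_slices):
            # Each slice is a disjoint partition of the index; together they cover it.
            slice_query = {
                "query": {"match_all": {}},
                "pit": {"id": pit_id, "keep_alive": "10m"},
                "slice": {"id": slice_id, "max": num_slices},
            }
            page = 0
            while True:
                # Page through the slice with size/from_ until an empty page comes
                # back, the same loop shape as _to_doc() in the first hunk.
                res = os_client.search(body=slice_query, size=page_size, from_=page * page_size)
                hits = res["hits"]["hits"]
                if not hits:
                    break
                all_hits.extend(hits)
                page += 1
    finally:
        # Assumed cleanup step; opensearch-py exposes delete_pit for this.
        os_client.delete_pit(body={"pit_id": [pit_id]})
    return all_hits
```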

- def test_bulk_load(self, setup_index_large, os_client):
+ def _test_bulk_load(self, setup_index_large, os_client):

# Only run this to populate a test index.
return
@@ -553,7 +578,7 @@ def test_bulk_load(self, setup_index_large, os_client):

print(f"Current count: {doc_count}")

- def test_cat(self, setup_index_large, os_client):
+ def _test_cat(self, setup_index_large, os_client):
response = os_client.cat.shards(index=setup_index_large, format="json")
print(response)
doc_count = 0