+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def __init__(self, client: "OpenSearch"):
@classmethod
@requires_modules("opensearchpy", extra="opensearch")
def from_client_params(cls, params: BaseDBReader.ClientParams) -> "OpenSearchReaderClient":
from opensearchpy import OpenSearch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here

from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

assert isinstance(params, OpenSearchReaderClientParams)
client = OpenSearch(**params.os_client_args)
client = OpenSearchClientWithLogging(**params.os_client_args)
return OpenSearchReaderClient(client)

def read_records(self, query_params: BaseDBReader.QueryParams) -> "OpenSearchReaderQueryResponse":
Expand Down Expand Up @@ -276,7 +276,7 @@ def __init__(
):
assert isinstance(
query_params, OpenSearchReaderQueryParams
), f"Wrong kind of query parameters found: {self._query_params}"
), f"Wrong kind of query parameters found: {query_params}"

super().__init__(client_params, query_params, **kwargs)
self._client_params = client_params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ def __init__(self, os_client: "OpenSearch"):
@classmethod
@requires_modules(["opensearchpy", "opensearchpy.helpers"], extra="opensearch")
def from_client_params(cls, params: BaseDBWriter.ClientParams) -> "OpenSearchWriterClient":
from opensearchpy import OpenSearch
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

assert isinstance(
params, OpenSearchWriterClientParams
), f"Provided params was not of type OpenSearchWriterClientParams:\n{params}"
paramsdict = asdict(params)
os_client = OpenSearch(**paramsdict)
os_client = OpenSearchClientWithLogging(**paramsdict)
os_client.ping()
return OpenSearchWriterClient(os_client)

Expand Down
18 changes: 17 additions & 1 deletion lib/sycamore/sycamore/connectors/opensearch/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
from typing import Optional
import logging
from typing import Optional, Any

from opensearchpy import OpenSearch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's the unconditional import


from sycamore import Context
from sycamore.context import context_params
from sycamore.transforms import Embedder


logger = logging.getLogger("opensearch")


class OpenSearchClientWithLogging(OpenSearch):
def search(self, **kwargs) -> Any:
"""Helper method to execute OpenSearch search queries, and silent errors."""
response = super().search(**kwargs)
shards = response.get("_shards", {})
if shards.get("total") != shards.get("successful"):
logger.error(f"OpenSearch query skipped shards: {response}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be helpful to fish out "query" from kwargs (if it's present) and log it along with the response? Do you think we need to log the entire response payload? It could be big.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this in for now while we're debugging, so we know which results actually came back. If it happens frequently enough and we add handling for it, we can clean this up.

return response


@context_params("opensearch")
def get_knn_query(
text_embedder: Embedder,
Expand Down
5 changes: 3 additions & 2 deletions lib/sycamore/sycamore/query/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import structlog
import yaml
from rich.console import Console

from sycamore.schema import Schema

import sycamore
Expand Down Expand Up @@ -131,7 +132,7 @@ def __init__(
llm: Optional[Union[LLM, str]] = None,
query_plan_strategy: Optional[QueryPlanStrategy] = None,
):
from opensearchpy import OpenSearch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here

from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

self.llm_cache_dir = llm_cache_dir
self.os_config = os_config
Expand All @@ -155,7 +156,7 @@ def __init__(

assert self.context.params, "Could not find required params in Context"
self.os_client_args = self.context.params.get("opensearch", {}).get("os_client_args", os_client_args)
self._os_client = OpenSearch(**self.os_client_args)
self._os_client = OpenSearchClientWithLogging(**self.os_client_args)
self._os_query_executor = OpenSearchQueryExecutor(self.os_client_args)

def get_opensearch_indices(self) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import os
import tempfile

from opensearchpy import OpenSearch
import sycamore
from sycamore.connectors.file.file_scan import JsonManifestMetadataProvider
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.transforms.partition import HtmlPartitioner
Expand Down Expand Up @@ -75,4 +75,4 @@ def test_html_to_opensearch(exec_mode):
ds.write.opensearch(os_client_args=os_client_args, index_name="toyindex", index_settings=index_settings)
finally:
tmp_manifest.close()
OpenSearch(**os_client_args).indices.delete("toyindex")
OpenSearchClientWithLogging(**os_client_args).indices.delete("toyindex")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Optional, Dict, Any

import pytest
from opensearchpy import OpenSearch

import sycamore
from sycamore import EXEC_LOCAL, ExecMode
Expand All @@ -21,7 +20,9 @@

@pytest.fixture(scope="class")
def os_client():
client = OpenSearch(**TestOpenSearchRead.OS_CLIENT_ARGS)
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

client = OpenSearchClientWithLogging(**TestOpenSearchRead.OS_CLIENT_ARGS)
yield client


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import boto3
from urllib.parse import urlparse
from opensearchpy import OpenSearch

import sycamore
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.context import OperationTypes, ExecMode
from sycamore.functions import HuggingFaceTokenizer
from sycamore.llms import OpenAIModels, OpenAI
Expand Down Expand Up @@ -143,7 +143,7 @@ def test_pdf_to_opensearch_with_llm_caching():
)
ds.write.opensearch()

OpenSearch(**os_client_args).indices.delete("toyindex")
OpenSearchClientWithLogging(**os_client_args).indices.delete("toyindex")

# validate caching

Expand Down
4 changes: 2 additions & 2 deletions lib/sycamore/sycamore/tests/integration/query/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import pytest
from opensearchpy import OpenSearch

import sycamore
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.functions import HuggingFaceTokenizer
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
Expand Down Expand Up @@ -69,7 +69,7 @@ def query_integration_test_index():
index_name=QUERY_INTEGRATION_TEST_INDEX_NAME,
index_settings=index_settings,
)
osc = OpenSearch(**OS_CLIENT_ARGS)
osc = OpenSearchClientWithLogging(**OS_CLIENT_ARGS)
osc.indices.refresh(QUERY_INTEGRATION_TEST_INDEX_NAME)
yield QUERY_INTEGRATION_TEST_INDEX_NAME
osc.indices.delete(QUERY_INTEGRATION_TEST_INDEX_NAME)
5 changes: 2 additions & 3 deletions lib/sycamore/sycamore/tests/integration/query/test_planner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from opensearchpy import OpenSearch

from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.tests.integration.query.conftest import OS_CLIENT_ARGS, OS_CONFIG
from sycamore.query.planner import LlmPlanner
from sycamore.query.schema import OpenSearchSchema, OpenSearchSchemaField
Expand All @@ -10,7 +9,7 @@ def test_simple_llm_planner(query_integration_test_index: str):
Simple test ensuring nodes are being created and dependencies are being set.
Using a simple query here for consistent query plans.
"""
os_client = OpenSearch(OS_CLIENT_ARGS)
os_client = OpenSearchClientWithLogging(OS_CLIENT_ARGS)

schema = OpenSearchSchema(
fields={
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import pytest
import sycamore
from sycamore import ExecMode
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.data.document import OpenSearchQuery
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.transforms.partition import UnstructuredPdfPartitioner
from sycamore.transforms.query import OpenSearchQueryExecutor
from opensearchpy import OpenSearch


@pytest.fixture(scope="class")
Expand Down Expand Up @@ -47,7 +47,7 @@ def setup_index():
index_name=TestQueryOpenSearch.INDEX,
index_settings=index_settings,
)
osc = OpenSearch(**TestQueryOpenSearch.OS_CLIENT_ARGS)
osc = OpenSearchClientWithLogging(**TestQueryOpenSearch.OS_CLIENT_ARGS)
osc.indices.refresh(TestQueryOpenSearch.INDEX)
yield TestQueryOpenSearch.INDEX
osc.indices.delete(TestQueryOpenSearch.INDEX)
Expand Down
5 changes: 3 additions & 2 deletions lib/sycamore/sycamore/transforms/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import abstractmethod, ABC
from typing import Any

from sycamore.utils.import_utils import requires_modules

from sycamore.data import OpenSearchQueryResult, Element, OpenSearchQuery
Expand All @@ -26,10 +27,10 @@ def __init__(self, os_client_args: dict) -> None:

@requires_modules("opensearchpy", extra="opensearch")
def query(self, query: OpenSearchQuery) -> OpenSearchQueryResult:
from opensearchpy import OpenSearch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

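We probably need to keep the `sycamore.connectors.opensearch.utils` import here, inside the method, instead of at the top of the module, because that module imports `opensearchpy` unconditionally.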

from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

logger.debug("Executing OS query: " + str(query))
client = OpenSearch(**self._os_client_args)
client = OpenSearchClientWithLogging(**self._os_client_args)

os_result = client.transport.perform_request(
"POST",
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载