212 changes: 205 additions & 7 deletions lib/sycamore/poetry.lock
Collaborator: root?

Contributor Author: Done

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions lib/sycamore/pyproject.toml
@@ -78,6 +78,8 @@ pinecone = { version = "^7.3.0", extras = ["grpc"], optional = true }
pinecone-text = { version = "^0.11.0", optional = true }
weaviate-client = { version = "^4.16.10", optional = true }
qdrant-client = { version = "^1.11.2", optional = true }
pyiceberg = { version = "^0.10.0", extras = ["sql-sqlite"], optional = true }


# Local inference dependencies
easyocr = { version = "^1.7.1", optional = true }
@@ -143,6 +145,7 @@ opensearch = ["opensearch-py"]
pinecone = ["pinecone", "pinecone-text"]
weaviate = ["weaviate-client"]
qdrant = ["qdrant-client"]
iceberg = ["pyiceberg"]

# Partitioner extras
local-inference = [
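The new iceberg extra gates the pyiceberg import at runtime. A rough sketch of the kind of check that sycamore.utils.import_utils.requires_modules performs (the function body and the sycamore-ai package name in the message are assumptions, shown only to illustrate the extras mechanism):

import importlib.util


def check_optional_modules(modules: list[str], extra: str) -> None:
    # Sketch: fail fast when an optional dependency is missing and point the
    # user at the matching extra. Error wording is illustrative only.
    missing = [m for m in modules if importlib.util.find_spec(m) is None]
    if missing:
        raise ImportError(
            f"Missing optional modules {missing}. "
            f'Install them with: pip install "sycamore-ai[{extra}]"'
        )


# The guard used by IcebergWriter below amounts to:
# check_optional_modules(["pyiceberg"], extra="iceberg")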
78 changes: 78 additions & 0 deletions lib/sycamore/sycamore/connectors/iceberg/iceberg_writer.py
@@ -0,0 +1,78 @@
from typing import Any, Callable, Optional, TYPE_CHECKING

from sycamore.utils.import_utils import requires_modules

from sycamore.data import Document, MetadataDocument
from sycamore.plan_nodes import Node, Write
from sycamore.schema import SchemaV2
from sycamore.utils.pyarrow import schema_to_pyarrow, docs_to_pyarrow


if TYPE_CHECKING:
    import pyarrow as pa


class IcebergWriter(Write):
    """Writes document properties to an Apache Iceberg table."""

    @requires_modules(["pyiceberg"], extra="iceberg")
    def __init__(
        self,
        child: Node,
        catalog_kwargs: dict[str, Any],
        schema: SchemaV2,
        table_identifier: str,
        property_root: str = "entity",
        location: Optional[str] = None,
        **resource_args,
    ):
        self._catalog_kwargs = catalog_kwargs
        self._schema = schema
        self._pa_schema = schema_to_pyarrow(schema)
        self._table_id = table_identifier
        self._property_root = property_root
        self._location = location
        super().__init__(child=child, f=self, name="WriteIceberg")

    def _get_catalog(self):
        # Deferred import so the pyiceberg dependency stays optional.
        from pyiceberg.catalog import load_catalog

        catalog = load_catalog(**self._catalog_kwargs)
        return catalog

    def __str__(self):
        return f"iceberg_writer(table_id={self._table_id})"

    def _get_table(self):
        from pyiceberg.catalog import Catalog

        ns = Catalog.namespace_from(self._table_id)
        catalog = self._get_catalog()
        catalog.create_namespace_if_not_exists(ns)
        return catalog.create_table_if_not_exists(self._table_id, self._pa_schema, location=self._location)
Collaborator: what does this do if the table exists but has a mismatching schema?

Contributor Author: Unfortunately, it looks like it just matches on name: it tries to create the table and, if that raises an AlreadyExists exception, returns whatever is already there. I could see some value in checking whether the schemas match, but I'm not sure it's worth it yet.
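A rough sketch of the schema check discussed above, assuming pyiceberg's Table.schema().as_arrow() conversion; the helper name and the name/type comparison (which sidesteps the field-id metadata pyiceberg attaches to converted fields) are illustrative, not part of this PR:

    def _check_schema(self, table) -> None:
        # Hypothetical helper: compare the existing table's schema against the
        # schema we intend to write. Compares names and types rather than full
        # pyarrow schema equality, since pyiceberg attaches field-id metadata
        # to the fields it converts.
        existing = table.schema().as_arrow()
        if existing.names != self._pa_schema.names or existing.types != self._pa_schema.types:
            raise ValueError(
                f"Table {self._table_id} already exists with a mismatching schema: "
                f"expected {self._pa_schema}, found {existing}"
            )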


    def _to_property_dict(self, property_root: str = "entity") -> Callable[["pa.Table"], "pa.Table"]:
        schema = self._pa_schema

        def f(batch: "pa.Table") -> "pa.Table":
            # Deserialize each document in the batch, drop metadata documents,
            # and convert the remainder to a pyarrow table matching the schema.
            doc_dict = batch.to_pydict()
            all_docs = [Document.deserialize(s) for s in doc_dict["doc"]]
            docs = [d for d in all_docs if not isinstance(d, MetadataDocument)]
            return docs_to_pyarrow(docs, schema)

        return f

    def execute(self, **kwargs):
        _ = self._get_table()  # Creates the table if it does not exist.
        dataset = self.child().execute(**kwargs)
        dataset.map_batches(self._to_property_dict(), batch_format="pyarrow").write_iceberg(
            self._table_id, catalog_kwargs=self._catalog_kwargs, **kwargs
        )
        return dataset

    def local_execute(self, all_docs: list["Document"]) -> list["Document"]:
        table = self._get_table()

        new_docs = docs_to_pyarrow(all_docs, self._pa_schema)
        table.append(new_docs)

        return all_docs
@@ -0,0 +1,69 @@
from pyiceberg.catalog import load_catalog
import pytest

import sycamore
from sycamore.connectors.iceberg.iceberg_writer import IcebergWriter
from sycamore.data import Document
from sycamore.schema import SchemaV2, make_named_property


@pytest.fixture(scope="function")
def catalog_options(tmp_path) -> dict:
    return {
        "type": "sql",
        "uri": f"sqlite:////{str(tmp_path / 'iceberg_test.db')}",
    }


schema = SchemaV2(
    properties=[
        make_named_property(name="field1", type="string"),
        make_named_property(name="field2", type="int"),
    ]
)

docs = [
    Document(properties={"entity": {"field1": "value1", "field2": 123}}),
    Document(properties={"entity": {"field1": "value2", "field2": 456}}),
]
table_id = "test_namespace.simple_table"


def check_table(catalog_options):
    table = load_catalog(**catalog_options).load_table(table_id)
    table_dict = table.scan().to_arrow().to_pydict()

    # Row order is not guaranteed, so accept either ordering of the two rows.
    assert table_dict == {"field1": ["value1", "value2"], "field2": [123, 456]} or table_dict == {
        "field1": ["value2", "value1"],
        "field2": [456, 123],
    }


@pytest.mark.parametrize("mode", [sycamore.EXEC_LOCAL, sycamore.EXEC_RAY])
def test_iceberg_writer(mode, catalog_options, tmp_path) -> None:
    ctx = sycamore.init(exec_mode=mode)
    (
        ctx.read.document(docs)
        .transform(
            IcebergWriter,
            catalog_kwargs=catalog_options,
            schema=schema,
            table_identifier="test_namespace.simple_table",
            location=str(tmp_path),
        )
        .execute()
    )

    check_table(catalog_options)

Comment on lines +47 to +54:

Collaborator: maybe add a docset.write.iceberg?
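The write.iceberg entry point requested above is exercised by test_iceberg_docset_writer below. A rough sketch of how such a method might wrap IcebergWriter (the DocSetWriter shape and the execution hook are assumptions; the actual implementation is not shown in this diff):

from typing import Any, Optional

from sycamore.connectors.iceberg.iceberg_writer import IcebergWriter
from sycamore.schema import SchemaV2


class DocSetWriter:
    # Sketch only: the real writer class and its execution path are not in this diff.
    def __init__(self, context, plan):
        self.context = context
        self.plan = plan

    def iceberg(
        self,
        catalog_kwargs: dict[str, Any],
        schema: SchemaV2,
        table_identifier: str,
        location: Optional[str] = None,
        **resource_args,
    ) -> None:
        writer = IcebergWriter(
            child=self.plan,
            catalog_kwargs=catalog_kwargs,
            schema=schema,
            table_identifier=table_identifier,
            location=location,
            **resource_args,
        )
        # Execute eagerly, matching how the test calls write.iceberg(...) with
        # no trailing .execute(); the context.execute hook here is assumed.
        self.context.execute(writer)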


def test_iceberg_docset_writer(catalog_options, tmp_path) -> None:
    ctx = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
    ctx.read.document(docs).write.iceberg(
        catalog_kwargs=catalog_options,
        schema=schema,
        table_identifier="test_namespace.simple_table",
        location=str(tmp_path),
    )

    check_table(catalog_options)
104 changes: 104 additions & 0 deletions lib/sycamore/sycamore/tests/unit/utils/test_pyarrow.py
@@ -0,0 +1,104 @@
from datetime import date
import pyarrow as pa

from sycamore.data import Document
from sycamore.utils.pyarrow import schema_to_pyarrow, docs_to_pyarrow
from sycamore.schema import SchemaV2, make_property, make_named_property


def test_scalar_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="bool", type="bool"),
            make_named_property(name="int", type="int"),
            make_named_property(name="float", type="float"),
            make_named_property(name="string", type="string"),
            make_named_property(name="date", type="date"),
            make_named_property(name="datetime", type="datetime"),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    assert pa_schema.types == [pa.bool_(), pa.int64(), pa.float64(), pa.string(), pa.date32(), pa.date64()]


def test_choice_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="choice1", type="choice", choices=["choice1", "choice2"]),
            make_named_property(name="choice2", type="choice", choices=[1, 2]),
            make_named_property(name="choice3", type="choice", choices=[date(2023, 1, 1), date(2024, 1, 1)]),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    assert pa_schema.types == [pa.string(), pa.int64(), pa.date32()]


def test_nested_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="name", type="string", required=True),
            make_named_property(name="age", type="int"),
            make_named_property(
                name="children",
                type="array",
                item_type=make_property(
                    type="object",
                    properties=[
                        make_named_property(name="name", type="string", required=True),
                        make_named_property(name="age", type="int", required=True),
                    ],
                ),
            ),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    types = pa_schema.types
    assert types[0] == pa.string()
    assert types[1] == pa.int64()
    assert types[2] == pa.list_(pa.struct([pa.field("name", pa.string()), pa.field("age", pa.int64())]))


def test_doc_to_pyarrow() -> None:
    schema = SchemaV2(
        properties=[
            make_named_property(name="name", type="string", required=True),
            make_named_property(name="age", type="int"),
            make_named_property(
                name="children",
                type="array",
                item_type=make_property(
                    type="object",
                    properties=[
                        make_named_property(name="name", type="string", required=True),
                        make_named_property(name="age", type="int", required=True),
                    ],
                ),
            ),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)

    people = [
        {
            "name": "Alice",
            "age": 40,
            "children": [{"name": "Bob", "age": 10}, {"name": "Charlie", "age": 8}],
        },
        {
            "name": "Swathi",
            "age": 32,
            "children": [
                {"name": "Keshav", "age": 4},
            ],
        },
        {"name": "John", "age": 45, "children": None},
    ]

    docs = [Document({"properties": {"entity": person}}) for person in people]

    table = docs_to_pyarrow(docs, pa_schema)
    assert table.to_pylist() == people
17 changes: 17 additions & 0 deletions lib/sycamore/sycamore/utils/pyarrow/__init__.py
@@ -0,0 +1,17 @@
from sycamore.utils.pyarrow.fs import infer_fs, maybe_use_anonymous_s3_fs, cross_check_infer_fs
from sycamore.utils.pyarrow.types import (
    named_property_to_pyarrow,
    property_to_pyarrow,
    schema_to_pyarrow,
    docs_to_pyarrow,
)

__all__ = [
    "infer_fs",
    "maybe_use_anonymous_s3_fs",
    "cross_check_infer_fs",
    "named_property_to_pyarrow",
    "property_to_pyarrow",
    "schema_to_pyarrow",
    "docs_to_pyarrow",
]
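For reference, a minimal end-to-end use of these helpers, mirroring the unit tests above (the property names and output path are illustrative):

import pyarrow.parquet as pq

from sycamore.data import Document
from sycamore.schema import SchemaV2, make_named_property
from sycamore.utils.pyarrow import schema_to_pyarrow, docs_to_pyarrow

# Declare a one-field schema and convert it to a pyarrow schema.
schema = SchemaV2(properties=[make_named_property(name="title", type="string")])
pa_schema = schema_to_pyarrow(schema)

# Documents carry their extracted fields under properties["entity"].
docs = [Document({"properties": {"entity": {"title": "Report A"}}})]
table = docs_to_pyarrow(docs, pa_schema)

# The result is a plain pyarrow table, so it can go anywhere pyarrow goes.
pq.write_table(table, "entities.parquet")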