Initial Iceberg writer. #1499
Changes from all commits: ddc3e1c, 2312532, 1713b6a, 3613025, 7646d2e, 42321fe, b2d9d6f, c788caf
New file: sycamore/connectors/iceberg/iceberg_writer.py
@@ -0,0 +1,78 @@
from typing import Any, Callable, Optional, TYPE_CHECKING

from sycamore.utils.import_utils import requires_modules

from sycamore.data import Document, MetadataDocument
from sycamore.plan_nodes import Node, Write
from sycamore.schema import SchemaV2
from sycamore.utils.pyarrow import schema_to_pyarrow, docs_to_pyarrow


if TYPE_CHECKING:
    import pyarrow as pa


class IcebergWriter(Write):

    @requires_modules(["pyiceberg"], extra="iceberg")
    def __init__(
        self,
        child: Node,
        catalog_kwargs: dict[str, Any],
        schema: SchemaV2,
        table_identifier: str,
        property_root: str = "entity",
        location: Optional[str] = None,
        **resource_args,
    ):
        self._catalog_kwargs = catalog_kwargs
        self._schema = schema
        self._pa_schema = schema_to_pyarrow(schema)
        self._table_id = table_identifier
        self._property_root = property_root
        self._location = location
        super().__init__(child=child, f=self, name="WriteIceberg")

    def _get_catalog(self):
        from pyiceberg.catalog import load_catalog

        catalog = load_catalog(**self._catalog_kwargs)
        return catalog

    def __str__(self):
        return f"iceberg_writer(table_id={self._table_id})"

    def _get_table(self):
        from pyiceberg.catalog import Catalog

        ns = Catalog.namespace_from(self._table_id)
        catalog = self._get_catalog()
        catalog.create_namespace_if_not_exists(ns)
        return catalog.create_table_if_not_exists(self._table_id, self._pa_schema, location=self._location)
Review comment: What does this do if the table exists but has a mismatching schema?
Reply: Unfortunately it looks like it just matches on name (it tries to create the table and, if that throws an AlreadyExists exception, just returns what's there). I could see some value in checking whether the schemas match, but I'm not sure it's worth it yet.
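If schema checking ever becomes worth it, one possible approach is to load the existing table and compare its Arrow schema with the expected one. This is only a sketch, not part of the PR; it assumes pyiceberg's NoSuchTableError exception and Schema.as_arrow() helper, and check_table_schema is a hypothetical name:

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError


def check_table_schema(catalog_kwargs: dict, table_identifier: str, expected: pa.Schema) -> None:
    # Hypothetical helper: fail fast if the table already exists with a different schema.
    catalog = load_catalog(**catalog_kwargs)
    try:
        table = catalog.load_table(table_identifier)
    except NoSuchTableError:
        return  # No table yet; create_table_if_not_exists will create it with the expected schema.
    actual = table.schema().as_arrow()
    if not actual.equals(expected, check_metadata=False):
        raise ValueError(f"Table {table_identifier} exists with schema {actual}, expected {expected}")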
    def _to_property_dict(self, property_root: str = "entity") -> Callable[["pa.Table"], "pa.Table"]:
        schema = self._pa_schema

        def f(batch: "pa.Table") -> "pa.Table":
            doc_dict = batch.to_pydict()
            all_docs = [Document.deserialize(s) for s in doc_dict["doc"]]
            docs = [d for d in all_docs if not isinstance(d, MetadataDocument)]
            return docs_to_pyarrow(docs, schema)

        return f

    def execute(self, **kwargs):
        _ = self._get_table()  # Creates the table if it does not exist.
        dataset = self.child().execute(**kwargs)
        dataset.map_batches(self._to_property_dict(), batch_format="pyarrow").write_iceberg(
            self._table_id, catalog_kwargs=self._catalog_kwargs, **kwargs
        )
        return dataset

    def local_execute(self, all_docs: list["Document"]) -> list["Document"]:
        table = self._get_table()

        new_docs = docs_to_pyarrow(all_docs, self._pa_schema)
        table.append(new_docs)

        return all_docs
New test file: IcebergWriter unit tests
@@ -0,0 +1,69 @@
from pyiceberg.catalog import load_catalog
import pytest

import sycamore
from sycamore.connectors.iceberg.iceberg_writer import IcebergWriter
from sycamore.data import Document
from sycamore.schema import SchemaV2, make_named_property


@pytest.fixture(scope="function")
def catalog_options(tmp_path) -> dict:
    return {
        "type": "sql",
        "uri": f"sqlite:////{str(tmp_path / 'iceberg_test.db')}",
    }


schema = SchemaV2(
    properties=[
        make_named_property(name="field1", type="string"),
        make_named_property(name="field2", type="int"),
    ]
)

docs = [
    Document(properties={"entity": {"field1": "value1", "field2": 123}}),
    Document(properties={"entity": {"field1": "value2", "field2": 456}}),
]
table_id = "test_namespace.simple_table"


def check_table(catalog_options):
    table = load_catalog(**catalog_options).load_table(table_id)
    table_dict = table.scan().to_arrow().to_pydict()

    assert table_dict == {"field1": ["value1", "value2"], "field2": [123, 456]} or table_dict == {
        "field1": ["value2", "value1"],
        "field2": [456, 123],
    }
@pytest.mark.parametrize("mode", [sycamore.EXEC_LOCAL, sycamore.EXEC_RAY])
def test_iceberg_writer(mode, catalog_options, tmp_path) -> None:
    ctx = sycamore.init(exec_mode=mode)
    (
        ctx.read.document(docs)
        .transform(
            IcebergWriter,
            catalog_kwargs=catalog_options,
            schema=schema,
            table_identifier="test_namespace.simple_table",
            location=str(tmp_path),
        )
        .execute()
    )

    check_table(catalog_options)

Review comment on lines +47 to +54: maybe add a
def test_iceberg_docset_writer(catalog_options, tmp_path) -> None:
    ctx = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
    ctx.read.document(docs).write.iceberg(
        catalog_kwargs=catalog_options,
        schema=schema,
        table_identifier="test_namespace.simple_table",
        location=str(tmp_path),
    )

    check_table(catalog_options)
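For completeness, here is a sketch (not part of this PR) of reading the written table back directly with pyiceberg, for example to inspect the data or push a predicate into the scan; it reuses the catalog_options fixture and table identifier from the tests above, and read_back is a hypothetical name:

from pyiceberg.catalog import load_catalog


def read_back(catalog_options: dict):
    # Load the table through the same SQLite-backed catalog the tests use.
    table = load_catalog(**catalog_options).load_table("test_namespace.simple_table")
    # row_filter pushes a simple predicate into the scan; to_arrow() materializes the result.
    return table.scan(row_filter="field2 > 200").to_arrow()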
New test file: schema_to_pyarrow / docs_to_pyarrow unit tests
@@ -0,0 +1,104 @@
from datetime import date
import pyarrow as pa

from sycamore.data import Document
from sycamore.utils.pyarrow import schema_to_pyarrow, docs_to_pyarrow
from sycamore.schema import SchemaV2, make_property, make_named_property


def test_scalar_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="bool", type="bool"),
            make_named_property(name="int", type="int"),
            make_named_property(name="float", type="float"),
            make_named_property(name="string", type="string"),
            make_named_property(name="date", type="date"),
            make_named_property(name="datetime", type="datetime"),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    assert pa_schema.types == [pa.bool_(), pa.int64(), pa.float64(), pa.string(), pa.date32(), pa.date64()]


def test_choice_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="choice1", type="choice", choices=["choice1", "choice2"]),
            make_named_property(name="choice2", type="choice", choices=[1, 2]),
            make_named_property(name="choice3", type="choice", choices=[date(2023, 1, 1), date(2024, 1, 1)]),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    assert pa_schema.types == [pa.string(), pa.int64(), pa.date32()]


def test_nested_schema_to_pyarrow():
    schema = SchemaV2(
        properties=[
            make_named_property(name="name", type="string", required=True),
            make_named_property(name="age", type="int"),
            make_named_property(
                name="children",
                type="array",
                item_type=make_property(
                    type="object",
                    properties=[
                        make_named_property(name="name", type="string", required=True),
                        make_named_property(name="age", type="int", required=True),
                    ],
                ),
            ),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)
    types = pa_schema.types
    assert types[0] == pa.string()
    assert types[1] == pa.int64()
    assert types[2] == pa.list_(pa.struct([pa.field("name", pa.string()), pa.field("age", pa.int64())]))


def test_doc_to_pyarrow() -> None:
    schema = SchemaV2(
        properties=[
            make_named_property(name="name", type="string", required=True),
            make_named_property(name="age", type="int"),
            make_named_property(
                name="children",
                type="array",
                item_type=make_property(
                    type="object",
                    properties=[
                        make_named_property(name="name", type="string", required=True),
                        make_named_property(name="age", type="int", required=True),
                    ],
                ),
            ),
        ]
    )

    pa_schema = schema_to_pyarrow(schema)

    people = [
        {
            "name": "Alice",
            "age": 40,
            "children": [{"name": "Bob", "age": 10}, {"name": "Charlie", "age": 8}],
        },
        {
            "name": "Swathi",
            "age": 32,
            "children": [
                {"name": "Keshav", "age": 4},
            ],
        },
        {"name": "John", "age": 45, "children": None},
    ]

    docs = [Document({"properties": {"entity": person}}) for person in people]

    table = docs_to_pyarrow(docs, pa_schema)
    assert table.to_pylist() == people
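The last test pins down the contract of docs_to_pyarrow: each Document contributes one row, taken from its properties under the "entity" root, and None values pass through as nulls. A minimal sketch that would satisfy that contract (the PR's real implementation lives in sycamore.utils.pyarrow and may differ; docs_to_rows_sketch is a hypothetical name):

import pyarrow as pa

from sycamore.data import Document


def docs_to_rows_sketch(docs: list[Document], pa_schema: pa.Schema, property_root: str = "entity") -> pa.Table:
    # One row per document, pulled from the property_root subtree of its properties;
    # pyarrow maps missing keys and None values to nulls.
    rows = [doc.properties.get(property_root) or {} for doc in docs]
    return pa.Table.from_pylist(rows, schema=pa_schema)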
New file: sycamore/utils/pyarrow/__init__.py
@@ -0,0 +1,17 @@
from sycamore.utils.pyarrow.fs import infer_fs, maybe_use_anonymous_s3_fs, cross_check_infer_fs
from sycamore.utils.pyarrow.types import (
    named_property_to_pyarrow,
    property_to_pyarrow,
    schema_to_pyarrow,
    docs_to_pyarrow,
)

__all__ = [
    "infer_fs",
    "maybe_use_anonymous_s3_fs",
    "cross_check_infer_fs",
    "named_property_to_pyarrow",
    "property_to_pyarrow",
    "schema_to_pyarrow",
    "docs_to_pyarrow",
]
Review comment: root?
Reply: Done
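For context on the root question above: in this PR's tests, extracted fields live under properties["entity"], which matches the writer's default property_root. A hypothetical document using a different root would presumably need the writer configured to match; a sketch under that assumption:

from sycamore.data import Document

# Layout used throughout this PR's tests: fields nested under properties["entity"].
doc = Document(properties={"entity": {"field1": "value1", "field2": 123}})

# Hypothetical alternative root, presumably paired with IcebergWriter(..., property_root="extracted").
alt_doc = Document(properties={"extracted": {"field1": "value1", "field2": 123}})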