Initial Iceberg writer. #1499
base: main
Conversation
This commit contains an Iceberg Write node that writes properties to an Iceberg table using the PyIceberg library. The implementation differs somewhat between local mode and Ray mode. In local mode, we convert the DocSet into a PyArrow table and use PyIceberg's native Arrow support for writing. In Ray mode, we still leverage PyArrow, but we use it to convert the DocSet into a Ray Dataset of the properties and then use the Ray Data Iceberg writer directly.
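For concreteness, the local-mode path boils down to roughly the following sketch. The schema, documents, and sqlite-backed catalog settings here are illustrative stand-ins, not taken from the PR:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Illustrative schema and document properties; the real writer derives these
# from the DocSet and its Sycamore schema.
pa_schema = pa.schema([("title", pa.string()), ("page_count", pa.int64())])
docs = [
    {"properties": {"title": "a", "page_count": 3}},
    {"properties": {"title": "b", "page_count": 7}},
]

# Convert the document properties into a pyarrow Table.
pa_table = pa.Table.from_pylist([d["properties"] for d in docs], schema=pa_schema)

# Use pyiceberg's native Arrow support to append the table. The sqlite catalog
# here is just an easy local example.
catalog = load_catalog(
    "default",
    **{
        "type": "sql",
        "uri": "sqlite:////tmp/pyiceberg_catalog.db",
        "warehouse": "file:///tmp/warehouse",
    },
)
catalog.create_namespace_if_not_exists("test_namespace")
iceberg_table = catalog.create_table_if_not_exists("test_namespace.simple_table", pa_schema)
iceberg_table.append(pa_table)
```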
Pull Request Overview
This PR adds an Iceberg writer implementation that enables writing Sycamore documents to Apache Iceberg tables using the PyIceberg library. The implementation provides support for both local execution mode and Ray distributed execution mode.
- Adds PyArrow type conversion utilities for Sycamore schemas and documents
- Implements IcebergWriter transform node with dual execution paths (local vs Ray)
- Includes comprehensive test coverage for type conversions and end-to-end writer functionality
Reviewed Changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `lib/sycamore/sycamore/utils/pyarrow/types.py` | Core PyArrow type conversion functions for schemas and documents |
| `lib/sycamore/sycamore/utils/pyarrow/__init__.py` | Package exports for PyArrow utilities |
| `lib/sycamore/sycamore/tests/unit/utils/test_pyarrow.py` | Unit tests for PyArrow type conversion functionality |
| `lib/sycamore/sycamore/tests/unit/connectors/iceberg/test_iceberg_writer.py` | Integration tests for IcebergWriter |
| `lib/sycamore/sycamore/connectors/iceberg/iceberg_writer.py` | Main IcebergWriter implementation |
| `lib/sycamore/pyproject.toml` | Dependency and extras configuration for PyIceberg |
    Args:
        docs: The list of documents to convert.
        schema: The Scyamore or PyArrow Schema defining the structure of the
Copilot AI commented on Oct 14, 2025:
Corrected spelling of 'Scyamore' to 'Sycamore'.
Suggested change:
-        schema: The Scyamore or PyArrow Schema defining the structure of the
+        schema: The Sycamore or PyArrow Schema defining the structure of the
lol. Oops.
    else:
        pa_schema = schema

    for field in schema:
Copilot AI commented on Oct 14, 2025:
This line uses `schema` instead of `pa_schema` for iteration. When `schema` is a `SchemaV2` object, it may not be directly iterable like a PyArrow schema.
Suggested change:
-    for field in schema:
+    for field in pa_schema:
        schema = self._pa_schema

        def f(batch: "pa.Table") -> "pa.Table":
            print(type(batch))
Copilot AI commented on Oct 14, 2025:
Debug print statement should be removed from production code.
Suggested change:
-            print(type(batch))
4 Jobs Failed:
- Testing / integ-tests (3.10) failed on "Run Integ tests"
- Testing / notebook-tests-docprep (3.10) failed on "Run DocPrep Notebook Tests"
- Testing / notebook-tests-slow (3.10) failed on "Run Notebook tests"
- 1 job failed running on non-Blacksmith runners.

Summary: 4 successful workflows, 1 failed workflow
Last updated: 2025-10-15 19:20:51 UTC
    .transform(
        IcebergWriter,
        catalog_kwargs=catalog_options,
        schema=schema,
        table_identifier="test_namespace.simple_table",
        location=str(tmp_path),
    )
    .execute()
maybe add a `docset.write.iceberg`?
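For what it's worth, a rough sketch of what such a convenience method could look like. The `self.plan` / `self.context.execute` plumbing below is a guess at how the existing `write_*` methods are wired up, not the actual sycamore internals:

```python
# Hypothetical docset.write.iceberg(...) convenience method. The plan/execute
# wiring is an assumption about how the other writer methods work.
def iceberg(self, catalog_kwargs: dict, schema, table_identifier: str, location: str | None = None, **kwargs):
    from sycamore.connectors.iceberg.iceberg_writer import IcebergWriter

    node = IcebergWriter(
        self.plan,  # assumed: the writer node wraps the current plan
        catalog_kwargs=catalog_kwargs,
        schema=schema,
        table_identifier=table_identifier,
        location=location,
        **kwargs,
    )
    return self.context.execute(node)  # assumed: execution mirrors other writers
```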
        ns = Catalog.namespace_from(self._table_id)
        catalog = self._get_catalog()
        catalog.create_namespace_if_not_exists(ns)
        return catalog.create_table_if_not_exists(self._table_id, self._pa_schema, location=self._location)
what does this do if the table exists but has a mismatching schema?
Unfortunately it looks like it just matches on name (it tries to create the table, and if that throws an AlreadyExists exception it just returns what's there). I could see some value in checking whether the schemas match, but I'm not sure it's worth it yet.
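If we ever do want that check, one possible shape for it. This is a sketch only: it assumes pyiceberg's `load_table`, `NoSuchTableError`, and `schema_to_pyarrow`, and the field-id metadata pyiceberg attaches may make a strict equality comparison too picky in practice:

```python
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io.pyarrow import schema_to_pyarrow

def _get_or_create_table(self):
    # Stricter variant: refuse to reuse an existing table whose schema does
    # not match the one we are about to write.
    catalog = self._get_catalog()
    try:
        table = catalog.load_table(self._table_id)
    except NoSuchTableError:
        ns = Catalog.namespace_from(self._table_id)
        catalog.create_namespace_if_not_exists(ns)
        return catalog.create_table(self._table_id, self._pa_schema, location=self._location)
    existing = schema_to_pyarrow(table.schema())
    if existing != self._pa_schema:
        raise ValueError(f"Table {self._table_id} exists with a different schema: {existing}")
    return table
```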
pinecone-text = { version = "^0.11.0", optional = true }
weaviate-client = { version = "^4.16.10", optional = true }
qdrant-client = { version = "^1.11.2", optional = true }
pyiceberg = { version = "^0.10.0", optional = true }
any idea which extras we get from our existing dependencies? I assume downstream users have to add the extras they care about on their own
Not sure I quite understand what you mean. Users would have to add the extras. I don't think we get pyiceberg for free. Ray does use it for its Iceberg writer, but I believe it's in an extra there as well.
I mean pyiceberg has a bunch of extras, e.g. for Glue support. In your unit test you're doing sqlite, which is listed as an extra here: https://py.iceberg.apache.org/ - so my assumption is that something else (probably Ray) depends on sqlite to do stuff, which pulls it in. The question is which other Iceberg extras are like sqlite in that regard.
Mostly I'm just curious, so if this is a pain to figure out don't bother.
Oh I see. Makes sense. I confess I didn't look too closely, but I'll do another pass.
Yeah, good call. It looks like we are getting sqlalchemy from langchain of all places. I'll add that one explicitly to the iceberg extra. The other extras (like pyarrow) I'm a bit less worried about because they are core dependencies of things like Ray.
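For anyone who wants to check a particular environment, a quick probe along these lines works; the extra-to-module mapping below is my guess at what each pyiceberg extra provides:

```python
import importlib.util

# Which optional backends (normally installed via pyiceberg extras) are already
# importable, i.e. pulled in transitively by other dependencies? The module
# names on the right are assumptions about what each extra provides.
extras_to_modules = {
    "sql-sqlite": "sqlalchemy",
    "pyarrow": "pyarrow",
    "glue": "boto3",
    "hive": "thrift",
    "s3fs": "s3fs",
}
for extra, module in extras_to_modules.items():
    present = importlib.util.find_spec(module) is not None
    print(f"{extra:>10}: {'already importable' if present else 'missing'}")
```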
    # Recursively build the array for the flattened child data.
    child_array = _build_array(flattened_data, arrow_type.value_type)

    null_mask = pa.array([not v for v in validity], type=pa.bool_())
    return pa.ListArray.from_arrays(offsets, child_array, mask=null_mask)
seeing if I understand correctly:
- offsets is a list of indices into the flattened array
- each 'document' gets a corresponding pair of offsets for [start, end).
- if a document has an empty list the offsets array ends up with pair of matching offsets
- if a document has a non-list we treat it as an empty list for the offsets but write down that it's actually a null
- resulting offset array has length = len(documents) + 1
Yes! You've got it. It took me quite a few iterations with Gemini to figure this out. The first snippet of code I found called the null_mask the "validity_mask", so I had it backwards in my head for a long time.
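For anyone else following the thread, a tiny self-contained version of that construction with made-up values:

```python
import pyarrow as pa

# Per-document values: a two-element list, an empty list, a non-list (becomes
# null), and a one-element list.
values = [[1, 2], [], None, [3]]

offsets, flattened, validity = [0], [], []
for v in values:
    if isinstance(v, list):
        flattened.extend(v)
        validity.append(True)
    else:
        validity.append(False)      # non-list: offsets stay put, mark as null
    offsets.append(len(flattened))  # one [start, end) pair per document

child_array = pa.array(flattened, type=pa.int64())
null_mask = pa.array([not v for v in validity], type=pa.bool_())
list_array = pa.ListArray.from_arrays(pa.array(offsets, type=pa.int32()), child_array, mask=null_mask)

print(offsets)     # [0, 2, 2, 2, 3] -> len(values) + 1 entries
print(list_array)  # [[1, 2], [], null, [3]]
```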
root?
Done