
Conversation

bsowell (Contributor) commented Oct 14, 2025

This commit contains an Iceberg Write node that writes properties to an Iceberg table using the PyIceberg library.

The implementation differs somewhat between local mode and Ray mode. In local mode, we convert the DocSet into a PyArrow table and use PyIceberg's native Arrow support for writing. In Ray mode, we still leverage PyArrow, but we use it to convert the DocSet into a Ray Dataset of the properties and then use the Ray Data Iceberg writer directly.

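For illustration, a rough sketch of the two write paths described above (not the actual implementation; helper names such as `docs_to_pyarrow_table` and the `docset.plan.execute()` step are hypothetical, and the Ray path assumes a Ray version that provides `Dataset.write_iceberg`):

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog


def write_local(docs, pa_schema: pa.Schema, catalog_kwargs: dict,
                table_identifier: str, location: str) -> None:
    # Local mode: build a single PyArrow table and use PyIceberg's Arrow support.
    pa_table = docs_to_pyarrow_table(docs, pa_schema)  # hypothetical helper
    catalog = load_catalog("default", **catalog_kwargs)
    table = catalog.create_table_if_not_exists(
        table_identifier, schema=pa_schema, location=location
    )
    table.append(pa_table)


def write_ray(docset, pa_schema: pa.Schema, catalog_kwargs: dict,
              table_identifier: str) -> None:
    # Ray mode: turn the DocSet into a Ray Dataset of property batches, then
    # let Ray Data's Iceberg writer handle the distributed write.
    ds = docset.plan.execute()  # hypothetical: a ray.data.Dataset of documents
    ds = ds.map_batches(
        lambda batch: docs_to_pyarrow_table(batch, pa_schema),
        batch_format="pyarrow",
    )
    ds.write_iceberg(table_identifier, catalog_kwargs=catalog_kwargs)
```
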
@bsowell bsowell requested review from HenryL27 and Copilot October 14, 2025 23:42
Copilot AI left a comment

Pull Request Overview

This PR adds an Iceberg writer implementation that enables writing Sycamore documents to Apache Iceberg tables using the PyIceberg library. The implementation provides support for both local execution mode and Ray distributed execution mode.

  • Adds PyArrow type conversion utilities for Sycamore schemas and documents
  • Implements IcebergWriter transform node with dual execution paths (local vs Ray)
  • Includes comprehensive test coverage for type conversions and end-to-end writer functionality

Reviewed Changes

Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.

Files reviewed:

  • lib/sycamore/sycamore/utils/pyarrow/types.py: Core PyArrow type conversion functions for schemas and documents
  • lib/sycamore/sycamore/utils/pyarrow/__init__.py: Package exports for PyArrow utilities
  • lib/sycamore/sycamore/tests/unit/utils/test_pyarrow.py: Unit tests for PyArrow type conversion functionality
  • lib/sycamore/sycamore/tests/unit/connectors/iceberg/test_iceberg_writer.py: Integration tests for IcebergWriter
  • lib/sycamore/sycamore/connectors/iceberg/iceberg_writer.py: Main IcebergWriter implementation
  • lib/sycamore/pyproject.toml: Dependency and extras configuration for PyIceberg

Args:
docs: The list of documents to convert.
schema: The Scyamore or PyArrow Schema defining the structure of the

Copilot AI Oct 14, 2025

Corrected spelling of 'Scyamore' to 'Sycamore'.

Suggested change
schema: The Scyamore or PyArrow Schema defining the structure of the
schema: The Sycamore or PyArrow Schema defining the structure of the

Contributor Author

lol. Oops.

else:
pa_schema = schema

for field in schema:

Copilot AI Oct 14, 2025

This line uses schema instead of pa_schema for iteration. When schema is a SchemaV2 object, it may not be directly iterable like a PyArrow schema.

Suggested change
for field in schema:
for field in pa_schema:

schema = self._pa_schema

def f(batch: "pa.Table") -> "pa.Table":
print(type(batch))

Copilot AI Oct 14, 2025

Debug print statement should be removed from production code.

Suggested change
print(type(batch))

bsowell commented Oct 14, 2025

4 Jobs Failed:

Testing / integ-tests (3.10) failed on "Run Integ tests"
[...]
    res = fn(batch)
  File "/home/runner/.cache/pypoetry/virtualenvs/sycamore-monorepo-OM1azog3-py3.10/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 425, in _wrapped_udf_map_fn
    _try_wrap_udf_exception(e)
  File "/home/runner/.cache/pypoetry/virtualenvs/sycamore-monorepo-OM1azog3-py3.10/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 441, in _try_wrap_udf_exception
    raise UserCodeException("UDF failed to process a data block.") from e
ray.exceptions.UserCodeException: UDF failed to process a data block.
ERROR lib/sycamore/sycamore/tests/integration/utils/test_pdf.py::test_basic - opensearchpy.exceptions.NotFoundError: NotFoundError(404, 'index_not_found_exception', 'no such index [sycamore_query_ntsb_integration_tests]', sycamore_query_ntsb_integration_tests, index_or_alias)
= 17 failed, 151 passed, 5 skipped, 578 warnings, 1 error in 5290.59s (1:28:10) =
(pid=58701) INFO:root:Spurious log 1: Verifying that log messages are propagated
Error: Process completed with exit code 1.
Testing / notebook-tests-docprep (3.10) failed on "Run DocPrep Notebook Tests"
[...]
     97     tb = traceback.format_exc()
     98     logger.warning(f"Error writing records to target:\n{tb}")
---> 99     raise ValueError(f"Error writing to target: {e}")
    100 finally:
    101     client.close()

ValueError: Error writing to target: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7ff271193670>: Failed to resolve 'search-aryn-blog-test-kmzf2omtmydwhsnhov6xlj7y5m.us-east-1.es.amazonaws.com' ([Errno -2] Name or service not known)) caused by: NameResolutionError(<urllib3.connection.HTTPSConnection object at 0x7ff271193670>: Failed to resolve 'search-aryn-blog-test-kmzf2omtmydwhsnhov6xlj7y5m.us-east-1.es.amazonaws.com' ([Errno -2] Name or service not known))
------------------------------ Captured log call -------------------------------
INFO     traitlets:client.py:695 Executing notebook with kernel:

* nbmake: Automate reading GitHub Actions logs with our bot: https://github.com/marketplace/treebeard-build

=========================== short test summary info ============================
FAILED docprep/minilm-l6-v2_greedy-section-merger_opensearch.ipynb::notebooks/docprep/minilm-l6-v2_greedy-section-merger_opensearch.ipynb - ValueError: Error writing to target: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7ff271193670>: Failed to resolve 'search-aryn-blog-test-kmzf2omtmydwhsnhov6xlj7y5m.us-east-1.es.amazonaws.com' ([Errno -2] Name or service not known)) caused by: NameResolutionError(<urllib3.connection.HTTPSConnection object at 0x7ff271193670>: Failed to resolve 'search-aryn-blog-test-kmzf2omtmydwhsnhov6xlj7y5m.us-east-1.es.amazonaws.com' ([Errno -2] Name or service not known))
============================== 1 failed in 13.76s ==============================

real	0m14.783s
user	0m16.291s
sys	0m1.011s
Error: Process completed with exit code 1.
Testing / notebook-tests-slow (3.10) failed on "Run Notebook tests"
[...]

----------------------------- Captured stderr call -----------------------------
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
------------------------------ Captured log call -------------------------------
INFO     traitlets:client.py:695 Executing notebook with kernel: python3

* nbmake: Automate reading GitHub Actions logs with our bot: https://github.com/marketplace/treebeard-build

=========================== short test summary info ============================
FAILED ArynPartitionerPython.ipynb::notebooks/ArynPartitionerPython.ipynb - 
============================== 1 failed in 8.28s ===============================

real	0m9.296s
user	0m2.935s
sys	0m0.213s
Error: Process completed with exit code 1.

1 job failed running on non-Blacksmith runners.


Summary: 4 successful workflows, 1 failed workflow

Last updated: 2025-10-15 19:20:51 UTC

Comment on lines +33 to +40
.transform(
IcebergWriter,
catalog_kwargs=catalog_options,
schema=schema,
table_identifier="test_namespace.simple_table",
location=str(tmp_path),
)
.execute()
Collaborator

maybe add a docset.write.iceberg?

ns = Catalog.namespace_from(self._table_id)
catalog = self._get_catalog()
catalog.create_namespace_if_not_exists(ns)
return catalog.create_table_if_not_exists(self._table_id, self._pa_schema, location=self._location)
Collaborator

what does this do if the table exists but has a mismatching schema?

Contributor Author

Unfortunately it looks like it just matches on name (it tries to create the table and, if that throws an AlreadyExists exception, just returns what's there). I could see some value in checking whether the schemas match, but I'm not sure it's worth it yet.
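
If schema checking does become worthwhile later, one possible approach (a sketch only, assuming PyIceberg's `schema_to_pyarrow` helper and a strict comparison; in practice Iceberg field-id metadata may make `equals()` stricter than intended):

```python
from pyiceberg.io.pyarrow import schema_to_pyarrow


def ensure_table(catalog, table_id, pa_schema, location):
    table = catalog.create_table_if_not_exists(table_id, pa_schema, location=location)
    existing = schema_to_pyarrow(table.schema())
    if not existing.equals(pa_schema):
        # The table was created earlier with a different schema; fail loudly
        # rather than writing data that may not fit.
        raise ValueError(
            f"Table {table_id} exists with schema {existing}, expected {pa_schema}"
        )
    return table
```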

pinecone-text = { version = "^0.11.0", optional = true }
weaviate-client = { version = "^4.16.10", optional = true }
qdrant-client = { version = "^1.11.2", optional = true }
pyiceberg = { version = "^0.10.0", optional = true}
Collaborator

any idea which extras we get from our existing dependencies? I assume downstream users have to add the extras they care about on their own

Contributor Author

Not sure I quite understand what you mean. Users would have to add the extras. I don't think we get pyiceberg for free. Ray does use it for its Iceberg writer, but I believe it's in an extra there as well.

HenryL27 (Collaborator) Oct 15, 2025

I mean pyiceberg has a bunch of extras, e.g. for Glue support. In your unit test you're doing sqlite, which is listed as an extra here https://py.iceberg.apache.org/ - so my assumption is something else (probably ray) depends on sqlite to do stuff which pulls it in. The question is which other iceberg extras are like sqlite in that regard

Mostly I'm just curious so if this is a pain to figure out don't bother

Contributor Author

Oh I see. Makes sense. I confess I didn't look too closely, but I'll do another pass.

Contributor Author

Yeah, good call. It looks like we are getting sqlalchemy from langchain of all places. I'll add that one explicitly to the iceberg extra. The other extras (like pyarrow) I'm a bit less worried about because they are core dependencies of things like Ray.

Comment on lines +115 to +119
# Recursively build the array for the flattened child data.
child_array = _build_array(flattened_data, arrow_type.value_type)

null_mask = pa.array([not v for v in validity], type=pa.bool_())
return pa.ListArray.from_arrays(offsets, child_array, mask=null_mask)
Collaborator

seeing if I understand correctly:

  • offsets is a list of indices into the flattened array
  • each 'document' gets a corresponding pair of offsets for [start, end).
  • if a document has an empty list, the offsets array ends up with a pair of matching offsets
  • if a document has a non-list we treat it as an empty list for the offsets but write down that it's actually a null
  • resulting offset array has length = len(documents) + 1
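
A tiny self-contained example of the same construction (hypothetical values; three "documents": a two-element list, an empty list, and a non-list treated as null):

```python
import pyarrow as pa

# Three documents: [1, 2], [], and a non-list value (recorded as null).
flattened_data = [1, 2]                               # child values from all lists
offsets = pa.array([0, 2, 2, 2], type=pa.int32())     # len(documents) + 1 entries
validity = [True, True, False]                        # third document is not a list
null_mask = pa.array([not v for v in validity], type=pa.bool_())

child_array = pa.array(flattened_data, type=pa.int64())
arr = pa.ListArray.from_arrays(offsets, child_array, mask=null_mask)
print(arr.to_pylist())  # [[1, 2], [], None]
```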

Contributor Author

Yes! You've got it. It took me quite a few iterations with Gemini to figure this out. The first snippet of code I found called the null_mask the "validity_mask", so I had it backwards in my head for a long time.

Collaborator

root?

Contributor Author

Done
