+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions lib/sycamore/sycamore/connectors/file/file_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import uuid
import logging

from pyarrow.filesystem import FileSystem

from pyarrow.fs import FileSystem, LocalFileSystem, FileSelector
from sycamore.data import Document
from sycamore.plan_nodes import Scan
from sycamore.utils.time_trace import timetrace
Expand Down Expand Up @@ -166,6 +165,46 @@ def execute(self, **kwargs) -> "Dataset":

return files.map(self._to_document, **self.resource_args)

def local_source(self, **kwargs) -> list[Document]:
    """Read every matching file under the configured path(s) into Documents in memory.

    ``self._paths`` may be a single path string or a collection of paths.
    A path that resolves to a file is read directly. Any other path is walked
    recursively with a ``FileSelector`` and every file found beneath it is read.

    Args:
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        One Document per file read. Each has a fresh ``uuid1`` doc_id, the raw
        bytes in ``binary_representation``, and ``path`` (plus ``filetype`` and
        any metadata-provider properties) in ``properties``.
    """
    # Accept either a single path or a collection of paths. Previously a
    # non-str value left ``paths`` unbound and raised UnboundLocalError.
    # NOTE(review): assumes a non-str ``_paths`` is an iterable of path strings.
    paths = [self._paths] if isinstance(self._paths, str) else self._paths

    if not self._filesystem:
        # No filesystem was configured: default to the local one (cached on self).
        self._filesystem = LocalFileSystem()

    documents = []

    def process_file(info):
        """Append a Document for ``info`` unless it is not a file or fails the extension filter."""
        if not info.is_file:
            return
        if self._filter_paths_by_extension and not info.path.endswith(self.format()):
            return

        with self._filesystem.open_input_file(info.path) as file:
            binary_data = file.read()

        document = Document()
        document.doc_id = str(uuid.uuid1())
        document.type = self._binary_format
        document.binary_representation = binary_data
        document.properties["path"] = info.path
        if "filetype" not in document.properties and self._binary_format is not None:
            document.properties["filetype"] = self._file_mime_type()
        if self._is_s3_scheme():
            # The filesystem reports bucket-relative paths; restore the scheme.
            document.properties["path"] = "s3://" + info.path
        if self._metadata_provider:
            # The metadata provider is keyed by the filesystem path (no "s3://" prefix).
            document.properties.update(self._metadata_provider.get_metadata(info.path))

        documents.append(document)

    for path in paths:
        path_info = self._filesystem.get_file_info(path)
        if path_info.is_file:
            process_file(path_info)
        else:
            # Treat anything that is not a file as a directory and walk it recursively.
            for info in self._filesystem.get_file_info(FileSelector(path, recursive=True)):
                process_file(info)
    return documents

def format(self):
    """Return the configured binary format of this scan.

    ``local_source`` also uses this value as the filename suffix when
    ``_filter_paths_by_extension`` is enabled.
    """
    binary_format = self._binary_format
    return binary_format

Expand Down
27 changes: 15 additions & 12 deletions lib/sycamore/sycamore/transforms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np

from sycamore.data import Document, MetadataDocument
from sycamore.utils.lineage_utils import update_lineage
from sycamore.data.document import split_data_metadata
from sycamore.plan_nodes import Node, UnaryNode
from sycamore.utils.ray_utils import check_serializable
Expand Down Expand Up @@ -161,7 +162,7 @@ def local_execute(self, all_docs: list[Document]) -> list[Document]:
outputs = self._local_process(docs)
to_docs = [d for d in outputs if not isinstance(d, MetadataDocument)]
if self._enable_auto_metadata and (len(docs) > 0 or len(to_docs) > 0):
outputs.extend(BaseMapTransform._update_lineage(docs, to_docs))
outputs.extend(update_lineage(docs, to_docs))
outputs.extend(metadata)
return outputs

Expand Down Expand Up @@ -261,24 +262,16 @@ def _process_ray(

to_docs = [d for d in outputs if not isinstance(d, MetadataDocument)]
if enable_auto_metadata and (len(docs) > 0 or len(to_docs) > 0):
outputs.extend(BaseMapTransform._update_lineage(docs, to_docs))
outputs.extend(update_lineage(docs, to_docs))
outputs.extend(metadata)
return {"doc": [d.serialize() for d in outputs]}

@classmethod
def _update_lineage(cls, from_docs, to_docs):
    """Build the lineage MetadataDocument linking ``from_docs`` to ``to_docs``.

    Kept for callers that still use the classmethod form. It delegates to the
    shared ``update_lineage`` helper rather than duplicating its logic. That
    helper refreshes each output document's lineage id and returns a one-element
    list holding a MetadataDocument with ``from_ids`` / ``to_ids`` lineage links.

    Args:
        from_docs: Input documents of the transform step.
        to_docs: Output documents; their lineage ids are refreshed in place.

    Returns:
        A single-element list containing the lineage MetadataDocument.
    """
    return update_lineage(from_docs, to_docs)


class CompositeTransform(UnaryNode):
def __init__(self, child: Node, base_args: list[dict], enable_auto_metadata=True, **resource_args):
    """Create a composite transform from a list of per-step argument dicts.

    Args:
        child: Upstream plan node feeding this composite.
        base_args: One argument dict per step, passed to ``combine`` to build
            the chained nodes stored in ``self.nodes``.
        enable_auto_metadata: When true, ``local_execute`` appends a lineage
            MetadataDocument linking inputs to outputs.
        **resource_args: Resource settings forwarded to the parent node and
            to ``combine``.
    """
    super().__init__(child, **resource_args)
    self.nodes = CompositeTransform.combine(child, base_args, **resource_args)
    # Read by local_execute to decide whether to emit lineage metadata.
    self._enable_auto_metadata = enable_auto_metadata

@staticmethod
def combine(last: Node, base_args: list[dict], **resource_args) -> list[BaseMapTransform]:
Expand All @@ -297,5 +290,15 @@ def _local_process(self, in_docs: list[Document]) -> list[Document]:

return docs

def local_execute(self, all_docs: list[Document]) -> list[Document]:
    """Run the composite in-process and return its outputs plus metadata.

    Incoming MetadataDocuments bypass processing and are appended unchanged
    at the end. The remaining documents go through ``_local_process``. When
    auto metadata is enabled and there was at least one input or output
    document, a lineage MetadataDocument is appended before the pass-through
    metadata.
    """
    inputs = []
    carried_metadata = []
    for doc in all_docs:
        if isinstance(doc, MetadataDocument):
            carried_metadata.append(doc)
        else:
            inputs.append(doc)

    results = self._local_process(inputs)
    produced = [item for item in results if not isinstance(item, MetadataDocument)]

    if self._enable_auto_metadata and (inputs or produced):
        results.extend(update_lineage(inputs, produced))
    results.extend(carried_metadata)
    return results

def execute(self, **kwargs) -> "Dataset":
    """Execute the composite by executing the last node in ``self.nodes``.

    Presumably each node built by ``combine`` wraps the previous one, so the
    last node drives the whole chain. ``combine``'s body is not visible here,
    so confirm this.
    NOTE(review): ``**kwargs`` is accepted but not forwarded to the node.
    """
    final_node = self.nodes[-1]
    return final_node.execute()
10 changes: 10 additions & 0 deletions lib/sycamore/sycamore/utils/lineage_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from sycamore.data import Document, MetadataDocument


def update_lineage(from_docs: list[Document], to_docs: list[Document]) -> list[MetadataDocument]:
    """Refresh lineage ids on ``to_docs`` and record the from -> to link.

    The ``from_docs`` ids are captured before any output is touched. Every
    output document then gets ``update_lineage_id()``, and only after all
    updates are the new output ids read.

    Returns:
        A one-element list holding a MetadataDocument whose ``lineage_links``
        maps ``"from_ids"`` and ``"to_ids"`` to the captured id lists.
    """
    source_ids = [doc.lineage_id for doc in from_docs]

    # Update every output first, then read ids; keep this two-phase order.
    for doc in to_docs:
        doc.update_lineage_id()
    target_ids = [doc.lineage_id for doc in to_docs]

    links = {"from_ids": source_ids, "to_ids": target_ids}
    return [MetadataDocument(lineage_links=links)]
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载