这是indexloc提供的服务,不要输入任何密码
Skip to content

Ray Video Reader Enhancement #848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: ray-api
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ray-curator/ray_curator/examples/video/video_read_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from ray_curator.backends.xenna import XennaExecutor
from ray_curator.pipeline import Pipeline
from ray_curator.stages.video.io.video_reader_download import VideoReaderDownloadStage
from ray_curator.stages.video.io.video_loading import VideoLoadingStage


def create_video_reading_pipeline(args: argparse.Namespace) -> Pipeline:
Expand All @@ -12,7 +12,7 @@ def create_video_reading_pipeline(args: argparse.Namespace) -> Pipeline:

# Add stages
# Add the composite stage that combines reading and downloading
pipeline.add_stage(VideoReaderDownloadStage(
pipeline.add_stage(VideoLoadingStage(
input_video_path=args.video_folder,
video_limit=args.video_limit,
verbose=args.verbose
Expand Down
15 changes: 13 additions & 2 deletions ray-curator/ray_curator/stages/io/reader/file_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class FilePartitioningStage(ProcessingStage[_EmptyTask, FileGroupTask]):
blocksize: int | str | None = None
file_extensions: list[str] | None = None
storage_options: dict[str, Any] | None = None
limit: int | None = None
_name: str = "file_partitioning"

def __post_init__(self):
Expand Down Expand Up @@ -79,6 +80,9 @@ def process(self, _: _EmptyTask) -> list[FileGroupTask]:
dataset_name = self._get_dataset_name(files)

for i, file_group in enumerate(partitions):
if self.limit is not None and len(tasks) >= self.limit:
logger.info(f"Reached limit of {self.limit} file groups")
break
file_task = FileGroupTask(
task_id=f"file_group_{i}",
dataset_name=dataset_name,
Expand Down Expand Up @@ -155,9 +159,16 @@ def _parse_size(self, size_str: str) -> int:
"""Parse size string like '128MB' to bytes."""
size_str = size_str.upper().strip()

units = {"B": 1, "KB": 1024, "MB": 1024 * 1024, "GB": 1024 * 1024 * 1024, "TB": 1024 * 1024 * 1024 * 1024}
# Check units in order from longest to shortest to avoid partial matches
units = [
("TB", 1024 * 1024 * 1024 * 1024),
("GB", 1024 * 1024 * 1024),
("MB", 1024 * 1024),
("KB", 1024),
("B", 1),
]

for unit, multiplier in units.items():
for unit, multiplier in units:
if size_str.endswith(unit):
number = float(size_str[: -len(unit)])
return int(number * multiplier)
Expand Down
145 changes: 0 additions & 145 deletions ray-curator/ray_curator/stages/video/io/video_download.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from dataclasses import dataclass

from ray_curator.stages.base import CompositeStage, ProcessingStage
from ray_curator.stages.video.io.video_download import VideoDownloadStage
from ray_curator.stages.io.reader.file_partitioning import FilePartitioningStage
from ray_curator.stages.video.io.video_reader import VideoReaderStage
from ray_curator.tasks import VideoTask, _EmptyTask
from ray_curator.tasks import _EmptyTask
from ray_curator.tasks.video import VideoTask


@dataclass
class VideoReaderDownloadStage(CompositeStage[_EmptyTask, VideoTask]):
class VideoLoadingStage(CompositeStage[_EmptyTask, VideoTask]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know which convention is right, or whether we want to change it in the future, but take a look here: https://github.com/NVIDIA-NeMo/Curator/blob/ray-api/ray-curator/ray_curator/stages/io/reader/jsonl.py

We have a stage called JsonlReaderStage(ProcessingStage[FileGroupTask, DocumentBatch]), and the composite stage is called JsonlReader(CompositeStage[_EmptyTask, DocumentBatch]). Do we want to match that and call them VideoReaderStage and VideoReader, respectively?

"""Composite stage that reads video files from storage and downloads/processes them.

This stage combines VideoReaderStage and VideoDownloadStage into a single
This stage combines FilePartitioningStage and VideoReaderStage into a single
high-level operation for reading video files from a directory and processing
them with metadata extraction.

Expand All @@ -29,20 +30,22 @@ def __post_init__(self):

@property
def name(self) -> str:
return "video_reader_download"
return "video_loading"

def decompose(self) -> list[ProcessingStage]:
"""Decompose into constituent execution stages.

Returns:
List of processing stages: [VideoReaderStage, VideoDownloadStage]
List of processing stages: [FilePartitioningStage, VideoReaderStage]
"""
reader_stage = VideoReaderStage(
input_video_path=self.input_video_path,
video_limit=self.video_limit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: should we also make video_limit an int | None, similar to how you have it in FilePartitioningStage?

reader_stage = FilePartitioningStage(
file_paths=self.input_video_path,
files_per_partition=1,
file_extensions=[".mp4", ".mov", ".avi", ".mkv", ".webm"],
limit=self.video_limit,
)

download_stage = VideoDownloadStage(
download_stage = VideoReaderStage(
verbose=self.verbose
)

Expand Down
Loading