Add video splitting pipeline with fixed stride extraction and transcoding Stage #783
base: ray-api
Conversation
parser.add_argument("--verbose", action="store_true", default=False) | ||
parser.add_argument("--output-clip-path", type=str, default="/mnt/mint/output") | ||
parser.add_argument( | ||
"--no-upload-clips", |
Is this argument used anywhere?
This is used when we have S3 client support; I just have it here in advance.
Nit: as a general practice, I would say let's add things when the time comes; otherwise the diff becomes hard to reason about today (and in the future). Similarly, in the Video dataclass you have more fields than what the "currently" merged stages need. This makes it hard to prune out what's needed where.
A couple of minor comments, but I think the major blockers are:
- Copy stats
- entire_gpu should not be present
"""Resource requirements for this stage.""" | ||
if self.encoder == "h264_nvenc" or self.use_hwaccel: | ||
# TODO: support partial GPU usage | ||
return Resources(entire_gpu=True) |
- Can you please add your name to the TODO so that we know it's our TODO and not cosmos-curate's.
- I think it should not be entire_gpu but rather Resources(gpus=1). The difference is the nvencs and nvdecs: an entire GPU gives you the nvencs and nvdecs as well, whereas gpus=1 does not (see the sketch below).
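A minimal sketch of the suggested replacement for the hunk above, using only fields already visible in this diff; the TODO owner tag is illustrative:

```python
@property
def resources(self) -> Resources:
    """Resource requirements for this stage."""
    if self.encoder == "h264_nvenc" or self.use_hwaccel:
        # TODO(aot): support partial GPU usage
        # Request a plain GPU share; entire_gpu would also reserve the device's nvencs/nvdecs.
        return Resources(gpus=1)
    return Resources(cpus=self.num_cpus_per_worker)
```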
Also, just wondering if there are any blockers for making this partial GPU?
Modify this to use the partial GPU now
…ding stages

- Introduced `video_split_clip_example.py` to demonstrate video splitting functionality.
- Added `ClipTranscodingStage` and `FixedStrideExtractorStage` for processing video clips.
- Implemented command-line arguments for configuring video processing parameters.
- Created utility functions for grouping iterables in `grouping.py`.
- Added unit tests for the new stages in `test_clip_transcoding_stage.py` and `test_fixed_stride_extractor_stage.py`.

Signed-off-by: Ao Tang <aot@nvidia.com>
…age integration

Signed-off-by: Ao Tang <aot@nvidia.com>
Signed-off-by: Ao Tang <aot@nvidia.com>
…ntegrate new functionalities

- Replaced separate VideoReaderStage and VideoDownloadStage with a composite VideoReaderDownloadStage, streamlining the video reading and downloading process.
- Updated ClipTranscodingStage to improve GPU resource allocation and added detailed arguments for better configurability.
- Adjusted tests to reflect changes in resource management, ensuring accurate assertions on GPU usage.

These changes improve the clarity and efficiency of video processing within the ray-curator framework.

Signed-off-by: Ao Tang <aot@nvidia.com>
- Introduced MockGpuInfo and MockGpuResources classes to simulate GPU information and resources for testing.
- Updated test_resources_gpu_encoder and test_resources_hwaccel_enabled methods to utilize mocks, ensuring accurate resource assertions without dependency on actual GPU hardware.
- Enhanced test_different_encoder_configurations to validate resource requirements for various encoder configurations, including GPU settings.

These changes improve the robustness of the ClipTranscodingStage tests by isolating them from hardware dependencies, facilitating easier testing and validation.

Signed-off-by: [Your Name] <your.email@example.com>
Signed-off-by: Ao Tang <aot@nvidia.com>
Signed-off-by: Ao Tang <aot@nvidia.com>
Force-pushed the …-video-clip-extraction branch from 29a106c to 012a2e1.
/ok to test c6a8a1b
Looks good. But I think verbose is still pending.
T = typing.TypeVar("T")


def split_by_chunk_size(
Can you add tests for these?
@@ -0,0 +1,267 @@
import argparse
I think we don't want to commit these? Let's check with Arham and the team on what the final approach is; one example per module is going to be some serious bloat. We can have an integration test if we want to "test it".
If you mean this file, I actually use this file directly to integrate the rest of the modules.
I mean you can keep it locally, but you need not push it.
…tage tests

Signed-off-by: Ao Tang <aot@nvidia.com>
/ok to test 9f39885
Signed-off-by: Ao Tang <aot@nvidia.com>
/ok to test 8472134
- Added `ray_stage_spec` method to `ClipTranscodingStage`, `VideoDownloadStage`, and `VideoReaderStage` to define stage characteristics for Ray integration.
- Updated input and output methods in `ClipTranscodingStage` to include additional input parameters.
- Modified `SplitPipeTask` to return properties from `data` instead of `video`, ensuring consistency in task data handling.
- Added unit tests to verify the correctness of the new `ray_stage_spec` implementations.

These changes improve the integration of video processing stages with Ray's architecture and enhance test coverage for the new functionalities.

Signed-off-by: Ao Tang <aot@nvidia.com>
    encoder_threads: Number of threads per encoder.
    encode_batch_size: Number of clips to encode in parallel.
    nb_streams_per_gpu: Number of streams per GPU.
    use_hwaccel: Whether to use hardware acceleration.
It seems like even when use_hwaccel is False we still use Resources(gpus=1) (at line 76). Can you elaborate on what happens there?
def resources(self) -> Resources:
    """Resource requirements for this stage."""
    if self.encoder == "h264_nvenc" or self.use_hwaccel:
        if self.nb_streams_per_gpu > 0:
            # Assume that we have the same type of GPUs
            gpu_info = _get_local_gpu_info()[0]
            nvencs = _make_gpu_resources_from_gpu_name(gpu_info.name).num_nvencs
            gpu_memory_gb = _get_gpu_memory_gb()
            return Resources(
                nvencs=nvencs // self.nb_streams_per_gpu,
                gpu_memory_gb=gpu_memory_gb // self.nb_streams_per_gpu,
            )
        else:
            return Resources(gpus=1)

    return Resources(cpus=self.num_cpus_per_worker)
Let's not override properties; instead, override self._resources in post_init. If you override properties we'll end up with a weird case if someone does ClipTranscodingStage.with(resources=???).
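A self-contained sketch of that suggestion, with stand-in classes and hypothetical defaults (the real Resources and base stage live in ray-curator):

```python
from dataclasses import dataclass, field


@dataclass
class Resources:
    # Stand-in for ray-curator's Resources; only the fields used below are shown.
    cpus: float = 1.0
    gpus: float = 0.0


@dataclass
class ProcessingStage:
    # Stand-in for the base stage class, whose `resources` property reads the
    # backing `_resources` field (which `.with(resources=...)` would overwrite).
    _resources: Resources = field(default_factory=Resources)

    @property
    def resources(self) -> Resources:
        return self._resources


@dataclass
class ClipTranscodingStage(ProcessingStage):
    # Hypothetical defaults; only the fields relevant to resource selection are shown.
    encoder: str = "h264_nvenc"
    use_hwaccel: bool = False
    num_cpus_per_worker: float = 6.0

    def __post_init__(self) -> None:
        # Set the backing field once instead of redefining the property.
        if self.encoder == "h264_nvenc" or self.use_hwaccel:
            self._resources = Resources(gpus=1)
        else:
            self._resources = Resources(cpus=self.num_cpus_per_worker)
```

With this shape, ClipTranscodingStage().resources still goes through the base-class property, and an external resources override is not silently shadowed by a subclass property.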
if video.source_bytes is None:
    msg = "Video source bytes are not available"
    raise ValueError(msg)
Is this for the IDE to be happy? Because in theory the validate(..) of the stage should check the input's source_bytes, right? If not, then we've gone wrong somewhere in our validate implementation.
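Roughly what is being pointed at here, as a sketch: if the stage's inputs() declares source_bytes as a required attribute, the framework-level validate(..) should reject the task before process() runs. The exact signature and attribute names below are assumptions, not taken from this diff.

```python
def inputs(self) -> tuple[list[str], list[str]]:
    """Declare required task attributes so validation can reject bad tasks early."""
    # With "source_bytes" declared here, validate(..) should fail the task up
    # front, making the explicit None check inside process() redundant.
    return ["data"], ["source_bytes"]
```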
output_tasks = []
clip_durations = [clip.duration for clip in video.clips]
if len(clip_durations) > 0:
    logger.info(
Should this be inside a verbose check?
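For example, something along these lines, guarded by the stage's verbose flag; the log message itself is illustrative since the real one is cut off in the hunk above.

```python
if self.verbose and clip_durations:
    logger.info(
        f"Extracted {len(clip_durations)} clips, "
        f"total duration {sum(clip_durations):.2f}s"
    )
```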
def ray_stage_spec(self) -> dict[str, Any]:
    """Ray stage specification for this stage."""
    return {
        RayStageSpecKeys.IS_ACTOR_STAGE: True,
Fanout makes sense since we go from X -> list[X], but do we need this to be an Actor? For general context: let's use Actors when we want to maintain state, or when our setup(..) loads a model or something, i.e. our init time is expensive. If it's a simpler map-style operation then we don't need it to be an Actor; you can remove that value and the RayData executor will auto-decide whether it should be one.

FWIW, RayData won't work with this stage because it has nvencs.
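A possible shape of that change, as a sketch: the spec keeps only a fanout hint and drops the actor flag, letting the executor decide. The IS_FANOUT_STAGE key name below is illustrative and not confirmed by this diff.

```python
def ray_stage_spec(self) -> dict[str, Any]:
    """Ray stage specification for this stage."""
    # Without IS_ACTOR_STAGE, the executor decides on its own whether this
    # map-style stage needs to be an actor.
    return {
        RayStageSpecKeys.IS_FANOUT_STAGE: True,  # illustrative key name
    }
```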
def test_drop_incomplete_chunk_false_explicit(self):
    """Test keeping incomplete chunks when drop_incomplete_chunk=False."""
    data = [1, 2, 3, 4, 5]
    chunks = list(split_by_chunk_size(data, 3, drop_incomplete_chunk=False))
    expected = [[1, 2, 3], [4, 5]]
    assert chunks == expected
This could be combined with the test at line 18.
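One way to fold the two cases into a single parametrized test; the import path and the expected output for drop_incomplete_chunk=True are assumptions based on the function name.

```python
import pytest

from ray_curator.utils.grouping import split_by_chunk_size  # hypothetical import path


@pytest.mark.parametrize(
    ("drop_incomplete_chunk", "expected"),
    [
        (False, [[1, 2, 3], [4, 5]]),  # trailing partial chunk kept
        (True, [[1, 2, 3]]),  # trailing partial chunk dropped (assumed semantics)
    ],
)
def test_split_by_chunk_size_incomplete_chunk(drop_incomplete_chunk, expected):
    data = [1, 2, 3, 4, 5]
    chunks = list(split_by_chunk_size(data, 3, drop_incomplete_chunk=drop_incomplete_chunk))
    assert chunks == expected
```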
def test_basic_functionality(self):
    """Test basic splitting into n chunks."""
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    chunks = list(split_into_n_chunks(data, 3))
How does this differ in functionality from list(split_by_chunk_size(data, 3))?
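For context, the usual distinction, assuming the two helpers do what their names suggest, is fixed chunk size versus fixed chunk count; with 9 elements and an argument of 3 the two happen to coincide, but with 10 elements they diverge. The import path and exact outputs below are illustrative.

```python
from ray_curator.utils.grouping import split_by_chunk_size, split_into_n_chunks  # hypothetical import path

data = list(range(1, 11))  # 10 items, so the two behaviours diverge

# Fixed chunk *size* of 3: the number of chunks grows with the data.
print(list(split_by_chunk_size(data, 3)))  # e.g. [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]

# Fixed *number* of chunks (3): the chunk sizes adapt to the data.
print(list(split_into_n_chunks(data, 3)))  # e.g. [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```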
Note this requires #775 to be merged first (currently the base is set to `aot/ray-video-reader` instead of `ray-api`).

- Introduced `video_split_clip_example.py` to demonstrate video splitting functionality.
- Added `ClipTranscodingStage` and `FixedStrideExtractorStage` for processing video clips.
- Implemented command-line arguments for configuring video processing parameters.
- Created utility functions for grouping iterables in `grouping.py`.
- Added unit tests for the new stages in `test_clip_transcoding_stage.py` and `test_fixed_stride_extractor_stage.py`.

Description

Usage

Checklist