diff --git a/Cargo.toml b/Cargo.toml index 7f4fbe3..331e8d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "bevy_light_field" description = "rust bevy light field array tooling" -version = "0.2.1" +version = "0.3.0" edition = "2021" authors = ["mosure "] license = "MIT" @@ -26,9 +26,13 @@ exclude = [ default-run = "viewer" +# TODO: add feature for ffmpeg output + + [dependencies] anyhow = "1.0" async-compat = "0.2" +bytes = "1.5.0" futures = "0.3" openh264 = "0.5" retina = "0.4" diff --git a/README.md b/README.md index aaa51c1..e56a22f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,32 @@ rust bevy light field camera array tooling -## example +## capabilities + +- [X] grid view of light field camera array +- [X] stream to files with recording controls +- [ ] playback nersemble recordings with annotations +- [ ] person segmentation post-process (batch across streams) +- [ ] camera array calibration +- [ ] 3d reconstruction dataset preparation +- [ ] real-time 3d reconstruction viewer + + +## run the viewer + +`cargo run -- --help` + +the viewer opens a window and displays the light field camera array, with post-process options + +### controls + +- `r` to start recording +- `s` to stop recording +- `esc` to exit +- [ ] UI controls + + +## library usage ```rust use bevy::{ @@ -43,7 +68,6 @@ fn main() { RtspStreamPlugin, )) .add_systems(Startup, create_streams) - .add_systems(Startup, setup_camera) .run(); } @@ -99,44 +123,17 @@ fn create_streams( commands.entity(entity).insert(rtsp_stream); }); } - -fn setup_camera( - mut commands: Commands, -) { - commands.spawn(( - Camera2dBundle { - ..default() - }, - )); -} ``` -## run the viewer - -`cargo run -- --help` - -the viewer opens a window and displays the light field camera array, with post-process options - - -## capabilities - -- [X] grid view of light field camera array -- [ ] stream to files with recording controls -- [ ] person segmentation post-process (batch across streams) -- [ ] 
camera array calibration -- [ ] 3d reconstruction dataset preparation -- [ ] real-time 3d reconstruction viewer - - ## light field camera array view the [onshape model](https://cad.onshape.com/documents/20d4b522e97cda88fb785536/w/9939c2cecd85477ae7e753f6/e/69f97c604cdee8494e4e46bc?renderMode=0&uiState=65ea51d493f7bd0c772084fa) -![Alt text](docs/light_field_camera_onshape_transparent.webp) - - [ ] parts list +![Alt text](docs/light_field_camera_onshape_transparent.webp) + ## setup rtsp streaming server @@ -146,7 +143,7 @@ it is useful to test the light field viewer with emulated camera streams - install https://obsproject.com/ - install rtsp plugin https://github.com/iamscottxu/obs-rtspserver/releases -- Tools > RTSP Server > Start Server +- tools > rtsp server > start server ## compatible bevy versions @@ -158,4 +155,6 @@ it is useful to test the light field viewer with emulated camera streams ## credits - [bevy_video](https://github.com/PortalCloudInc/bevy_video) +- [gaussian_avatars](https://github.com/ShenhanQian/GaussianAvatars) +- [nersemble](https://github.com/tobias-kirschstein/nersemble) - [paddle_seg_matting](https://github.com/PaddlePaddle/PaddleSeg/blob/release/2.9/Matting/docs/quick_start_en.md) diff --git a/src/lib.rs b/src/lib.rs index baf29e0..6e77d68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,2 @@ +pub mod mp4; pub mod stream; diff --git a/src/mp4.rs b/src/mp4.rs new file mode 100644 index 0000000..a76c945 --- /dev/null +++ b/src/mp4.rs @@ -0,0 +1,537 @@ +// https://github.com/scottlamb/retina/blob/main/examples/client/src/mp4.rs + +use anyhow::{anyhow, bail, Error}; +use bytes::{Buf, BufMut, BytesMut}; +use retina::codec::{AudioParameters, ParametersRef, VideoParameters}; + +use std::convert::TryFrom; +use std::io::SeekFrom; +use tokio::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; + + +/// Writes a box length for everything appended in the supplied scope. +macro_rules! 
write_box { + ($buf:expr, $fourcc:expr, $b:block) => {{ + let _: &mut BytesMut = $buf; // type-check. + let pos_start = ($buf as &BytesMut).len(); + let fourcc: &[u8; 4] = $fourcc; + $buf.extend_from_slice(&[0, 0, 0, 0, fourcc[0], fourcc[1], fourcc[2], fourcc[3]]); + let r = { + $b; + }; + let pos_end = ($buf as &BytesMut).len(); + let len = pos_end.checked_sub(pos_start).unwrap(); + $buf[pos_start..pos_start + 4].copy_from_slice(&u32::try_from(len)?.to_be_bytes()[..]); + r + }}; +} + +/// Writes `.mp4` data to a sink. +/// See module-level documentation for details. +pub struct Mp4Writer { + mdat_start: u32, + mdat_pos: u32, + video_params: Vec, + + /// The most recently used 1-based index within `video_params`. + cur_video_params_sample_description_index: Option, + audio_params: Option>, + allow_loss: bool, + + /// The (1-indexed) video sample (frame) number of each sync sample (random access point). + video_sync_sample_nums: Vec, + + video_trak: TrakTracker, + audio_trak: TrakTracker, + inner: W, +} + +/// A chunk: a group of samples that have consecutive byte positions and same sample description. +struct Chunk { + first_sample_number: u32, // 1-based index + byte_pos: u32, // starting byte of first sample + sample_description_index: u32, +} + +/// Tracks the parts of a `trak` atom which are common between video and audio samples. +#[derive(Default)] +struct TrakTracker { + samples: u32, + next_pos: Option, + chunks: Vec, + sizes: Vec, + + /// The durations of samples in a run-length encoding form: (number of samples, duration). + /// This lags one sample behind calls to `add_sample` because each sample's duration + /// is calculated using the PTS of the following sample. 
+ durations: Vec<(u32, u32)>, + last_pts: Option, + tot_duration: u64, +} + +impl TrakTracker { + fn add_sample( + &mut self, + sample_description_index: u32, + byte_pos: u32, + size: u32, + timestamp: retina::Timestamp, + loss: u16, + allow_loss: bool, + ) -> Result<(), Error> { + if self.samples > 0 && loss > 0 && !allow_loss { + bail!("Lost {} RTP packets mid-stream", loss); + } + self.samples += 1; + if self.next_pos != Some(byte_pos) + || self.chunks.last().map(|c| c.sample_description_index) + != Some(sample_description_index) + { + self.chunks.push(Chunk { + first_sample_number: self.samples, + byte_pos, + sample_description_index, + }); + } + self.sizes.push(size); + self.next_pos = Some(byte_pos + size); + if let Some(last_pts) = self.last_pts.replace(timestamp.timestamp()) { + let duration = timestamp.timestamp().checked_sub(last_pts).unwrap(); + self.tot_duration += u64::try_from(duration).unwrap(); + let duration = u32::try_from(duration)?; + match self.durations.last_mut() { + Some((s, d)) if *d == duration => *s += 1, + _ => self.durations.push((1, duration)), + } + } + Ok(()) + } + + fn finish(&mut self) { + if self.last_pts.is_some() { + self.durations.push((1, 0)); + } + } + + /// Estimates the sum of the variable-sized portions of the data. + fn size_estimate(&self) -> usize { + (self.durations.len() * 8) + // stts + (self.chunks.len() * 12) + // stsc + (self.sizes.len() * 4) + // stsz + (self.chunks.len() * 4) // stco + } + + fn write_common_stbl_parts(&self, buf: &mut BytesMut) -> Result<(), Error> { + // TODO: add an edit list so the video and audio tracks are in sync. 
+ write_box!(buf, b"stts", { + buf.put_u32(0); + buf.put_u32(u32::try_from(self.durations.len())?); + for (samples, duration) in &self.durations { + buf.put_u32(*samples); + buf.put_u32(*duration); + } + }); + write_box!(buf, b"stsc", { + buf.put_u32(0); // version + buf.put_u32(u32::try_from(self.chunks.len())?); + let mut prev_sample_number = 1; + let mut chunk_number = 1; + if !self.chunks.is_empty() { + for c in &self.chunks[1..] { + buf.put_u32(chunk_number); + buf.put_u32(c.first_sample_number - prev_sample_number); + buf.put_u32(c.sample_description_index); + prev_sample_number = c.first_sample_number; + chunk_number += 1; + } + buf.put_u32(chunk_number); + buf.put_u32(self.samples + 1 - prev_sample_number); + buf.put_u32(1); // sample_description_index + } + }); + write_box!(buf, b"stsz", { + buf.put_u32(0); // version + buf.put_u32(0); // sample_size + buf.put_u32(u32::try_from(self.sizes.len())?); + for s in &self.sizes { + buf.put_u32(*s); + } + }); + write_box!(buf, b"stco", { + buf.put_u32(0); // version + buf.put_u32(u32::try_from(self.chunks.len())?); // entry_count + for c in &self.chunks { + buf.put_u32(c.byte_pos); + } + }); + Ok(()) + } +} + +impl Mp4Writer { + pub async fn new( + audio_params: Option>, + allow_loss: bool, + mut inner: W, + ) -> Result { + let mut buf = BytesMut::new(); + write_box!(&mut buf, b"ftyp", { + buf.extend_from_slice(&[ + b'i', b's', b'o', b'm', // major_brand + 0, 0, 0, 0, // minor_version + b'i', b's', b'o', b'm', // compatible_brands[0] + ]); + }); + buf.extend_from_slice(&b"\0\0\0\0mdat"[..]); + let mdat_start = u32::try_from(buf.len())?; + inner.write_all(&buf).await?; + Ok(Mp4Writer { + inner, + video_params: Vec::new(), + cur_video_params_sample_description_index: None, + audio_params, + allow_loss, + video_trak: TrakTracker::default(), + audio_trak: TrakTracker::default(), + video_sync_sample_nums: Vec::new(), + mdat_start, + mdat_pos: mdat_start, + }) + } + + pub async fn finish(mut self) -> Result<(), Error> { 
+ self.video_trak.finish(); + self.audio_trak.finish(); + let mut buf = BytesMut::with_capacity( + 1024 + self.video_trak.size_estimate() + + self.audio_trak.size_estimate() + + 4 * self.video_sync_sample_nums.len(), + ); + write_box!(&mut buf, b"moov", { + write_box!(&mut buf, b"mvhd", { + buf.put_u32(1 << 24); // version + buf.put_u64(0); // creation_time + buf.put_u64(0); // modification_time + buf.put_u32(90000); // timescale + buf.put_u64(self.video_trak.tot_duration); + buf.put_u32(0x00010000); // rate + buf.put_u16(0x0100); // volume + buf.put_u16(0); // reserved + buf.put_u64(0); // reserved + for v in &[0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000] { + buf.put_u32(*v); // matrix + } + for _ in 0..6 { + buf.put_u32(0); // pre_defined + } + buf.put_u32(2); // next_track_id + }); + if self.video_trak.samples > 0 { + self.write_video_trak(&mut buf)?; + } + if self.audio_trak.samples > 0 { + self.write_audio_trak(&mut buf, self.audio_params.as_ref().unwrap())?; + } + }); + self.inner.write_all(&buf).await?; + self.inner + .seek(SeekFrom::Start(u64::from(self.mdat_start - 8))) + .await?; + self.inner + .write_all(&(self.mdat_pos + 8 - self.mdat_start).to_be_bytes()[..]) + .await?; + Ok(()) + } + + fn write_video_trak(&self, buf: &mut BytesMut) -> Result<(), Error> { + write_box!(buf, b"trak", { + write_box!(buf, b"tkhd", { + buf.put_u32((1 << 24) | 7); // version, flags + buf.put_u64(0); // creation_time + buf.put_u64(0); // modification_time + buf.put_u32(1); // track_id + buf.put_u32(0); // reserved + buf.put_u64(self.video_trak.tot_duration); + buf.put_u64(0); // reserved + buf.put_u16(0); // layer + buf.put_u16(0); // alternate_group + buf.put_u16(0); // volume + buf.put_u16(0); // reserved + for v in &[0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000] { + buf.put_u32(*v); // matrix + } + let dims = self.video_params.iter().fold((0, 0), |prev_dims, p| { + let dims = p.pixel_dimensions(); + ( + std::cmp::max(prev_dims.0, dims.0), + 
std::cmp::max(prev_dims.1, dims.1), + ) + }); + let width = u32::from(u16::try_from(dims.0)?) << 16; + let height = u32::from(u16::try_from(dims.1)?) << 16; + buf.put_u32(width); + buf.put_u32(height); + }); + write_box!(buf, b"mdia", { + write_box!(buf, b"mdhd", { + buf.put_u32(1 << 24); // version + buf.put_u64(0); // creation_time + buf.put_u64(0); // modification_time + buf.put_u32(90000); // timebase + buf.put_u64(self.video_trak.tot_duration); + buf.put_u32(0x55c40000); // language=und + pre-defined + }); + write_box!(buf, b"hdlr", { + buf.extend_from_slice(&[ + 0x00, 0x00, 0x00, 0x00, // version + flags + 0x00, 0x00, 0x00, 0x00, // pre_defined + b'v', b'i', b'd', b'e', // handler = vide + 0x00, 0x00, 0x00, 0x00, // reserved[0] + 0x00, 0x00, 0x00, 0x00, // reserved[1] + 0x00, 0x00, 0x00, 0x00, // reserved[2] + 0x00, // name, zero-terminated (empty) + ]); + }); + write_box!(buf, b"minf", { + write_box!(buf, b"vmhd", { + buf.put_u32(1); + buf.put_u64(0); + }); + write_box!(buf, b"dinf", { + write_box!(buf, b"dref", { + buf.put_u32(0); + buf.put_u32(1); // entry_count + write_box!(buf, b"url ", { + buf.put_u32(1); // version, flags=self-contained + }); + }); + }); + write_box!(buf, b"stbl", { + write_box!(buf, b"stsd", { + buf.put_u32(0); // version + buf.put_u32(u32::try_from(self.video_params.len())?); // entry_count + for p in &self.video_params { + self.write_video_sample_entry(buf, p)?; + } + }); + self.video_trak.write_common_stbl_parts(buf)?; + write_box!(buf, b"stss", { + buf.put_u32(0); // version + buf.put_u32(u32::try_from(self.video_sync_sample_nums.len())?); + for n in &self.video_sync_sample_nums { + buf.put_u32(*n); + } + }); + }); + }); + }); + }); + Ok(()) + } + + fn write_audio_trak( + &self, + buf: &mut BytesMut, + parameters: &AudioParameters, + ) -> Result<(), Error> { + write_box!(buf, b"trak", { + write_box!(buf, b"tkhd", { + buf.put_u32((1 << 24) | 7); // version, flags + buf.put_u64(0); // creation_time + buf.put_u64(0); // 
modification_time + buf.put_u32(2); // track_id + buf.put_u32(0); // reserved + buf.put_u64(self.audio_trak.tot_duration); + buf.put_u64(0); // reserved + buf.put_u16(0); // layer + buf.put_u16(0); // alternate_group + buf.put_u16(0); // volume + buf.put_u16(0); // reserved + for v in &[0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000] { + buf.put_u32(*v); // matrix + } + buf.put_u32(0); // width + buf.put_u32(0); // height + }); + write_box!(buf, b"mdia", { + write_box!(buf, b"mdhd", { + buf.put_u32(1 << 24); // version + buf.put_u64(0); // creation_time + buf.put_u64(0); // modification_time + buf.put_u32(parameters.clock_rate()); + buf.put_u64(self.audio_trak.tot_duration); + buf.put_u32(0x55c40000); // language=und + pre-defined + }); + write_box!(buf, b"hdlr", { + buf.extend_from_slice(&[ + 0x00, 0x00, 0x00, 0x00, // version + flags + 0x00, 0x00, 0x00, 0x00, // pre_defined + b's', b'o', b'u', b'n', // handler = soun + 0x00, 0x00, 0x00, 0x00, // reserved[0] + 0x00, 0x00, 0x00, 0x00, // reserved[1] + 0x00, 0x00, 0x00, 0x00, // reserved[2] + 0x00, // name, zero-terminated (empty) + ]); + }); + write_box!(buf, b"minf", { + write_box!(buf, b"smhd", { + buf.extend_from_slice(&[ + 0x00, 0x00, 0x00, 0x00, // version + flags + 0x00, 0x00, // balance + 0x00, 0x00, // reserved + ]); + }); + write_box!(buf, b"dinf", { + write_box!(buf, b"dref", { + buf.put_u32(0); + buf.put_u32(1); // entry_count + write_box!(buf, b"url ", { + buf.put_u32(1); // version, flags=self-contained + }); + }); + }); + write_box!(buf, b"stbl", { + write_box!(buf, b"stsd", { + buf.put_u32(0); // version + buf.put_u32(1); // entry_count + buf.extend_from_slice( + parameters + .sample_entry() + .expect("all added streams have sample entries"), + ); + }); + self.audio_trak.write_common_stbl_parts(buf)?; + + // AAC requires two samples (really, each is a set of 960 or 1024 samples) + // to decode accurately. 
See + // https://developer.apple.com/library/archive/documentation/QuickTime/QTFF/QTFFAppenG/QTFFAppenG.html . + write_box!(buf, b"sgpd", { + // BMFF section 8.9.3: SampleGroupDescriptionBox + buf.put_u32(0); // version + buf.extend_from_slice(b"roll"); // grouping type + buf.put_u32(1); // entry_count + // BMFF section 10.1: AudioRollRecoveryEntry + buf.put_i16(-1); // roll_distance + }); + write_box!(buf, b"sbgp", { + // BMFF section 8.9.2: SampleToGroupBox + buf.put_u32(0); // version + buf.extend_from_slice(b"roll"); // grouping type + buf.put_u32(1); // entry_count + buf.put_u32(self.audio_trak.samples); + buf.put_u32(1); // group_description_index + }); + }); + }); + }); + }); + Ok(()) + } + + fn write_video_sample_entry( + &self, + buf: &mut BytesMut, + parameters: &VideoParameters, + ) -> Result<(), Error> { + // TODO: this should move to client::VideoParameters::sample_entry() or some such. + write_box!(buf, b"avc1", { + buf.put_u32(0); + buf.put_u32(1); // data_reference_index = 1 + buf.extend_from_slice(&[0; 16]); + buf.put_u16(u16::try_from(parameters.pixel_dimensions().0)?); + buf.put_u16(u16::try_from(parameters.pixel_dimensions().1)?); + buf.extend_from_slice(&[ + 0x00, 0x48, 0x00, 0x00, // horizresolution + 0x00, 0x48, 0x00, 0x00, // vertresolution + 0x00, 0x00, 0x00, 0x00, // reserved + 0x00, 0x01, // frame count + 0x00, 0x00, 0x00, 0x00, // compressorname + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x00, 0x00, 0x00, // + 0x00, 0x18, 0xff, 0xff, // depth + pre_defined + ]); + write_box!(buf, b"avcC", { + buf.extend_from_slice(parameters.extra_data()); + }); + }); + Ok(()) + } + + pub async fn video( + &mut self, + stream: &retina::client::Stream, + frame: &retina::codec::VideoFrame, + ) -> Result<(), Error> { + let sample_description_index = if let (Some(i), false) = ( + 
self.cur_video_params_sample_description_index, + frame.has_new_parameters(), + ) { + // Use the most recent sample description index for most frames, without having to + // scan through self.video_sample_index. + i + } else { + match stream.parameters() { + Some(ParametersRef::Video(params)) => { + let pos = self.video_params.iter().position(|p| p == params); + if let Some(pos) = pos { + u32::try_from(pos + 1)? + } else { + self.video_params.push(params.clone()); + u32::try_from(self.video_params.len())? + } + } + None => { + return Ok(()); + } + _ => unreachable!(), + } + }; + self.cur_video_params_sample_description_index = Some(sample_description_index); + let size = u32::try_from(frame.data().remaining())?; + self.video_trak.add_sample( + sample_description_index, + self.mdat_pos, + size, + frame.timestamp(), + frame.loss(), + self.allow_loss, + )?; + self.mdat_pos = self + .mdat_pos + .checked_add(size) + .ok_or_else(|| anyhow!("mdat_pos overflow"))?; + if frame.is_random_access_point() { + self.video_sync_sample_nums.push(self.video_trak.samples); + } + self.inner.write_all(frame.data()).await?; + Ok(()) + } + + pub async fn audio(&mut self, frame: retina::codec::AudioFrame) -> Result<(), Error> { + println!( + "{}: {}-byte audio frame", + frame.timestamp(), + frame.data().remaining() + ); + let size = u32::try_from(frame.data().remaining())?; + self.audio_trak.add_sample( + /* sample_description_index */ 1, + self.mdat_pos, + size, + frame.timestamp(), + frame.loss(), + self.allow_loss, + )?; + self.mdat_pos = self + .mdat_pos + .checked_add(size) + .ok_or_else(|| anyhow!("mdat_pos overflow"))?; + self.inner.write_all(frame.data()).await?; + Ok(()) + } +} diff --git a/src/stream.rs b/src/stream.rs index 35fcafe..bc20f64 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -24,12 +24,18 @@ use retina::{ }, codec::VideoFrame, }; -use tokio::runtime::{ - Handle, - Runtime, +use tokio::{ + fs::File, + runtime::{ + Handle, + Runtime, + }, + sync::mpsc, }; use 
url::Url; +use crate::mp4::Mp4Writer; + pub struct RtspStreamPlugin; impl Plugin for RtspStreamPlugin { @@ -97,12 +103,20 @@ pub fn apply_decode( #[derive(Debug, Clone, Copy, PartialEq, Reflect)] pub struct StreamId(pub usize); -#[derive(Debug, Component, Clone)] +#[derive(Debug)] +pub enum RecordingCommand { + StartRecording(File), + StopRecording, +} + + +#[derive(Component, Clone)] pub struct RtspStreamDescriptor { pub uri: String, pub id: StreamId, pub image: bevy::asset::Handle, latest_frame: Arc>>, + recording_sender: Arc>>>, } impl RtspStreamDescriptor { @@ -116,6 +130,7 @@ impl RtspStreamDescriptor { id, image, latest_frame: Arc::new(Mutex::new(None)), + recording_sender: Arc::new(Mutex::new(None)), } } @@ -128,6 +143,7 @@ impl RtspStreamDescriptor { } } + #[derive(Component)] struct RtspStreamCreated; @@ -182,6 +198,33 @@ impl RtspStreamManager { } }); } + + pub fn start_recording(&self, output_directory: &str, prefix: &str) { + let stream_descriptors = self.stream_descriptors.lock().unwrap(); + for descriptor in stream_descriptors.iter() { + let filepath = format!("{}/{}_{}.mp4", output_directory, prefix, descriptor.id.0); + + let send_channel = descriptor.recording_sender.lock().unwrap(); + let sender_clone = send_channel.as_ref().unwrap().clone(); + + self.handle.block_on(async move { + let file = File::create(&filepath).await.unwrap(); + sender_clone.send(RecordingCommand::StartRecording(file)).await.unwrap(); + }); + } + } + + pub fn stop_recording(&self) { + let stream_descriptors = self.stream_descriptors.lock().unwrap(); + for descriptor in stream_descriptors.iter() { + let send_channel = descriptor.recording_sender.lock().unwrap(); + let sender_clone = send_channel.as_ref().unwrap().clone(); + + self.handle.block_on(async move { + sender_clone.send(RecordingCommand::StopRecording).await.unwrap(); + }); + } + } } @@ -190,6 +233,7 @@ pub struct RtspStream { pub descriptor: RtspStreamDescriptor, decoder: Option, demuxed: Option, + writer: Option>, } 
impl RtspStream { @@ -201,16 +245,57 @@ impl RtspStream { descriptor, decoder, demuxed: None, + writer: None, } } async fn run(&mut self) -> Result<(), Box>{ - let session = create_session(&self.descriptor.uri).await?; + let (session, stream_idx) = create_session(&self.descriptor.uri).await?; self.demuxed = session.demuxed()?.into(); + let (sender, mut receiver) = mpsc::channel(1); + + { + let mut send_channel = self.descriptor.recording_sender.lock().unwrap(); + *send_channel = sender.into(); + } + loop { let frame = self.capture_frame().await?; + if let Ok(command) = receiver.try_recv() { + match command { + RecordingCommand::StartRecording(file) => { + if let Some(writer) = self.writer.take() { + writer.finish().await.ok(); + } + + self.writer = Mp4Writer::new( + None, + true, + file, + ).await.ok(); + + println!("writing stream {}", self.descriptor.id.0); + }, + RecordingCommand::StopRecording => { + if let Some(writer) = self.writer.take() { + println!("stopped recording stream {}", self.descriptor.id.0); + writer.finish().await.ok(); + } + }, + } + } + + { + if let Some(writer) = self.writer.as_mut() { + writer.video( + &self.demuxed.as_mut().unwrap().streams()[stream_idx], + &frame, + ).await?; + } + } + let mut data = frame.into_data(); convert_h264(&mut data)?; @@ -266,7 +351,10 @@ impl RtspStream { } -async fn create_session(url: &str) -> Result, Box> { +async fn create_session(url: &str) -> Result< + (Session, usize), + Box +> { let parsed_url = Url::parse(url)?; let username = parsed_url.username(); @@ -312,7 +400,7 @@ async fn create_session(url: &str) -> Result, Box>, + stream_manager: Res +) { + if keys.just_pressed(KeyCode::KeyR) { + let output_directory = "capture"; + std::fs::create_dir_all(output_directory).unwrap(); + + let base_prefix = "bevy_light_field_"; + + let prefix = format!( + "{}{:03}", + base_prefix, + get_next_session_id(output_directory, base_prefix) + ); + + stream_manager.start_recording( + output_directory, + &prefix, + ); + } +} 
/// Returns the next unused recording session id for `output_directory`.
///
/// Every directory entry whose file stem starts with `base_prefix` is expected
/// to continue with a decimal session id followed by `_` (for example
/// `bevy_light_field_003_0.mp4` has session id 3 for the prefix
/// `bevy_light_field_`). The result is one more than the highest id found, or
/// `0` when the directory cannot be read or contains no matching entry.
///
/// The prefix is removed exactly once, so a stem such as
/// `{prefix}{prefix}007_0` is not mistaken for session 7. Matching entries
/// whose id cannot be parsed are reported on stdout and otherwise ignored.
/// The increment saturates at `i32::MAX` instead of overflowing.
fn get_next_session_id(output_directory: &str, base_prefix: &str) -> i32 {
    let entries = match std::fs::read_dir(output_directory) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };

    let mut highest_count = -1i32;
    for entry in entries.filter_map(|e| e.ok()) {
        let path = entry.path();
        let stem = match path.file_stem().and_then(|s| s.to_str()) {
            Some(stem) => stem,
            None => continue,
        };
        // `strip_prefix` removes the prefix once; `trim_start_matches` would
        // keep stripping repeated occurrences.
        let suffix = match stem.strip_prefix(base_prefix) {
            Some(suffix) => suffix,
            None => continue,
        };

        let numeric_part = suffix.split('_').next().unwrap_or("");
        match numeric_part.parse::<i32>() {
            Ok(num) => highest_count = highest_count.max(num),
            Err(_) => println!(
                "failed to parse session ID '{}' for file '{}'",
                numeric_part, stem
            ),
        }
    }

    highest_count.saturating_add(1)
}