From 048bf401f3ce357f43d94467cf1c0c02e6202be8 Mon Sep 17 00:00:00 2001 From: mosure Date: Wed, 13 Mar 2024 01:23:43 -0500 Subject: [PATCH] feat: recording and post-process automation --- Cargo.toml | 8 ++ README.md | 4 +- assets/streams.json | 31 +++--- src/lib.rs | 2 + src/person_detect.rs | 193 +++++++++++++++++++++++++++++++++++ src/pipeline.rs | 233 +++++++++++++++++++++++++++++++++++++++++++ src/stream.rs | 96 +++++++++++------- tools/viewer.rs | 106 ++++++++++++++------ 8 files changed, 588 insertions(+), 85 deletions(-) create mode 100644 src/person_detect.rs create mode 100644 src/pipeline.rs diff --git a/Cargo.toml b/Cargo.toml index 542e828..844ccbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,11 @@ default-run = "viewer" [features] default = [ "person_matting", + "pipeline", ] person_matting = ["bevy_ort", "ort", "ndarray"] +pipeline = ["image", "rayon"] [dependencies] @@ -41,8 +43,10 @@ bevy_ort = { version = "0.6", optional = true } bytes = "1.5" clap = { version = "4.4", features = ["derive"] } futures = "0.3" +image = { version = "0.24", optional = true } ndarray = { version = "0.15", optional = true } openh264 = "0.5" +rayon = { version = "1.8", optional = true } serde = "1.0" serde_json = "1.0" serde_qs = "0.12" @@ -76,6 +80,10 @@ features = [ ] +[dev-dependencies] +approx = "0.5" + + [profile.dev.package."*"] opt-level = 3 diff --git a/README.md b/README.md index cd8d4fd..ffd35c1 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ use bevy::{ }; use bevy_light_field::stream::{ - RtspStreamDescriptor, + RtspStreamHandle, RtspStreamPlugin, StreamId, }; @@ -118,7 +118,7 @@ fn create_streams( ..default() }); - let rtsp_stream = RtspStreamDescriptor::new( + let rtsp_stream = RtspStreamHandle::new( url.to_string(), StreamId(index), image, diff --git a/assets/streams.json b/assets/streams.json index f159014..aa57bcd 100644 --- a/assets/streams.json +++ b/assets/streams.json @@ -1,19 +1,20 @@ [ - "rtsp://192.168.1.22/stream/main", + { "uri": "rtsp://192.168.1.21/stream/main", "transport": "Udp" }, + { "uri": "rtsp://192.168.1.22/stream/main", "transport": "Udp" }, - "rtsp://192.168.1.23/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.24/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.25/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.26/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.27/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.28/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.29/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.30/user=admin&password=admin123&channel=1&stream=0.sdp?", + { "uri": "rtsp://192.168.1.23/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.24/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.25/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.26/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.27/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.28/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.29/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.30/user=admin&password=admin123&channel=1&stream=0.sdp?" }, - "rtsp://192.168.1.31/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.32/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.33/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.34/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.35/user=admin&password=admin123&channel=1&stream=0.sdp?", - "rtsp://192.168.1.36/user=admin&password=admin123&channel=1&stream=0.sdp?" + { "uri": "rtsp://192.168.1.31/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.32/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.33/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.34/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.35/user=admin&password=admin123&channel=1&stream=0.sdp?" }, + { "uri": "rtsp://192.168.1.36/user=admin&password=admin123&channel=1&stream=0.sdp?" } ] diff --git a/src/lib.rs b/src/lib.rs index aeec5a2..ff783f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ pub mod matting; pub mod materials; pub mod mp4; +pub mod person_detect; +pub mod pipeline; pub mod stream; diff --git a/src/person_detect.rs b/src/person_detect.rs new file mode 100644 index 0000000..4a06dc2 --- /dev/null +++ b/src/person_detect.rs @@ -0,0 +1,193 @@ +use std::cmp::{max, min}; + +use bevy::prelude::*; +use image::DynamicImage; +use rayon::prelude::*; + +use crate::{ + matting::MattedStream, + stream::StreamId, +}; + + +pub struct PersonDetectPlugin; + +impl Plugin for PersonDetectPlugin { + fn build(&self, app: &mut App) { + app.add_systems(Update, detect_person); + } + +} + + +#[derive(Component)] +pub struct DetectPersons; + + +#[derive(Debug, Clone, Reflect, PartialEq)] +pub struct BoundingBox { + pub x: i32, + pub y: i32, + pub width: i32, + pub height: i32, +} + +#[derive(Event, Debug, Reflect, Clone)] +pub struct PersonDetectedEvent { + pub stream_id: StreamId, + pub bounding_box: BoundingBox, + pub mask_sum: f32, +} + + +fn detect_person( + mut ev_asset: EventReader>, + mut ev_person_detected: EventWriter, + person_detect_streams: Query<( + &MattedStream, + &DetectPersons, + )>, + images: Res>, +) { + for ev in ev_asset.read() { + match ev { + AssetEvent::Modified { id } => { + for (matted_stream, _) in person_detect_streams.iter() { + if &matted_stream.output.id() == id { + let image = images.get(&matted_stream.output).unwrap().clone().try_into_dynamic().unwrap(); + + let bounding_box = masked_bounding_box(&image); + let sum = sum_masked_pixels(&image); + + println!("bounding box: {:?}, sum: {}", bounding_box, sum); + + // TODO: add thresholds for detection + let person_detected = false; + if person_detected { + ev_person_detected.send(PersonDetectedEvent { + stream_id: matted_stream.stream_id, + bounding_box: bounding_box.unwrap(), + mask_sum: sum, + }); + } + } + } + } + _ => {} + } + } +} + + + +pub fn masked_bounding_box(image: &DynamicImage) -> Option { + let img = image.as_luma8().unwrap(); + + let bounding_boxes = img.enumerate_pixels() + .par_bridge() + .filter_map(|(x, y, pixel)| { + if pixel[0] > 128 { + Some((x as i32, y as i32, x as i32, y as i32)) + } else { + None + } + }) + .reduce_with(|( + min_x1, + min_y1, + max_x1, + max_y1, + ), ( + min_x2, + min_y2, + max_x2, + max_y2, + )| { + ( + min(min_x1, min_x2), + min(min_y1, min_y2), + max(max_x1, max_x2), + max(max_y1, max_y2), + ) + }); + + bounding_boxes.map(|( + min_x, + min_y, + max_x, + max_y + )| { + BoundingBox { + x: min_x, + y: min_y, + width: max_x - min_x + 1, + height: max_y - min_y + 1, + } + }) +} + + +pub fn sum_masked_pixels(image: &DynamicImage) -> f32 { + let img = image.as_luma8().unwrap(); + let pixels = img.pixels(); + + let count = pixels.par_bridge() + .map(|pixel| { + pixel.0[0] as f32 / 255.0 + }) + .sum(); + + count +} + + + +#[cfg(test)] +mod tests { + use super::*; + use image::{ImageBuffer, Luma}; + use approx::assert_relative_eq; + + + #[test] + fn test_masked_bounding_box() { + let width = 10; + let height = 10; + let mut img: ImageBuffer, Vec> = ImageBuffer::new(width, height); + + for x in 2..=5 { + for y in 2..=5 { + img.put_pixel(x, y, Luma([200])); + } + } + + let dynamic_img = DynamicImage::ImageLuma8(img); + let result = masked_bounding_box(&dynamic_img).expect("expected a bounding box"); + + let expected = BoundingBox { + x:2, + y: 2, + width: 4, + height: 4, + }; + assert_eq!(result, expected, "the computed bounding box did not match the expected values."); + } + + + #[test] + fn test_sum_masked_pixels() { + let width = 4; + let height = 4; + let mut img: ImageBuffer, Vec> = ImageBuffer::new(width, height); + + img.put_pixel(0, 0, Luma([255])); + img.put_pixel(1, 0, Luma([127])); + img.put_pixel(2, 0, Luma([63])); + + let dynamic_img = DynamicImage::ImageLuma8(img); + let result = sum_masked_pixels(&dynamic_img); + + let expected = (255.0 + 127.0 + 63.0) / 255.0; + assert_relative_eq!(result, expected); + } +} diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..25607b2 --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1,233 @@ +use bevy::prelude::*; +use image::DynamicImage; +use rayon::prelude::*; + + +pub struct PipelinePlugin; +impl Plugin for PipelinePlugin { + fn build(&self, app: &mut App) { + app.add_systems(Update, generate_annotations); + } +} + + +#[derive(Component, Reflect)] +pub struct PipelineConfig { + pub raw_frames: bool, + pub subject_refinement: bool, // https://github.com/onnx/models/tree/main/validated/vision/body_analysis/ultraface + pub repair_frames: bool, // https://huggingface.co/docs/diffusers/en/optimization/onnx & https://github.com/bnm6900030/swintormer + pub upsample_frames: bool, // https://huggingface.co/ssube/stable-diffusion-x4-upscaler-onnx + pub mask_frames: bool, // https://github.com/ZHKKKe/MODNet + pub light_field_cameras: bool, // https://github.com/jasonyzhang/RayDiffusion + pub depth_maps: bool, // https://github.com/fabio-sim/Depth-Anything-ONNX + pub gaussian_cloud: bool, +} + +impl Default for PipelineConfig { + fn default() -> Self { + Self { + raw_frames: true, + subject_refinement: false, + repair_frames: false, + upsample_frames: false, + mask_frames: false, + light_field_cameras: false, + depth_maps: false, + gaussian_cloud: false, + } + } +} + + +#[derive(Bundle, Default, Reflect)] +pub struct StreamSessionBundle { + pub config: PipelineConfig, + pub raw_streams: RawStreams, + pub session: Session, +} + +// TODO: use an entity saver to write Session and it's components (e.g. `0/session.ron`) + + +#[derive(Component, Default, Reflect)] +pub struct Session { + pub id: usize, + pub directory: String, +} + +impl Session { + pub fn new(directory: String) -> Self { + let id = get_next_session_id(&directory); + let directory = format!("{}/{}", directory, id); + std::fs::create_dir_all(&directory).unwrap(); + + Self { id, directory } + } +} + + +pub trait PipelineNode { + fn new(session: &Session) -> Self; + fn exists(session: &Session) -> bool; +} + + +#[derive(Component, Default, Reflect)] +pub struct RawStreams { + pub streams: Vec, +} +impl RawStreams { + pub fn decoders(&self) -> () { + // TODO: create decoders for each h264 file stream + todo!() + } +} + + +// TODO: add a pipeline config for the session describing which components to run + + +fn generate_annotations( + mut commands: Commands, + raw_streams: Query< + ( + Entity, + &PipelineConfig, + &RawStreams, + &Session, + ), + Without, + >, +) { + for ( + entity, + config, + raw_streams, + session, + ) in raw_streams.iter() { + let raw_frames = RawFrames::new(session); + + if config.raw_frames { + let run_node = !RawFrames::exists(session); + let mut raw_frames = RawFrames::new(session); + + if run_node { + // TODO: add (identifier == "{}_{}", stream, frame) output to this first stage (forward to mask stage /w image in RAM) + let frames = raw_streams.streams.par_iter() + .map(|stream| { + let decoder = todo!(); + + (stream, decoder) + }) + .map(|(stream, decoder)| { + let frames: Vec = vec![]; + + // TODO: read all frames + + (stream, frames) + }) + .collect::>(); + + frames.par_iter() + .for_each(|(stream, frames)| { + frames.iter().enumerate() + .for_each(|(i, frame)| { + let path = format!( + "{}/frames/{}_{}.png", + session.directory, + stream, + i, + ); + frame.save(path).unwrap(); + }); + }); + } + } + + commands.entity(entity).insert(raw_frames); + } +} + + +#[derive(Component, Default, Reflect)] +pub struct RawFrames { + pub frames: Vec, +} +impl RawFrames { + // TODO: move new and exists to a trait + + pub fn new( + session: &Session, + ) -> Self { + let output_directory = format!("{}/frames", session.directory); + std::fs::create_dir_all(&output_directory).unwrap(); + + // TODO: load all files that are already in the directory + + Self { + frames: vec![], + } + } + + pub fn exists( + session: &Session, + ) -> bool { + let output_directory = format!("{}/frames", session.directory); + std::fs::metadata(&output_directory).is_ok() + } + + pub fn image(&self, camera: usize, frame: usize) -> Option { + todo!() + } +} + + +#[derive(Component, Default, Reflect)] +pub struct MaskFrames { + pub frames: Vec, +} +impl MaskFrames { + pub fn new( + session: &Session, + ) -> Self { + let output_directory = format!("{}/masks", session.directory); + std::fs::create_dir_all(&output_directory).unwrap(); + + Self { + frames: vec![], + } + } + + pub fn image(&self, camera: usize, frame: usize) -> Option { + todo!() + } +} + + +#[derive(Default, Clone, Reflect)] +pub struct LightFieldCamera { + // TODO: intrinsics/extrinsics +} + +#[derive(Component, Default, Reflect)] +pub struct LightFieldCameras { + pub cameras: Vec, +} + + + +fn get_next_session_id(output_directory: &str) -> usize { + match std::fs::read_dir(output_directory) { + Ok(entries) => entries.filter_map(|entry| { + let entry = entry.ok()?; + if entry.path().is_dir() { + entry.file_name().to_string_lossy().parse::().ok() + } else { + None + } + }) + .max() + .map_or(0, |max_id| max_id + 1), + Err(_) => 0, + } +} diff --git a/src/stream.rs b/src/stream.rs index bc1588d..96c338f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -20,6 +20,7 @@ use retina::{ SessionOptions, SetupOptions, TcpTransportOptions, + UdpTransportOptions, Transport, }, codec::VideoFrame, @@ -62,7 +63,7 @@ fn create_streams_from_descriptors( descriptors: Query< ( Entity, - &RtspStreamDescriptor, + &RtspStreamHandle, ), Without, >, @@ -78,7 +79,7 @@ fn create_streams_from_descriptors( pub fn apply_decode( mut images: ResMut>, - descriptors: Query<&RtspStreamDescriptor>, + descriptors: Query<&RtspStreamHandle>, ) { for descriptor in descriptors.iter() { let frame = descriptor.take_frame(); @@ -118,23 +119,42 @@ pub enum RecordingCommand { } -#[derive(Component, Clone)] -pub struct RtspStreamDescriptor { +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +pub enum StreamTransport { + #[default] + Tcp, + Udp, +} + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +pub struct StreamDescriptor { pub uri: String, + + #[serde(default)] + pub transport: StreamTransport, +} + +#[derive(Resource, Clone, Debug, Default, Serialize, Deserialize)] +pub struct StreamUris(pub Vec); + + +#[derive(Component, Clone)] +pub struct RtspStreamHandle { + pub descriptor: StreamDescriptor, pub id: StreamId, pub image: bevy::asset::Handle, latest_frame: Arc>>, recording_sender: Arc>>>, } -impl RtspStreamDescriptor { +impl RtspStreamHandle { pub fn new( - uri: String, + descriptor: StreamDescriptor, id: StreamId, image: bevy::asset::Handle, ) -> Self { Self { - uri, + descriptor, id, image, latest_frame: Arc::new(Mutex::new(None)), @@ -166,7 +186,7 @@ struct Bgra8Frame { #[derive(Resource)] pub struct RtspStreamManager { - stream_descriptors: Arc>>, + stream_handles: Arc>>, handle: Handle, } @@ -184,7 +204,7 @@ impl FromWorld for RtspStreamManager { }); Self { - stream_descriptors: Arc::new(Mutex::new(vec![])), + stream_handles: Arc::new(Mutex::new(vec![])), handle, } } @@ -192,11 +212,11 @@ impl FromWorld for RtspStreamManager { impl RtspStreamManager { pub fn contains(&self, id: StreamId) -> bool { - self.stream_descriptors.lock().unwrap().iter().any(|s: &RtspStreamDescriptor| s.id == id) + self.stream_handles.lock().unwrap().iter().any(|s: &RtspStreamHandle| s.id == id) } pub fn add_stream(&self, stream: RtspStream) { - self.stream_descriptors.lock().unwrap().push(stream.descriptor.clone()); + self.stream_handles.lock().unwrap().push(stream.handle.clone()); self.handle.spawn(async move { let mut stream = stream; @@ -207,10 +227,11 @@ impl RtspStreamManager { }); } - pub fn start_recording(&self, output_directory: &str, prefix: &str) { - let stream_descriptors = self.stream_descriptors.lock().unwrap(); - for descriptor in stream_descriptors.iter() { - let filepath = format!("{}/{}_{}.mp4", output_directory, prefix, descriptor.id.0); + pub fn start_recording(&self, output_directory: &str) { + let stream_handles = self.stream_handles.lock().unwrap(); + for descriptor in stream_handles.iter() { + let filename = format!("{}.mp4", descriptor.id.0); + let filepath = format!("{}/{}", output_directory, filename); let send_channel = descriptor.recording_sender.lock().unwrap(); let sender_clone = send_channel.as_ref().unwrap().clone(); @@ -222,35 +243,41 @@ impl RtspStreamManager { } } - pub fn stop_recording(&self) { - let stream_descriptors = self.stream_descriptors.lock().unwrap(); - for descriptor in stream_descriptors.iter() { + pub fn stop_recording(&self) -> Vec { + let mut filepaths = vec![]; + + let stream_handles = self.stream_handles.lock().unwrap(); + for descriptor in stream_handles.iter() { let send_channel = descriptor.recording_sender.lock().unwrap(); let sender_clone = send_channel.as_ref().unwrap().clone(); self.handle.block_on(async move { sender_clone.send(RecordingCommand::StopRecording).await.unwrap(); }); + + filepaths.push(format!("{}.mp4", descriptor.id.0)); } + + filepaths } } pub struct RtspStream { - pub descriptor: RtspStreamDescriptor, + pub handle: RtspStreamHandle, decoder: Option, demuxed: Option, writer: Option>, } impl RtspStream { - pub fn new(descriptor: RtspStreamDescriptor) -> Self { + pub fn new(handle: RtspStreamHandle) -> Self { let api = openh264::OpenH264API::from_source(); let decoder = Decoder::new(api).ok(); Self { - descriptor, + handle, decoder, demuxed: None, writer: None, @@ -258,13 +285,13 @@ impl RtspStream { } async fn run(&mut self) -> Result<(), Box>{ - let (session, stream_idx) = create_session(&self.descriptor.uri).await?; + let (session, stream_idx) = create_session(&self.handle.descriptor).await?; self.demuxed = session.demuxed()?.into(); let (sender, mut receiver) = mpsc::channel(1); { - let mut send_channel = self.descriptor.recording_sender.lock().unwrap(); + let mut send_channel = self.handle.recording_sender.lock().unwrap(); *send_channel = sender.into(); } @@ -284,11 +311,11 @@ impl RtspStream { file, ).await.ok(); - println!("writing stream {}", self.descriptor.id.0); + println!("writing stream {}", self.handle.id.0); }, RecordingCommand::StopRecording => { if let Some(writer) = self.writer.take() { - println!("stopped recording stream {}", self.descriptor.id.0); + println!("stopped recording stream {}", self.handle.id.0); writer.finish().await.ok(); } }, @@ -304,6 +331,8 @@ impl RtspStream { } } + // TODO: enable/disable decoding based on whether the live frames are being used + let mut data = frame.into_data(); convert_h264(&mut data)?; @@ -315,7 +344,7 @@ impl RtspStream { let image_size = frame.dimension_rgb(); { - let mut locked_sink = self.descriptor.latest_frame.lock().unwrap(); + let mut locked_sink = self.handle.latest_frame.lock().unwrap(); match *locked_sink { Some(ref mut sink) => { assert_eq!(u32::from(sink.width), image_size.0 as u32, "frame width mismatch - stream size changes are not supported yet."); @@ -359,11 +388,11 @@ impl RtspStream { } -async fn create_session(url: &str) -> Result< +async fn create_session(descriptor: &StreamDescriptor) -> Result< (Session, usize), Box > { - let parsed_url = Url::parse(url)?; + let parsed_url = Url::parse(&descriptor.uri)?; let username = parsed_url.username(); let password = parsed_url.password().unwrap_or(""); @@ -390,8 +419,10 @@ async fn create_session(url: &str) -> Result< options, ).await?; - let tcp_options = TcpTransportOptions::default(); - let transport = Transport::Tcp(tcp_options); + let transport = match descriptor.transport { + StreamTransport::Tcp => Transport::Tcp(TcpTransportOptions::default()), + StreamTransport::Udp => Transport::Udp(UdpTransportOptions::default()), + }; let video_stream_index = session.streams().iter().enumerate().find_map(|(i, s)| { if s.media() == "video" && s.encoding_name().to_uppercase() == "H264" { @@ -434,8 +465,3 @@ fn convert_h264(data: &mut [u8]) -> Result<(), Error> { Ok(()) } - - - -#[derive(Resource, Clone, Debug, Default, Reflect, Serialize, Deserialize)] -pub struct StreamUris(pub Vec); diff --git a/tools/viewer.rs b/tools/viewer.rs index b77ce81..2b46beb 100644 --- a/tools/viewer.rs +++ b/tools/viewer.rs @@ -28,8 +28,14 @@ use bevy_args::{ use bevy_light_field::{ LightFieldPlugin, materials::foreground::ForegroundMaterial, + pipeline::{ + PipelineConfig, + StreamSessionBundle, + Session, + RawStreams, + }, stream::{ - RtspStreamDescriptor, + RtspStreamHandle, RtspStreamManager, StreamId, StreamUris, @@ -59,6 +65,9 @@ pub struct LightFieldViewer { #[arg(long, default_value = "false")] pub show_fps: bool, + #[arg(long, default_value = "false")] + pub automatic_recording: bool, + #[arg(long, default_value = "false")] pub fullscreen: bool, @@ -115,6 +124,7 @@ fn main() { args.max_matting_height, )), )) + .init_resource::() .add_systems(Startup, create_streams) .add_systems(Startup, setup_camera) .add_systems( @@ -164,9 +174,11 @@ fn create_streams( ..default() }; + // TODO: support enabling/disabling decoding/matting per stream (e.g. during 'record mode') + let input_images: Vec> = stream_uris.0.iter() .enumerate() - .map(|(index, url)| { + .map(|(index, descriptor)| { let entity = commands.spawn_empty().id(); let mut image = Image { @@ -190,8 +202,8 @@ fn create_streams( let image = images.add(image); let image_clone = image.clone(); - let rtsp_stream = RtspStreamDescriptor::new( - url.to_string(), + let rtsp_stream = RtspStreamHandle::new( + descriptor.clone(), StreamId(index), image, ); @@ -227,7 +239,7 @@ fn create_streams( let mut material = None; #[cfg(feature = "person_matting")] - if args.extract_foreground { + if args.extract_foreground || (args.automatic_recording && index == 0) { let foreground_mat = foreground_materials.add(ForegroundMaterial { input: image.clone(), mask: mask_image.clone(), @@ -262,8 +274,9 @@ fn create_streams( .with_children(|builder| { input_images.iter() .zip(mask_images.iter()) - .for_each(|(input, (_mask, material))| { - if args.extract_foreground { + .enumerate() + .for_each(|(index, (input, (_mask, material)))| { + if args.extract_foreground || (args.automatic_recording && index == 0) { builder.spawn(MaterialNodeBundle { style: Style { width: Val::Percent(100.0), @@ -308,31 +321,72 @@ fn press_esc_close( } } + +// TODO: add system to detect person mask in camera 0 for automatic recording +fn automatic_recording( + mut commands: Commands, + stream_manager: Res, + mut live_session: ResMut, +) { + if live_session.0.is_some() { + return; + } + + // TODO: check the segmentation mask labeled for automatic recording detection +} + + +#[derive(Resource, Default)] +pub struct LiveSession(Option); + fn press_r_start_recording( + mut commands: Commands, keys: Res>, - stream_manager: Res + stream_manager: Res, + mut live_session: ResMut, ) { if keys.just_pressed(KeyCode::KeyR) { + if live_session.0.is_some() { + return; + } - let output_directory = "capture"; - let session_id = get_next_session_id(output_directory); - let output_directory = format!("{}/{}", output_directory, session_id); - - std::fs::create_dir_all(&output_directory).unwrap(); + let session = Session { + directory: "capture".to_string(), + ..default() + }; stream_manager.start_recording( - &output_directory, - "bevy_light_field", + &session.directory, ); + + let entity = commands.spawn( + StreamSessionBundle { + session: session, + raw_streams: RawStreams { + streams: vec![], + }, + config: PipelineConfig::default(), + }, + ).id(); + live_session.0 = Some(entity); } } fn press_s_stop_recording( + mut commands: Commands, keys: Res>, - stream_manager: Res + stream_manager: Res, + mut live_session: ResMut, ) { - if keys.just_pressed(KeyCode::KeyS) { - stream_manager.stop_recording(); + if keys.just_pressed(KeyCode::KeyS) && live_session.0.is_some() { + let session_entity = live_session.0.take().unwrap(); + + let raw_streams = stream_manager.stop_recording(); + + commands.entity(session_entity) + .insert(RawStreams { + streams: raw_streams, + }); } } @@ -370,21 +424,7 @@ fn calculate_grid_dimensions(window_width: f32, window_height: f32, num_streams: } -fn get_next_session_id(output_directory: &str) -> i32 { - match std::fs::read_dir(output_directory) { - Ok(entries) => entries.filter_map(|entry| { - let entry = entry.ok()?; - if entry.path().is_dir() { - entry.file_name().to_string_lossy().parse::().ok() - } else { - None - } - }) - .max() - .map_or(0, |max_id| max_id + 1), - Err(_) => 0, - } -} + fn fps_display_setup(