diff --git a/Cargo.lock b/Cargo.lock index 186d7e08..e390f48e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,7 +1083,7 @@ dependencies = [ [[package]] name = "moq-clock" -version = "0.4.2" +version = "0.5.0" dependencies = [ "anyhow", "chrono", @@ -1100,7 +1100,7 @@ dependencies = [ [[package]] name = "moq-dir" -version = "0.1.2" +version = "0.2.0" dependencies = [ "anyhow", "bytes", @@ -1118,7 +1118,7 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.3.0" +version = "0.4.0" dependencies = [ "anyhow", "clap", @@ -1140,7 +1140,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.6.1" +version = "0.7.0" dependencies = [ "anyhow", "bytes", @@ -1161,7 +1161,7 @@ dependencies = [ [[package]] name = "moq-relay" -version = "0.5.1" +version = "0.6.0" dependencies = [ "anyhow", "axum", @@ -1183,7 +1183,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "clap", @@ -1200,7 +1200,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.5.3" +version = "0.6.0" dependencies = [ "bytes", "futures", diff --git a/moq-clock/Cargo.toml b/moq-clock/Cargo.toml index 3c75b6fd..4734b67a 100644 --- a/moq-clock/Cargo.toml +++ b/moq-clock/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.4.2" +version = "0.5.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } # QUIC url = "2" diff --git a/moq-dir/Cargo.toml b/moq-dir/Cargo.toml index 2f4f36a3..32ac44dd 100644 --- a/moq-dir/Cargo.toml +++ b/moq-dir/Cargo.toml @@ -5,15 +5,15 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.1.2" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } # QUIC web-transport = { workspace = true } diff --git a/moq-native/Cargo.toml b/moq-native/Cargo.toml index 9bf126bb..e88db7d4 100644 --- a/moq-native/Cargo.toml +++ b/moq-native/Cargo.toml @@ -5,14 +5,14 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.3.0" +version = "0.4.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.6" } web-transport = { workspace = true } web-transport-quinn = "0.3" diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index bb307c93..43a30bc3 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.1" +version = "0.7.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native = { path = "../moq-native", version = "0.3" } -moq-transport = { path = "../moq-transport", version = "0.5" } +moq-native = { path = "../moq-native", version = "0.4" } +moq-transport = { path = "../moq-transport", version = "0.6" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-relay/Cargo.toml b/moq-relay/Cargo.toml index 971a9c56..3bca6230 100644 --- a/moq-relay/Cargo.toml +++ b/moq-relay/Cargo.toml @@ -5,15 +5,15 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.1" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } -moq-native = { path = "../moq-native", version = "0.3" } +moq-transport = { path = "../moq-transport", version = "0.6" } +moq-native = { path = "../moq-native", version = "0.4" } moq-api = { path = "../moq-api", version = "0.2" } # QUIC diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index 62b1ca39..8d68f24f 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.1.1" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transport = { path = "../moq-transport", version = "0.5" } -moq-native = { path = "../moq-native", version = "0.3" } +moq-transport = { path = "../moq-transport", version = "0.6" } +moq-native = { path = "../moq-native", version = "0.4" } url = "2" # Async stuff diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 4eae1329..583fadb7 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.3" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 3e9be526..998254d9 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -34,6 +34,18 @@ pub enum DecodeError { #[error("invalid subscribe location")] InvalidSubscribeLocation, + #[error("invalid filter type")] + InvalidFilterType, + + #[error("invalid object status")] + InvalidObjectStatus, + + #[error("invalid track status code")] + InvalidTrackStatusCode, + + #[error("missing field")] + MissingField, + #[error("invalid value")] InvalidValue, diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index 04bc9a61..548dbb97 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -28,6 +28,9 @@ pub enum EncodeError { #[error("invalid value")] InvalidValue, + #[error("missing field")] + MissingField, + #[error("i/o error: {0}")] Io(sync::Arc), } diff --git a/moq-transport/src/data/datagram.rs b/moq-transport/src/data/datagram.rs index 970b5220..84f96fc3 100644 --- a/moq-transport/src/data/datagram.rs +++ b/moq-transport/src/data/datagram.rs @@ -1,4 +1,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; + #[derive(Clone, Debug)] pub struct Datagram { // The subscribe ID. @@ -16,6 +18,9 @@ pub struct Datagram { // The priority, where **smaller** values are sent first. pub send_order: u64, + // Object status + pub object_status: ObjectStatus, + // The payload. pub payload: bytes::Bytes, } @@ -27,6 +32,7 @@ impl Decode for Datagram { let group_id = u64::decode(r)?; let object_id = u64::decode(r)?; let send_order = u64::decode(r)?; + let object_status = ObjectStatus::decode(r)?; let payload = r.copy_to_bytes(r.remaining()); Ok(Self { @@ -35,6 +41,7 @@ impl Decode for Datagram { group_id, object_id, send_order, + object_status, payload, }) } @@ -47,6 +54,7 @@ impl Encode for Datagram { self.group_id.encode(w)?; self.object_id.encode(w)?; self.send_order.encode(w)?; + self.object_status.encode(w)?; Self::encode_remaining(w, self.payload.len())?; w.put_slice(&self.payload); diff --git a/moq-transport/src/data/group.rs b/moq-transport/src/data/group.rs index aee076a1..417a3453 100644 --- a/moq-transport/src/data/group.rs +++ b/moq-transport/src/data/group.rs @@ -1,4 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; #[derive(Clone, Debug)] pub struct GroupHeader { @@ -41,6 +42,7 @@ impl Encode for GroupHeader { pub struct GroupObject { pub object_id: u64, pub size: usize, + pub status: ObjectStatus, } impl Decode for GroupObject { @@ -48,7 +50,19 @@ impl Decode for GroupObject { let object_id = u64::decode(r)?; let size = usize::decode(r)?; - Ok(Self { object_id, size }) + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + let status = if size == 0 { + ObjectStatus::decode(r)? + } else { + ObjectStatus::Object + }; + + Ok(Self { + object_id, + size, + status, + }) } } @@ -57,6 +71,12 @@ impl Encode for GroupObject { self.object_id.encode(w)?; self.size.encode(w)?; + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + if self.size == 0 { + self.status.encode(w)?; + } + Ok(()) } } diff --git a/moq-transport/src/data/object.rs b/moq-transport/src/data/object.rs index 6b601e28..e34ea617 100644 --- a/moq-transport/src/data/object.rs +++ b/moq-transport/src/data/object.rs @@ -1,5 +1,39 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ObjectStatus { + Object = 0x0, + ObjectDoesNotExist = 0x1, + GroupDoesNotExist = 0x2, + EndOfGroup = 0x3, + EndOfTrack = 0x4, +} + +impl Decode for ObjectStatus { + fn decode(r: &mut B) -> Result { + match u64::decode(r)? { + 0x0 => Ok(Self::Object), + 0x1 => Ok(Self::ObjectDoesNotExist), + 0x2 => Ok(Self::GroupDoesNotExist), + 0x3 => Ok(Self::EndOfGroup), + 0x4 => Ok(Self::EndOfTrack), + _ => Err(DecodeError::InvalidObjectStatus), + } + } +} + +impl Encode for ObjectStatus { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::Object => (0x0_u64).encode(w), + Self::ObjectDoesNotExist => (0x1_u64).encode(w), + Self::GroupDoesNotExist => (0x2_u64).encode(w), + Self::EndOfGroup => (0x3_u64).encode(w), + Self::EndOfTrack => (0x4_u64).encode(w), + } + } +} + #[derive(Clone, Debug)] pub struct ObjectHeader { // The subscribe ID. @@ -16,6 +50,9 @@ pub struct ObjectHeader { // The send order, where **smaller** values are sent first. pub send_order: u64, + + // The object status + pub object_status: ObjectStatus, } impl Decode for ObjectHeader { @@ -26,6 +63,7 @@ impl Decode for ObjectHeader { group_id: u64::decode(r)?, object_id: u64::decode(r)?, send_order: u64::decode(r)?, + object_status: ObjectStatus::decode(r)?, }) } } @@ -37,6 +75,7 @@ impl Encode for ObjectHeader { self.group_id.encode(w)?; self.object_id.encode(w)?; self.send_order.encode(w)?; + self.object_status.encode(w)?; Ok(()) } diff --git a/moq-transport/src/data/track.rs b/moq-transport/src/data/track.rs index 70ce3a06..ff0aa3c7 100644 --- a/moq-transport/src/data/track.rs +++ b/moq-transport/src/data/track.rs @@ -1,4 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::data::ObjectStatus; #[derive(Clone, Debug)] pub struct TrackHeader { @@ -37,6 +38,7 @@ pub struct TrackObject { pub group_id: u64, pub object_id: u64, pub size: usize, + pub status: ObjectStatus, } impl Decode for TrackObject { @@ -46,10 +48,19 @@ impl Decode for TrackObject { let object_id = u64::decode(r)?; let size = usize::decode(r)?; + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + let status = if size == 0 { + ObjectStatus::decode(r)? + } else { + ObjectStatus::Object + }; + Ok(Self { group_id, object_id, size, + status, }) } } @@ -60,6 +71,12 @@ impl Encode for TrackObject { self.object_id.encode(w)?; self.size.encode(w)?; + // If the size is 0, then the status is sent explicitly. + // Otherwise, the status is assumed to be 0x0 (Object). + if self.size == 0 { + self.status.encode(w)?; + } + Ok(()) } } diff --git a/moq-transport/src/message/filter_type.rs b/moq-transport/src/message/filter_type.rs new file mode 100644 index 00000000..1edbf42f --- /dev/null +++ b/moq-transport/src/message/filter_type.rs @@ -0,0 +1,34 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +/// Filter Types +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types +#[derive(Clone, Debug, PartialEq)] +pub enum FilterType { + LatestGroup = 0x1, + LatestObject = 0x2, + AbsoluteStart = 0x3, + AbsoluteRange = 0x4, +} + +impl Encode for FilterType { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::LatestGroup => (0x1_u64).encode(w), + Self::LatestObject => (0x2_u64).encode(w), + Self::AbsoluteStart => (0x3_u64).encode(w), + Self::AbsoluteRange => (0x4_u64).encode(w), + } + } +} + +impl Decode for FilterType { + fn decode(r: &mut R) -> Result { + match u64::decode(r)? { + 0x01 => Ok(Self::LatestGroup), + 0x02 => Ok(Self::LatestObject), + 0x03 => Ok(Self::AbsoluteStart), + 0x04 => Ok(Self::AbsoluteRange), + _ => Err(DecodeError::InvalidFilterType), + } + } +} diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 1812f769..54f8bb2c 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -35,13 +35,17 @@ mod announce; mod announce_cancel; mod announce_error; mod announce_ok; +mod filter_type; mod go_away; mod publisher; mod subscribe; mod subscribe_done; mod subscribe_error; mod subscribe_ok; +mod subscribe_update; mod subscriber; +mod track_status; +mod track_status_request; mod unannounce; mod unsubscribe; @@ -49,13 +53,17 @@ pub use announce::*; pub use announce_cancel::*; pub use announce_error::*; pub use announce_ok::*; +pub use filter_type::*; pub use go_away::*; pub use publisher::*; pub use subscribe::*; pub use subscribe_done::*; pub use subscribe_error::*; pub use subscribe_ok::*; +pub use subscribe_update::*; pub use subscriber::*; +pub use track_status::*; +pub use track_status_request::*; pub use unannounce::*; pub use unsubscribe::*; @@ -141,6 +149,7 @@ message_types! { // SetupServer = 0x41 // SUBSCRIBE family, sent by subscriber + SubscribeUpdate = 0x2, Subscribe = 0x3, Unsubscribe = 0xa, @@ -158,6 +167,53 @@ message_types! { AnnounceError = 0x8, AnnounceCancel = 0xc, + // TRACK_STATUS_REQUEST, sent by subscriber + TrackStatusRequest = 0xd, + + // TRACK_STATUS, sent by publisher + TrackStatus = 0xe, + // Misc GoAway = 0x10, } + +/// Track Status Codes +/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-track_status +#[derive(Clone, Debug, PartialEq, Copy)] +pub enum TrackStatusCode { + // 0x00: The track is in progress, and subsequent fields contain the highest group and object ID for that track. + InProgress = 0x00, + // 0x01: The track does not exist. Subsequent fields MUST be zero, and any other value is a malformed message. + DoesNotExist = 0x01, + // 0x02: The track has not yet begun. Subsequent fields MUST be zero. Any other value is a malformed message. + NotYetBegun = 0x02, + // 0x03: The track has finished, so there is no "live edge." Subsequent fields contain the highest Group and object ID known. + Finished = 0x03, + // 0x04: The sender is a relay that cannot obtain the current track status from upstream. Subsequent fields contain the largest group and object ID known. + Relay = 0x04, +} + +impl Decode for TrackStatusCode { + fn decode(r: &mut B) -> Result { + match u64::decode(r)? { + 0x00 => Ok(Self::InProgress), + 0x01 => Ok(Self::DoesNotExist), + 0x02 => Ok(Self::NotYetBegun), + 0x03 => Ok(Self::Finished), + 0x04 => Ok(Self::Relay), + _ => Err(DecodeError::InvalidTrackStatusCode), + } + } +} + +impl Encode for TrackStatusCode { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::InProgress => (0x00_u64).encode(w), + Self::DoesNotExist => (0x01_u64).encode(w), + Self::NotYetBegun => (0x02_u64).encode(w), + Self::Finished => (0x03_u64).encode(w), + Self::Relay => (0x04_u64).encode(w), + } + } +} diff --git a/moq-transport/src/message/publisher.rs b/moq-transport/src/message/publisher.rs index 24ed96e1..703f92ca 100644 --- a/moq-transport/src/message/publisher.rs +++ b/moq-transport/src/message/publisher.rs @@ -50,4 +50,5 @@ publisher_msgs! { SubscribeOk, SubscribeError, SubscribeDone, + TrackStatus, } diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 328adb8f..b9d807ce 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,4 +1,5 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::message::FilterType; /// Sent by the subscriber to request all future objects for the given track. /// @@ -13,9 +14,12 @@ pub struct Subscribe { pub track_namespace: String, pub track_name: String, - /// The start/end group/object. - pub start: SubscribePair, - pub end: SubscribePair, + /// Filter type + pub filter_type: FilterType, + + /// The start/end group/object. (TODO: Make optional) + pub start: Option, // TODO: Make optional + pub end: Option, // TODO: Make optional /// Optional parameters pub params: Params, @@ -28,17 +32,42 @@ impl Decode for Subscribe { let track_namespace = String::decode(r)?; let track_name = String::decode(r)?; - let start = SubscribePair::decode(r)?; - let end = SubscribePair::decode(r)?; - - // You can't have a start object without a start group. - if start.group == SubscribeLocation::None && start.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); + let filter_type = FilterType::decode(r)?; + + let start: Option; + let end: Option; + match filter_type { + FilterType::AbsoluteStart => { + if r.remaining() < 2 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = None; + } + FilterType::AbsoluteRange => { + if r.remaining() < 4 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = Some(SubscribePair::decode(r)?); + } + _ => { + start = None; + end = None; + } } - // You can't have an end object without an end group. - if end.group == SubscribeLocation::None && end.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); + if let Some(s) = &start { + // You can't have a start object without a start group. + if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + if let Some(e) = &end { + // You can't have an end object without an end group. + if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } } // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. @@ -50,6 +79,7 @@ impl Decode for Subscribe { track_alias, track_namespace, track_name, + filter_type, start, end, params, @@ -64,8 +94,19 @@ impl Encode for Subscribe { self.track_namespace.encode(w)?; self.track_name.encode(w)?; - self.start.encode(w)?; - self.end.encode(w)?; + self.filter_type.encode(w)?; + + if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { + if self.start.is_none() || self.end.is_none() { + return Err(EncodeError::MissingField); + } + if let Some(start) = &self.start { + start.encode(w)?; + } + if let Some(end) = &self.end { + end.encode(w)?; + } + } self.params.encode(w)?; diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs new file mode 100644 index 00000000..37ac7814 --- /dev/null +++ b/moq-transport/src/message/subscribe_update.rs @@ -0,0 +1,116 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::message::subscribe::{SubscribeLocation, SubscribePair}; +use crate::message::FilterType; + +/// Sent by the subscriber to request all future objects for the given track. +/// +/// Objects will use the provided ID instead of the full track name, to save bytes. +#[derive(Clone, Debug)] +pub struct SubscribeUpdate { + /// The subscription ID + pub id: u64, + + /// Track properties + pub track_alias: u64, // This alias is useless but part of the spec + pub track_namespace: String, + pub track_name: String, + + /// Filter type + pub filter_type: FilterType, + + /// The start/end group/object. (TODO: Make optional) + pub start: Option, // TODO: Make optional + pub end: Option, // TODO: Make optional + + /// Optional parameters + pub params: Params, +} + +impl Decode for SubscribeUpdate { + fn decode(r: &mut R) -> Result { + let id = u64::decode(r)?; + let track_alias = u64::decode(r)?; + let track_namespace = String::decode(r)?; + let track_name = String::decode(r)?; + + let filter_type = FilterType::decode(r)?; + + let start: Option; + let end: Option; + match filter_type { + FilterType::AbsoluteStart => { + if r.remaining() < 2 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = None; + } + FilterType::AbsoluteRange => { + if r.remaining() < 4 { + return Err(DecodeError::MissingField); + } + start = Some(SubscribePair::decode(r)?); + end = Some(SubscribePair::decode(r)?); + } + _ => { + start = None; + end = None; + } + } + + if let Some(s) = &start { + // You can't have a start object without a start group. + if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + if let Some(e) = &end { + // You can't have an end object without an end group. + if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + } + + // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. + + let params = Params::decode(r)?; + + Ok(Self { + id, + track_alias, + track_namespace, + track_name, + filter_type, + start, + end, + params, + }) + } +} + +impl Encode for SubscribeUpdate { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w)?; + self.track_alias.encode(w)?; + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + + self.filter_type.encode(w)?; + + if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { + if self.start.is_none() || self.end.is_none() { + return Err(EncodeError::MissingField); + } + if let Some(start) = &self.start { + start.encode(w)?; + } + if let Some(end) = &self.end { + end.encode(w)?; + } + } + + self.params.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscriber.rs b/moq-transport/src/message/subscriber.rs index 914ba3c9..5b699aa2 100644 --- a/moq-transport/src/message/subscriber.rs +++ b/moq-transport/src/message/subscriber.rs @@ -50,4 +50,6 @@ subscriber_msgs! { AnnounceCancel, Subscribe, Unsubscribe, + SubscribeUpdate, + TrackStatusRequest, } diff --git a/moq-transport/src/message/track_status.rs b/moq-transport/src/message/track_status.rs new file mode 100644 index 00000000..16e0dc04 --- /dev/null +++ b/moq-transport/src/message/track_status.rs @@ -0,0 +1,39 @@ +use super::TrackStatusCode; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +#[derive(Clone, Debug)] +pub struct TrackStatus { + /// Track Namespace + pub track_namespace: String, + /// Track Name + pub track_name: String, + /// Status Code + pub status_code: TrackStatusCode, + /// Last Group ID + pub last_group_id: u64, + /// Last Object ID + pub last_object_id: u64, +} + +impl Decode for TrackStatus { + fn decode(r: &mut R) -> Result { + Ok(Self { + track_namespace: String::decode(r)?, + track_name: String::decode(r)?, + status_code: TrackStatusCode::decode(r)?, + last_group_id: u64::decode(r)?, + last_object_id: u64::decode(r)?, + }) + } +} + +impl Encode for TrackStatus { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + self.status_code.encode(w)?; + self.last_group_id.encode(w)?; + self.last_object_id.encode(w)?; + Ok(()) + } +} diff --git a/moq-transport/src/message/track_status_request.rs b/moq-transport/src/message/track_status_request.rs new file mode 100644 index 00000000..89000662 --- /dev/null +++ b/moq-transport/src/message/track_status_request.rs @@ -0,0 +1,30 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +#[derive(Clone, Debug)] +pub struct TrackStatusRequest { + /// Track Namespace + pub track_namespace: String, + /// Track Name + pub track_name: String, +} + +impl Decode for TrackStatusRequest { + fn decode(r: &mut R) -> Result { + let track_namespace = String::decode(r)?; + let track_name = String::decode(r)?; + + Ok(Self { + track_namespace, + track_name, + }) + } +} + +impl Encode for TrackStatusRequest { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/serve/datagram.rs b/moq-transport/src/serve/datagram.rs index fc92c86f..9683389a 100644 --- a/moq-transport/src/serve/datagram.rs +++ b/moq-transport/src/serve/datagram.rs @@ -1,5 +1,6 @@ use std::{fmt, sync::Arc}; +use crate::data::ObjectStatus; use crate::watch::State; use super::{ServeError, Track}; @@ -118,6 +119,7 @@ pub struct Datagram { pub group_id: u64, pub object_id: u64, pub priority: u64, + pub status: ObjectStatus, pub payload: bytes::Bytes, } @@ -127,6 +129,7 @@ impl fmt::Debug for Datagram { .field("object_id", &self.object_id) .field("group_id", &self.group_id) .field("priority", &self.priority) + .field("status", &self.status) .field("payload", &self.payload.len()) .finish() } diff --git a/moq-transport/src/serve/group.rs b/moq-transport/src/serve/group.rs index 78d861dd..d274ceba 100644 --- a/moq-transport/src/serve/group.rs +++ b/moq-transport/src/serve/group.rs @@ -10,6 +10,7 @@ use bytes::Bytes; use std::{cmp, ops::Deref, sync::Arc}; +use crate::data::ObjectStatus; use crate::watch::State; use super::{ServeError, Track}; @@ -270,6 +271,7 @@ impl GroupWriter { let (writer, reader) = GroupObject { group: self.info.clone(), object_id: self.next, + status: ObjectStatus::Object, size, } .produce(); @@ -396,6 +398,9 @@ pub struct GroupObject { // The size of the object. pub size: usize, + + // Object status + pub status: ObjectStatus, } impl GroupObject { diff --git a/moq-transport/src/serve/object.rs b/moq-transport/src/serve/object.rs index 3be37e9d..29293a5c 100644 --- a/moq-transport/src/serve/object.rs +++ b/moq-transport/src/serve/object.rs @@ -14,6 +14,8 @@ use super::{ServeError, Track}; use crate::watch::State; use bytes::Bytes; +use crate::data::ObjectStatus; + pub struct Objects { pub track: Arc, } @@ -71,6 +73,7 @@ impl ObjectsWriter { group_id: object.group_id, object_id: object.object_id, priority: object.priority, + status: ObjectStatus::Object, }; let (writer, reader) = object.produce(); @@ -191,6 +194,9 @@ pub struct ObjectInfo { // The priority of the stream. pub priority: u64, + + // The object status + pub status: ObjectStatus, } impl Deref for ObjectInfo { diff --git a/moq-transport/src/serve/stream.rs b/moq-transport/src/serve/stream.rs index c843ba43..0aa210a8 100644 --- a/moq-transport/src/serve/stream.rs +++ b/moq-transport/src/serve/stream.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use std::{ops::Deref, sync::Arc}; +use crate::data::ObjectStatus; use crate::watch::State; use super::{ServeError, Track}; @@ -233,6 +234,7 @@ impl StreamGroupWriter { let (writer, reader) = StreamObject { group: self.info.clone(), object_id: self.next, + status: ObjectStatus::Object, size, } .produce(); @@ -325,6 +327,9 @@ pub struct StreamObject { // The size of the object. pub size: usize, + + // Object status + pub status: ObjectStatus, } impl StreamObject { diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index c85a91f8..706be529 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, ops}; use crate::watch::State; use crate::{message, serve::ServeError}; -use super::{Publisher, Subscribed}; +use super::{Publisher, Subscribed, TrackStatusRequested}; #[derive(Debug, Clone)] pub struct AnnounceInfo { @@ -12,6 +12,7 @@ pub struct AnnounceInfo { struct AnnounceState { subscribers: VecDeque, + track_statuses_requested: VecDeque, ok: bool, closed: Result<(), ServeError>, } @@ -20,6 +21,7 @@ impl Default for AnnounceState { fn default() -> Self { Self { subscribers: Default::default(), + track_statuses_requested: Default::default(), ok: false, closed: Ok(()), } @@ -81,7 +83,7 @@ impl Announce { } } - pub async fn subscribed(&mut self) -> Result, ServeError> { + pub async fn subscribed(&self) -> Result, ServeError> { loop { { let state = self.state.lock(); @@ -99,6 +101,26 @@ impl Announce { } } + pub async fn track_status_requested(&self) -> Result, ServeError> { + loop { + { + let state = self.state.lock(); + if !state.track_statuses_requested.is_empty() { + return Ok(state + .into_mut() + .and_then(|mut state| state.track_statuses_requested.pop_front())); + } + + state.closed.clone()?; + match state.modified() { + Some(notified) => notified, + None => return Ok(None), + } + } + .await; + } + } + // Wait until an OK is received pub async fn ok(&self) -> Result<(), ServeError> { loop { @@ -172,4 +194,13 @@ impl AnnounceRecv { Ok(()) } + + pub fn recv_track_status_requested( + &mut self, + track_status_requested: TrackStatusRequested, + ) -> Result<(), ServeError> { + let mut state = self.state.lock_mut().ok_or(ServeError::Done)?; + state.track_statuses_requested.push_back(track_status_requested); + Ok(()) + } } diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index 47ddc4d7..8c1c90d9 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -6,6 +6,7 @@ mod reader; mod subscribe; mod subscribed; mod subscriber; +mod track_status_requested; mod writer; pub use announce::*; @@ -15,6 +16,7 @@ pub use publisher::*; pub use subscribe::*; pub use subscribed::*; pub use subscriber::*; +pub use track_status_requested::*; use reader::*; use writer::*; @@ -77,7 +79,7 @@ impl Session { let mut sender = Writer::new(control.0); let mut recver = Reader::new(control.1); - let versions: setup::Versions = [setup::Version::DRAFT_03].into(); + let versions: setup::Versions = [setup::Version::DRAFT_04].into(); let client = setup::Client { role, @@ -126,10 +128,10 @@ impl Session { let client: setup::Client = recver.decode().await?; log::debug!("received client SETUP: {:?}", client); - if !client.versions.contains(&setup::Version::DRAFT_03) { + if !client.versions.contains(&setup::Version::DRAFT_04) { return Err(SessionError::Version( client.versions, - [setup::Version::DRAFT_03].into(), + [setup::Version::DRAFT_04].into(), )); } @@ -150,7 +152,7 @@ impl Session { let server = setup::Server { role, - version: setup::Version::DRAFT_03, + version: setup::Version::DRAFT_04, params: Default::default(), }; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 0335daf3..20da5532 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -13,7 +13,7 @@ use crate::{ use crate::watch::Queue; -use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, SubscribedRecv}; +use super::{Announce, AnnounceRecv, Session, SessionError, Subscribed, SubscribedRecv, TrackStatusRequested}; // TODO remove Clone. #[derive(Clone)] @@ -51,7 +51,7 @@ impl Publisher { /// Announce a namespace and serve tracks using the provided [serve::TracksReader]. /// The caller uses [serve::TracksWriter] for static tracks and [serve::TracksRequest] for dynamic tracks. pub async fn announce(&mut self, tracks: TracksReader) -> Result<(), SessionError> { - let mut announce = match self.announces.lock().unwrap().entry(tracks.namespace.clone()) { + let announce = match self.announces.lock().unwrap().entry(tracks.namespace.clone()) { hash_map::Entry::Occupied(_) => return Err(ServeError::Duplicate.into()), hash_map::Entry::Vacant(entry) => { let (send, recv) = Announce::new(self.clone(), tracks.namespace.clone()); @@ -60,29 +60,47 @@ impl Publisher { } }; - let mut tasks = FuturesUnordered::new(); - let mut done = None; + let mut subscribe_tasks = FuturesUnordered::new(); + let mut status_tasks = FuturesUnordered::new(); + let mut subscribe_done = false; + let mut status_done = false; loop { tokio::select! { - subscribe = announce.subscribed(), if done.is_none() => { - let subscribe = match subscribe { - Ok(Some(subscribe)) => subscribe, - Ok(None) => { done = Some(Ok(())); continue }, - Err(err) => { done = Some(Err(err)); continue }, - }; - - let tracks = tracks.clone(); - - tasks.push(async move { - let info = subscribe.info.clone(); - if let Err(err) = Self::serve_subscribe(subscribe, tracks).await { - log::warn!("failed serving subscribe: {:?}, error: {}", info, err) - } - }); + res = announce.subscribed(), if !subscribe_done => { + match res? { + Some(subscribed) => { + let tracks = tracks.clone(); + + subscribe_tasks.push(async move { + let info = subscribed.info.clone(); + if let Err(err) = Self::serve_subscribe(subscribed, tracks).await { + log::warn!("failed serving subscribe: {:?}, error: {}", info, err) + } + }); + }, + None => subscribe_done = true, + } + + }, + res = announce.track_status_requested(), if !status_done => { + match res? { + Some(status) => { + let tracks = tracks.clone(); + + status_tasks.push(async move { + let info = status.info.clone(); + if let Err(err) = Self::serve_track_status(status, tracks).await { + log::warn!("failed serving track status request: {:?}, error: {}", info, err) + } + }); + }, + None => status_done = true, + } }, - _ = tasks.next(), if !tasks.is_empty() => {}, - else => return Ok(done.unwrap()?) + Some(res) = subscribe_tasks.next() => res, + Some(res) = status_tasks.next() => res, + else => return Ok(()) } } } @@ -97,6 +115,39 @@ impl Publisher { Ok(()) } + pub async fn serve_track_status( + mut track_status_request: TrackStatusRequested, + mut tracks: TracksReader, + ) -> Result<(), SessionError> { + let track = tracks + .subscribe(&track_status_request.info.track.clone()) + .ok_or(ServeError::NotFound)?; + let response; + + if let Some((latest_group_id, latest_object_id)) = track.latest() { + response = message::TrackStatus { + track_namespace: track_status_request.info.namespace.clone(), + track_name: track_status_request.info.track.clone(), + status_code: message::TrackStatusCode::InProgress, + last_group_id: latest_group_id, + last_object_id: latest_object_id, + }; + } else { + response = message::TrackStatus { + track_namespace: track_status_request.info.namespace.clone(), + track_name: track_status_request.info.track.clone(), + status_code: message::TrackStatusCode::DoesNotExist, + last_group_id: 0, + last_object_id: 0, + }; + } + // TODO: can we know of any other statuses in this context? + + track_status_request.respond(response).await?; + + Ok(()) + } + // Returns subscriptions that do not map to an active announce. pub async fn subscribed(&mut self) -> Option { self.unknown.pop().await @@ -109,6 +160,8 @@ impl Publisher { message::Subscriber::AnnounceCancel(msg) => self.recv_announce_cancel(msg), message::Subscriber::Subscribe(msg) => self.recv_subscribe(msg), message::Subscriber::Unsubscribe(msg) => self.recv_unsubscribe(msg), + message::Subscriber::SubscribeUpdate(msg) => self.recv_subscribe_update(msg), + message::Subscriber::TrackStatusRequest(msg) => self.recv_track_status_request(msg), }; if let Err(err) = res { @@ -175,6 +228,24 @@ impl Publisher { Ok(()) } + fn recv_subscribe_update(&mut self, _msg: message::SubscribeUpdate) -> Result<(), SessionError> { + // TODO: Implement updating subscriptions. + Err(SessionError::Internal) + } + + fn recv_track_status_request(&mut self, msg: message::TrackStatusRequest) -> Result<(), SessionError> { + let namespace = msg.track_namespace.clone(); + + let mut announces = self.announces.lock().unwrap(); + let announce = announces.get_mut(&namespace).ok_or(SessionError::Internal)?; + + let track_status_requested = TrackStatusRequested::new(self.clone(), msg); + + announce + .recv_track_status_requested(track_status_requested) + .map_err(Into::into) + } + fn recv_unsubscribe(&mut self, msg: message::Unsubscribe) -> Result<(), SessionError> { if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { subscribed.recv_unsubscribe()?; diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 9438dc37..f91a449c 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -2,7 +2,7 @@ use std::ops; use crate::{ data, - message::{self, SubscribeLocation, SubscribePair}, + message::{self, FilterType, SubscribeLocation, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, }; @@ -47,15 +47,16 @@ impl Subscribe { track_alias: id, track_namespace: track.namespace.clone(), track_name: track.name.clone(), + filter_type: FilterType::LatestGroup, // TODO add these to the publisher. - start: SubscribePair { + start: Some(SubscribePair { group: SubscribeLocation::Latest(0), object: SubscribeLocation::Absolute(0), - }, - end: SubscribePair { + }), + end: Some(SubscribePair { group: SubscribeLocation::None, object: SubscribeLocation::None, - }, + }), params: Default::default(), }); @@ -209,6 +210,7 @@ impl SubscribeRecv { group_id: datagram.group_id, object_id: datagram.object_id, priority: datagram.send_order, + status: datagram.object_status, payload: datagram.payload, })?; diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 2ee37f31..1b40efa9 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -183,6 +183,7 @@ impl Subscribed { group_id: object.group_id, object_id: object.object_id, size: object.size, + status: object.status, }; self.state @@ -263,6 +264,7 @@ impl Subscribed { let header = data::GroupObject { object_id: object.object_id, size: object.size, + status: object.status, }; writer.encode(&header).await?; @@ -299,6 +301,8 @@ impl Subscribed { group_id: object.group_id, object_id: object.object_id, send_order: object.priority, + object_status: object.status, + }; let publisher = self.publisher.clone(); @@ -362,6 +366,7 @@ impl Subscribed { group_id: datagram.group_id, object_id: datagram.object_id, send_order: datagram.priority, + object_status: datagram.status, payload: datagram.payload, }; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 5e072fca..2833e8aa 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -83,6 +83,7 @@ impl Subscriber { message::Publisher::SubscribeOk(msg) => self.recv_subscribe_ok(msg), message::Publisher::SubscribeError(msg) => self.recv_subscribe_error(msg), message::Publisher::SubscribeDone(msg) => self.recv_subscribe_done(msg), + message::Publisher::TrackStatus(msg) => self.recv_track_status(msg), }; if let Err(SessionError::Serve(err)) = res { @@ -144,6 +145,13 @@ impl Subscriber { Ok(()) } + fn recv_track_status(&mut self, _msg: &message::TrackStatus) -> Result<(), SessionError> { + // TODO: Expose this somehow? + // TODO: Also add a way to sent a Track Status Request in the first place + + Ok(()) + } + fn drop_announce(&mut self, namespace: &str) { self.announced.lock().unwrap().remove(namespace); } diff --git a/moq-transport/src/session/track_status_requested.rs b/moq-transport/src/session/track_status_requested.rs new file mode 100644 index 00000000..75fb1ba2 --- /dev/null +++ b/moq-transport/src/session/track_status_requested.rs @@ -0,0 +1,30 @@ +use super::{Publisher, SessionError}; +use crate::message; + +#[derive(Debug, Clone)] +pub struct TrackStatusRequestedInfo { + pub namespace: String, + pub track: String, +} + +pub struct TrackStatusRequested { + publisher: Publisher, + // msg: message::TrackStatusRequest, // TODO: See if we actually need this + pub info: TrackStatusRequestedInfo, +} + +impl TrackStatusRequested { + pub fn new(publisher: Publisher, msg: message::TrackStatusRequest) -> Self { + let namespace = msg.track_namespace.clone(); + let track = msg.track_name.clone(); + Self { + publisher, + info: TrackStatusRequestedInfo { namespace, track }, + } + } + + pub async fn respond(&mut self, status: message::TrackStatus) -> Result<(), SessionError> { + self.publisher.send_message(status); + Ok(()) + } +} diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 5d67f8c2..8ddc9913 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -18,6 +18,9 @@ impl Version { /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-03.html pub const DRAFT_03: Version = Version(0xff000003); + + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html + pub const DRAFT_04: Version = Version(0xff000004); } impl From for Version {