Skip to content

Commit

Permalink
WIP (absurd amounts of debug logs)
Browse files Browse the repository at this point in the history
  • Loading branch information
englishm committed Jul 23, 2024
1 parent 7e1728a commit d456610
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 13 deletions.
55 changes: 48 additions & 7 deletions moq-transport/src/session/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct AnnounceInfo {
pub namespace: String,
}

#[derive(Debug)]
struct AnnounceState {
subscribers: VecDeque<Subscribed>,
track_statuses_requested: VecDeque<TrackStatusRequested>,
Expand Down Expand Up @@ -86,53 +87,93 @@ impl Announce {

pub async fn subscribed(&mut self) -> Result<Option<Subscribed>, ServeError> {
loop {
log::debug!("subscribed loop");
{
log::debug!("subscribed 1");
let state = self.state.lock();
log::debug!("subscribed 2");
if !state.subscribers.is_empty() {
log::debug!("subscribed 3");
return Ok(state.into_mut().and_then(|mut state| state.subscribers.pop_front()));
}

log::debug!("subscribed 4");
state.closed.clone()?;
log::debug!("subscribed 5");
match state.modified() {
Some(notified) => notified,
None => return Ok(None),
Some(notified) => {
log::debug!("subscribed 6");
notified
},

None => {
log::debug!("subscribed 7");
return Ok(None)
},
}
}
.await;
log::debug!("subscribed 8");
}
}

pub async fn track_status_requested(&mut self) -> Result<Option<TrackStatusRequested>, ServeError> {
loop {
log::debug!("track_status_requested loop");
{
log::debug!("track_status_requested 1");
let state = self.state.lock();
log::debug!("track_status_requested 2");
if !state.track_statuses_requested.is_empty() {
log::debug!("track_status_requested 3");
return Ok(state.into_mut().and_then(|mut state| state.track_statuses_requested.pop_front()));
}
log::debug!("track_status_requested 4");

state.closed.clone()?;
log::debug!("track_status_requested 5");
match state.modified() {
Some(notified) => notified,
None => return Ok(None),
Some(notified) => {
log::debug!("track_status_requested 5.1");
dbg!(&notified);
notified
},
None => {
log::debug!("track_status_requested 5.2");
return Ok(None)
},
}
}
.await;
log::debug!("track_status_requested 6");
}
}

// Wait until an OK is received
pub async fn ok(&self) -> Result<(), ServeError> {
loop {
log::debug!("ok loop");
{
log::debug!("ok 1");
let state = self.state.lock();
log::debug!("ok 2");
if state.ok {
log::debug!("ok 3");
return Ok(());
}
log::debug!("ok 4");
state.closed.clone()?;

log::debug!("ok 5");
match state.modified() {
Some(notified) => notified,
None => return Ok(()),
Some(notified) =>
{
log::debug!("ok 6");
notified
} ,
None => {
log::debug!("ok 7");
return Ok(())
},
}
}
.await;
Expand Down
48 changes: 42 additions & 6 deletions moq-transport/src/session/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{hash_map, HashMap},
fmt::Debug,
sync::{Arc, Mutex},
};

Expand All @@ -25,6 +26,12 @@ pub struct Publisher {
outgoing: Queue<Message>,
}

impl Debug for Publisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Publisher")
}
}

impl Publisher {
pub(crate) fn new(outgoing: Queue<Message>, webtransport: web_transport::Session) -> Self {
Self {
Expand All @@ -49,6 +56,7 @@ impl Publisher {
/// Announce a namespace and serve tracks using the provided [serve::TracksReader].
/// The caller uses [serve::TracksWriter] for static tracks and [serve::TracksRequest] for dynamic tracks.
pub async fn announce(&mut self, tracks: TracksReader) -> Result<(), SessionError> {
log::debug!("announcing!");
let announce = match self.announces.lock().unwrap().entry(tracks.namespace.clone()) {
hash_map::Entry::Occupied(_) => return Err(ServeError::Duplicate.into()),
hash_map::Entry::Vacant(entry) => {
Expand All @@ -64,9 +72,11 @@ impl Publisher {

tokio::select! {
result = self.serve_subscribes(announce_for_subscriptions, tracks) => {
log::debug!("select - subscribes");
result
},
result2 = self.serve_track_statuses(announce_for_track_status_requests, tracks_for_track_status_requests) => {
log::debug!("select - track statuses");
result2
}
}
Expand All @@ -79,14 +89,20 @@ impl Publisher {
let mut done = None;

loop {
log::debug!("serve_subscribes loop");
tokio::select! {
subscribe = {
log::debug!("DEBUG: {}:{}", file!(), line!());
let announce = announce.clone();
log::debug!("DEBUG: {}:{}", file!(), line!());
async move {
log::debug!("DEBUG: {}:{}", file!(), line!());
let mut announce = announce.lock().await;
log::debug!("DEBUG: {}:{}", file!(), line!());
announce.subscribed().await
}
}, if done.is_none() => {
log::debug!("DEBUG: {}:{}", file!(), line!());
let subscribe = match subscribe {
Ok(Some(subscribe)) => subscribe,
Ok(None) => { done = Some(Ok(())); continue },
Expand All @@ -103,7 +119,9 @@ impl Publisher {
});
},

_ = tasks.next(), if !tasks.is_empty() => {},
_ = tasks.next(), if !tasks.is_empty() => {
log::debug!("doing stuff in serve_subscribes");
},
else => return Ok(done.unwrap()?)
}
}
Expand All @@ -114,32 +132,50 @@ impl Publisher {
let mut done = None;

loop {
log::debug!("serve_track_statuses loop");
tokio::select! {
track_status_request = {
log::debug!("serve_track_statuses getting track status request");
let announce = announce.clone();
async move {
let mut announce = announce.lock().await;
announce.track_status_requested().await
}
}, if done.is_none() => {
log::debug!("done.is_none()");
let track_status_request = match track_status_request {
Ok(Some(track_status_request)) => track_status_request,
Ok(None) => { done = Some(Ok(())); continue },
Err(err) => { done = Some(Err(err)); continue },
Ok(Some(track_status_request)) => {
log::debug!("Ok(Some(track_status_request))");
track_status_request
},
Ok(None) => {
log::debug!("Ok(None)");
done = Some(Ok(())); continue },
Err(err) => {
log::debug!("Err(err)");
done = Some(Err(err)); continue },
};

log::debug!("cloning tracks...");
let tracks = tracks.clone();

log::debug!("pushing task...");
tasks.push(async move {
log::debug!("task being pushed");
let info = track_status_request.info.clone();
if let Err(err) = Self::serve_track_status_request(track_status_request, tracks).await {
log::warn!("failed serving track status request: {:?}, error: {}", info, err)
}
});
},

_ = tasks.next(), if !tasks.is_empty() => {},
else => return Ok(done.unwrap()?)
_ = tasks.next(), if !tasks.is_empty() => {
log::debug!("doing stuff in serve_track_statuses");
},
else => {
log::debug!("else");
return Ok(done.unwrap()?)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/subscribed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl Default for SubscribedState {
}
}

#[derive(Debug)]
pub struct Subscribed {
publisher: Publisher,
state: State<SubscribedState>,
Expand Down
1 change: 1 addition & 0 deletions moq-transport/src/session/track_status_requested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct TrackStatusRequestedInfo {
pub track: String,
}

#[derive(Debug)]
pub struct TrackStatusRequested {
publisher: Publisher,
// msg: message::TrackStatusRequest, // TODO: See if we actually need this
Expand Down
7 changes: 7 additions & 0 deletions moq-transport/src/watch/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct State<T> {

impl<T> State<T> {
pub fn new(initial: T) -> Self {
log::debug!("State<{}>::new", std::any::type_name::<T>());
let state = Arc::new(Mutex::new(StateInner::new(initial)));

Self {
Expand All @@ -65,6 +66,7 @@ impl<T> State<T> {
}

pub fn lock(&self) -> StateRef<T> {
log::debug!("State<{}>::lock", std::any::type_name::<T>());
StateRef {
state: self.state.clone(),
drop: self.drop.clone(),
Expand All @@ -73,6 +75,7 @@ impl<T> State<T> {
}

pub fn lock_mut(&self) -> Option<StateMut<T>> {
log::debug!("State<{}>::lock_mut", std::any::type_name::<T>());
let lock = self.state.lock().unwrap();
lock.dropped?;
Some(StateMut {
Expand Down Expand Up @@ -133,7 +136,9 @@ pub struct StateRef<'a, T> {
impl<'a, T> StateRef<'a, T> {
// Release the lock and wait for a notification when next updated.
pub fn modified(self) -> Option<StateChanged<T>> {
log::debug!("StateRef::modified 1");
self.lock.dropped?;
log::debug!("StateRef::modified 2");

Some(StateChanged {
state: self.state,
Expand Down Expand Up @@ -196,6 +201,7 @@ impl<'a, T: fmt::Debug> fmt::Debug for StateMut<'a, T> {
}
}

#[derive(Debug)]
pub struct StateChanged<T> {
state: Arc<Mutex<StateInner<T>>>,
epoch: usize,
Expand All @@ -209,6 +215,7 @@ impl<T> Future for StateChanged<T> {
let mut state = self.state.lock().unwrap();

if state.epoch > self.epoch {
log::debug!("StateChanged::poll: ready");
task::Poll::Ready(())
} else {
state.register(cx.waker());
Expand Down

0 comments on commit d456610

Please sign in to comment.