From 3c49eeb9f263c872710c22b17328dcdb550ce9c9 Mon Sep 17 00:00:00 2001 From: Ryan McGrath Date: Mon, 14 Aug 2023 09:30:24 -0700 Subject: [PATCH 1/2] Move completion network call into its own background thread for now - this can be refactored into something nicer as this grows --- .../game-reporter/src/lib.rs | 95 +++++++++++++++--- .../game-reporter/src/queue.rs | 98 +++++++++++++------ 2 files changed, 145 insertions(+), 48 deletions(-) diff --git a/Externals/SlippiRustExtensions/game-reporter/src/lib.rs b/Externals/SlippiRustExtensions/game-reporter/src/lib.rs index 1670450ee8..d8099b64a1 100644 --- a/Externals/SlippiRustExtensions/game-reporter/src/lib.rs +++ b/Externals/SlippiRustExtensions/game-reporter/src/lib.rs @@ -22,6 +22,19 @@ pub(crate) enum ProcessingEvent { Shutdown, } +/// Used to pass completion event data to a background processing thread. +#[derive(Clone, Debug)] +pub(crate) enum CompletionEvent { + ReportAvailable { + uid: String, + play_key: String, + match_id: String, + end_mode: u8, + }, + + Shutdown, +} + /// The public interface for the game reporter service. This handles managing any /// necessary background threads and provides hooks for instrumenting the reporting /// process. @@ -32,8 +45,10 @@ pub(crate) enum ProcessingEvent { #[derive(Debug)] pub struct SlippiGameReporter { iso_md5_hasher_thread: Option>, - processing_thread: Option>, - processing_thread_notifier: Sender, + queue_thread: Option>, + queue_thread_notifier: Sender, + completion_thread: Option>, + completion_thread_notifier: Sender, queue: GameReporterQueue, replay_data: Vec, } @@ -61,21 +76,33 @@ impl SlippiGameReporter { }) .expect("Failed to spawn SlippiGameReporterISOHasherThread."); - let (sender, receiver) = mpsc::channel(); - let processing_thread_queue_handle = queue.clone(); + let (queue_sender, queue_receiver) = mpsc::channel(); + let queue_thread_queue_handle = queue.clone(); + + let queue_thread = thread::Builder::new() + .name("SlippiGameReporterQueueProcessingThread".into()) + .spawn(move || { + queue::run(queue_thread_queue_handle, queue_receiver); + }) + .expect("Failed to spawn SlippiGameReporterQueueProcessingThread."); + + let (completion_sender, completion_receiver) = mpsc::channel(); + let completion_http_handle = queue.http_client.clone(); - let processing_thread = thread::Builder::new() - .name("SlippiGameReporterProcessingThread".into()) + let completion_thread = thread::Builder::new() + .name("SlippiGameReporterCompletionProcessingThread".into()) .spawn(move || { - queue::run(processing_thread_queue_handle, receiver); + queue::run_completion(completion_http_handle, completion_receiver); }) - .expect("Failed to spawn SlippiGameReporterProcessingThread."); + .expect("Failed to spawn SlippiGameReporterCompletionProcessingThread."); Self { queue, replay_data: Vec::new(), - processing_thread_notifier: sender, - processing_thread: Some(processing_thread), + queue_thread_notifier: queue_sender, + queue_thread: Some(queue_thread), + completion_thread_notifier: completion_sender, + completion_thread: Some(completion_thread), iso_md5_hasher_thread: Some(iso_md5_hasher_thread), } } @@ -101,7 +128,7 @@ impl SlippiGameReporter { report.replay_data = std::mem::replace(&mut self.replay_data, Vec::new()); self.queue.add_report(report); - if let Err(e) = self.processing_thread_notifier.send(ProcessingEvent::ReportAvailable) { + if let Err(e) = self.queue_thread_notifier.send(ProcessingEvent::ReportAvailable) { tracing::error!( target: Log::GameReporter, error = ?e, @@ -109,6 +136,24 @@ impl SlippiGameReporter { ); } } + + /// Dispatches a completion report to a background processing thread. + pub fn report_completion(&self, uid: String, play_key: String, match_id: String, end_mode: u8) { + let event = CompletionEvent::ReportAvailable { + uid, + play_key, + match_id, + end_mode, + }; + + if let Err(e) = self.completion_thread_notifier.send(event) { + tracing::error!( + target: Log::GameReporter, + error = ?e, + "Unable to dispatch match completion notification" + ); + } + } } impl Deref for SlippiGameReporter { @@ -125,20 +170,38 @@ impl Drop for SlippiGameReporter { /// Joins the background threads when we're done, logging if /// any errors are encountered. fn drop(&mut self) { - if let Some(processing_thread) = self.processing_thread.take() { - if let Err(e) = self.processing_thread_notifier.send(ProcessingEvent::Shutdown) { + if let Some(queue_thread) = self.queue_thread.take() { + if let Err(e) = self.queue_thread_notifier.send(ProcessingEvent::Shutdown) { + tracing::error!( + target: Log::GameReporter, + error = ?e, + "Failed to send shutdown notification to queue processing thread, may hang" + ); + } + + if let Err(e) = queue_thread.join() { + tracing::error!( + target: Log::GameReporter, + error = ?e, + "Queue thread failure" + ); + } + } + + if let Some(completion_thread) = self.completion_thread.take() { + if let Err(e) = self.completion_thread_notifier.send(CompletionEvent::Shutdown) { tracing::error!( target: Log::GameReporter, error = ?e, - "Failed to send shutdown notification to report processing thread, may hang" + "Failed to send shutdown notification to completion processing thread, may hang" ); } - if let Err(e) = processing_thread.join() { + if let Err(e) = completion_thread.join() { tracing::error!( target: Log::GameReporter, error = ?e, - "Processing thread failure" + "Completion thread failure" ); } } diff --git a/Externals/SlippiRustExtensions/game-reporter/src/queue.rs b/Externals/SlippiRustExtensions/game-reporter/src/queue.rs index ddab403aca..befd9760e6 100644 --- a/Externals/SlippiRustExtensions/game-reporter/src/queue.rs +++ b/Externals/SlippiRustExtensions/game-reporter/src/queue.rs @@ -11,7 +11,7 @@ use serde_json::{json, Value}; use dolphin_integrations::{Color, Dolphin, Duration as OSDDuration, Log}; use crate::types::{GameReport, GameReportRequestPayload, OnlinePlayMode}; -use crate::ProcessingEvent; +use crate::{CompletionEvent, ProcessingEvent}; use flate2::write::GzEncoder; use flate2::Compression; @@ -77,37 +77,6 @@ impl GameReporterQueue { } } - /// Report a completed match. - /// - /// This doesn't necessarily need to be here, but it's easier to grok the codebase - /// if we keep all reporting network calls in one module. - pub fn report_completion(&self, uid: String, play_key: String, match_id: String, end_mode: u8) { - let mutation = r#" - mutation ($report: OnlineGameCompleteInput!) { - completeOnlineGame (report: $report) - } - "#; - - let variables = Some(json!({ - "report": { - "matchId": match_id, - "fbUid": uid, - "playKey": play_key, - "endMode": end_mode, - } - })); - - let res = execute_graphql_query(&self.http_client, mutation, variables, Some("completeOnlineGame")); - - match res { - Ok(value) if value == "true" => { - tracing::info!(target: Log::GameReporter, "Successfully executed completion request") - }, - Ok(value) => tracing::error!(target: Log::GameReporter, ?value, "Error executing completion request",), - Err(error) => tracing::error!(target: Log::GameReporter, ?error, "Error executing completion request"), - } - } - /// Report an abandoned match. /// /// This doesn't necessarily need to be here, but it's easier to grok the codebase @@ -139,6 +108,71 @@ impl GameReporterQueue { } } +pub(crate) fn run_completion(http_client: ureq::Agent, receiver: Receiver) { + loop { + // Watch for notification to do work + match receiver.recv() { + Ok(CompletionEvent::ReportAvailable { + uid, + play_key, + match_id, + end_mode, + }) => { + report_completion(&http_client, uid, play_key, match_id, end_mode); + }, + + Ok(CompletionEvent::Shutdown) => { + tracing::info!(target: Log::GameReporter, "Completion thread winding down"); + break; + }, + + // This should realistically never happen, since it means the Sender + // that's held a level up has been dropped entirely - but we'll log + // for the hell of it in case anyone's tweaking the logic. + Err(error) => { + tracing::error!( + target: Log::GameReporter, + ?error, + "Failed to receive CompletionEvent, thread will exit" + ); + + break; + }, + } + } +} + +/// Report a completed match. +/// +/// This doesn't necessarily need to be here, but it's easier to grok the codebase +/// if we keep all reporting network calls in one module. +pub fn report_completion(http_client: &ureq::Agent, uid: String, match_id: String, play_key: String, end_mode: u8) { + let mutation = r#" + mutation ($report: OnlineGameCompleteInput!) { + completeOnlineGame (report: $report) + } + "#; + + let variables = Some(json!({ + "report": { + "matchId": match_id, + "fbUid": uid, + "playKey": play_key, + "endMode": end_mode, + } + })); + + let res = execute_graphql_query(http_client, mutation, variables, Some("completeOnlineGame")); + + match res { + Ok(value) if value == "true" => { + tracing::info!(target: Log::GameReporter, "Successfully executed completion request") + }, + Ok(value) => tracing::error!(target: Log::GameReporter, ?value, "Error executing completion request",), + Err(error) => tracing::error!(target: Log::GameReporter, ?error, "Error executing completion request"), + } +} + /// The main loop that processes reports. pub(crate) fn run(reporter: GameReporterQueue, receiver: Receiver) { loop { From 40a1f203a442e1dd201f67d386d2c39c2f09110c Mon Sep 17 00:00:00 2001 From: Jas Laferriere Date: Mon, 14 Aug 2023 13:28:15 -0400 Subject: [PATCH 2/2] fix: arg order on report completion --- Externals/SlippiRustExtensions/game-reporter/src/queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Externals/SlippiRustExtensions/game-reporter/src/queue.rs b/Externals/SlippiRustExtensions/game-reporter/src/queue.rs index befd9760e6..664df7a97a 100644 --- a/Externals/SlippiRustExtensions/game-reporter/src/queue.rs +++ b/Externals/SlippiRustExtensions/game-reporter/src/queue.rs @@ -118,7 +118,7 @@ pub(crate) fn run_completion(http_client: ureq::Agent, receiver: Receiver { - report_completion(&http_client, uid, play_key, match_id, end_mode); + report_completion(&http_client, uid, match_id, play_key, end_mode); }, Ok(CompletionEvent::Shutdown) => {