Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[game-reporter] Move completion network call into a background thread #398

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 79 additions & 16 deletions Externals/SlippiRustExtensions/game-reporter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ pub(crate) enum ProcessingEvent {
Shutdown,
}

/// Used to pass completion event data to a background processing thread.
#[derive(Clone, Debug)]
pub(crate) enum CompletionEvent {
ReportAvailable {
uid: String,
play_key: String,
match_id: String,
end_mode: u8,
},

Shutdown,
}

/// The public interface for the game reporter service. This handles managing any
/// necessary background threads and provides hooks for instrumenting the reporting
/// process.
Expand All @@ -32,8 +45,10 @@ pub(crate) enum ProcessingEvent {
#[derive(Debug)]
pub struct SlippiGameReporter {
iso_md5_hasher_thread: Option<thread::JoinHandle<()>>,
processing_thread: Option<thread::JoinHandle<()>>,
processing_thread_notifier: Sender<ProcessingEvent>,
queue_thread: Option<thread::JoinHandle<()>>,
queue_thread_notifier: Sender<ProcessingEvent>,
completion_thread: Option<thread::JoinHandle<()>>,
completion_thread_notifier: Sender<CompletionEvent>,
queue: GameReporterQueue,
replay_data: Vec<u8>,
}
Expand Down Expand Up @@ -61,21 +76,33 @@ impl SlippiGameReporter {
})
.expect("Failed to spawn SlippiGameReporterISOHasherThread.");

let (sender, receiver) = mpsc::channel();
let processing_thread_queue_handle = queue.clone();
let (queue_sender, queue_receiver) = mpsc::channel();
let queue_thread_queue_handle = queue.clone();

let queue_thread = thread::Builder::new()
.name("SlippiGameReporterQueueProcessingThread".into())
.spawn(move || {
queue::run(queue_thread_queue_handle, queue_receiver);
})
.expect("Failed to spawn SlippiGameReporterQueueProcessingThread.");

let (completion_sender, completion_receiver) = mpsc::channel();
let completion_http_handle = queue.http_client.clone();

let processing_thread = thread::Builder::new()
.name("SlippiGameReporterProcessingThread".into())
let completion_thread = thread::Builder::new()
.name("SlippiGameReporterCompletionProcessingThread".into())
.spawn(move || {
queue::run(processing_thread_queue_handle, receiver);
queue::run_completion(completion_http_handle, completion_receiver);
})
.expect("Failed to spawn SlippiGameReporterProcessingThread.");
.expect("Failed to spawn SlippiGameReporterCompletionProcessingThread.");

Self {
queue,
replay_data: Vec::new(),
processing_thread_notifier: sender,
processing_thread: Some(processing_thread),
queue_thread_notifier: queue_sender,
queue_thread: Some(queue_thread),
completion_thread_notifier: completion_sender,
completion_thread: Some(completion_thread),
iso_md5_hasher_thread: Some(iso_md5_hasher_thread),
}
}
Expand All @@ -101,14 +128,32 @@ impl SlippiGameReporter {
report.replay_data = std::mem::replace(&mut self.replay_data, Vec::new());
self.queue.add_report(report);

if let Err(e) = self.processing_thread_notifier.send(ProcessingEvent::ReportAvailable) {
if let Err(e) = self.queue_thread_notifier.send(ProcessingEvent::ReportAvailable) {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Unable to dispatch ReportAvailable notification"
);
}
}

/// Dispatches a completion report to a background processing thread.
pub fn report_completion(&self, uid: String, play_key: String, match_id: String, end_mode: u8) {
let event = CompletionEvent::ReportAvailable {
uid,
play_key,
match_id,
end_mode,
};

if let Err(e) = self.completion_thread_notifier.send(event) {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Unable to dispatch match completion notification"
);
}
}
}

impl Deref for SlippiGameReporter {
Expand All @@ -125,20 +170,38 @@ impl Drop for SlippiGameReporter {
/// Joins the background threads when we're done, logging if
/// any errors are encountered.
fn drop(&mut self) {
if let Some(processing_thread) = self.processing_thread.take() {
if let Err(e) = self.processing_thread_notifier.send(ProcessingEvent::Shutdown) {
if let Some(queue_thread) = self.queue_thread.take() {
if let Err(e) = self.queue_thread_notifier.send(ProcessingEvent::Shutdown) {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Failed to send shutdown notification to queue processing thread, may hang"
);
}

if let Err(e) = queue_thread.join() {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Queue thread failure"
);
}
}

if let Some(completion_thread) = self.completion_thread.take() {
if let Err(e) = self.completion_thread_notifier.send(CompletionEvent::Shutdown) {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Failed to send shutdown notification to report processing thread, may hang"
"Failed to send shutdown notification to completion processing thread, may hang"
);
}

if let Err(e) = processing_thread.join() {
if let Err(e) = completion_thread.join() {
tracing::error!(
target: Log::GameReporter,
error = ?e,
"Processing thread failure"
"Completion thread failure"
);
}
}
Expand Down
98 changes: 66 additions & 32 deletions Externals/SlippiRustExtensions/game-reporter/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde_json::{json, Value};
use dolphin_integrations::{Color, Dolphin, Duration as OSDDuration, Log};

use crate::types::{GameReport, GameReportRequestPayload, OnlinePlayMode};
use crate::ProcessingEvent;
use crate::{CompletionEvent, ProcessingEvent};

use flate2::write::GzEncoder;
use flate2::Compression;
Expand Down Expand Up @@ -77,37 +77,6 @@ impl GameReporterQueue {
}
}

/// Report a completed match.
///
/// This doesn't necessarily need to be here, but it's easier to grok the codebase
/// if we keep all reporting network calls in one module.
pub fn report_completion(&self, uid: String, play_key: String, match_id: String, end_mode: u8) {
let mutation = r#"
mutation ($report: OnlineGameCompleteInput!) {
completeOnlineGame (report: $report)
}
"#;

let variables = Some(json!({
"report": {
"matchId": match_id,
"fbUid": uid,
"playKey": play_key,
"endMode": end_mode,
}
}));

let res = execute_graphql_query(&self.http_client, mutation, variables, Some("completeOnlineGame"));

match res {
Ok(value) if value == "true" => {
tracing::info!(target: Log::GameReporter, "Successfully executed completion request")
},
Ok(value) => tracing::error!(target: Log::GameReporter, ?value, "Error executing completion request",),
Err(error) => tracing::error!(target: Log::GameReporter, ?error, "Error executing completion request"),
}
}

/// Report an abandoned match.
///
/// This doesn't necessarily need to be here, but it's easier to grok the codebase
Expand Down Expand Up @@ -139,6 +108,71 @@ impl GameReporterQueue {
}
}

pub(crate) fn run_completion(http_client: ureq::Agent, receiver: Receiver<CompletionEvent>) {
loop {
// Watch for notification to do work
match receiver.recv() {
Ok(CompletionEvent::ReportAvailable {
uid,
play_key,
match_id,
end_mode,
}) => {
report_completion(&http_client, uid, match_id, play_key, end_mode);
},

Ok(CompletionEvent::Shutdown) => {
tracing::info!(target: Log::GameReporter, "Completion thread winding down");
break;
},

// This should realistically never happen, since it means the Sender
// that's held a level up has been dropped entirely - but we'll log
// for the hell of it in case anyone's tweaking the logic.
Err(error) => {
tracing::error!(
target: Log::GameReporter,
?error,
"Failed to receive CompletionEvent, thread will exit"
);

break;
},
}
}
}

/// Report a completed match.
///
/// This doesn't necessarily need to be here, but it's easier to grok the codebase
/// if we keep all reporting network calls in one module.
pub fn report_completion(http_client: &ureq::Agent, uid: String, match_id: String, play_key: String, end_mode: u8) {
let mutation = r#"
mutation ($report: OnlineGameCompleteInput!) {
completeOnlineGame (report: $report)
}
"#;

let variables = Some(json!({
"report": {
"matchId": match_id,
"fbUid": uid,
"playKey": play_key,
"endMode": end_mode,
}
}));

let res = execute_graphql_query(http_client, mutation, variables, Some("completeOnlineGame"));

match res {
Ok(value) if value == "true" => {
tracing::info!(target: Log::GameReporter, "Successfully executed completion request")
},
Ok(value) => tracing::error!(target: Log::GameReporter, ?value, "Error executing completion request",),
Err(error) => tracing::error!(target: Log::GameReporter, ?error, "Error executing completion request"),
}
}

/// The main loop that processes reports.
pub(crate) fn run(reporter: GameReporterQueue, receiver: Receiver<ProcessingEvent>) {
loop {
Expand Down