From c8953f95fb56ae7427b80ec4015f3bb2062c8363 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 8 Dec 2023 14:28:02 +0000 Subject: [PATCH 1/3] Add an endpoint to stream logs --- Cargo.lock | 2 ++ crates/librqbit/Cargo.toml | 2 ++ crates/librqbit/src/api.rs | 26 ++++++++++++++++- crates/librqbit/src/http_api.rs | 16 ++++++++-- crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/log_subscriber.rs | 42 +++++++++++++++++++++++++++ crates/rqbit/Cargo.toml | 3 +- crates/rqbit/src/main.rs | 41 ++++++++++++++++++++------ 8 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 crates/librqbit/src/log_subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index 6cc5cf2f..277a6434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1262,6 +1262,7 @@ dependencies = [ "bincode", "bitvec", "byteorder", + "bytes", "crypto-hash", "dashmap", "futures", @@ -2008,6 +2009,7 @@ name = "rqbit" version = "5.0.0" dependencies = [ "anyhow", + "bytes", "clap", "console-subscriber", "futures", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 10230782..633a3c6c 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -55,6 +55,7 @@ rand = "0.8" openssl = {version="0.10", optional=true} crypto-hash = {version="0.3", optional=true} sha1 = {version = "0.10", optional=true} +tracing-subscriber = {version = "0.3", default-features = false} uuid = {version = "1.2", features = ["v4"]} futures = "0.3" @@ -65,6 +66,7 @@ dashmap = "5.5.3" base64 = "0.21.5" serde_with = "3.4.0" tokio-util = "0.7.10" +bytes = "1.5.0" [dev-dependencies] futures = {version = "0.3"} diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 3c9454d1..6afb6906 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -2,11 +2,14 @@ use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use buffers::ByteString; +use bytes::Bytes; use dht::{DhtStats, Id20}; +use futures::Stream; use http::StatusCode; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::warn; use crate::{ @@ -30,13 +33,19 @@ pub type Result = std::result::Result; pub struct Api { session: Arc, rust_log_reload_tx: Option>, + line_rx: Option>>, } impl Api { - pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { + pub fn new( + session: Arc, + rust_log_reload_tx: Option>, + line_rx: Option>, + ) -> Self { Self { session, rust_log_reload_tx, + line_rx: line_rx.map(Arc::new), } } @@ -123,6 +132,21 @@ impl Api { Ok(Default::default()) } + pub fn api_log_lines_stream( + &self, + ) -> Result< + impl Stream> + + Send + + Sync + + 'static, + > { + Ok(self + .line_rx + .as_ref() + .map(|rx| BroadcastStream::new(rx.resubscribe())) + .context("line_rx wasn't set")?) + } + pub async fn api_add_torrent( &self, add: AddTorrent<'_>, diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 7d9f6f8b..9ea6be0e 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,6 +3,7 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; +use futures::StreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -25,15 +26,18 @@ type ApiState = Arc; use crate::api::Result; /// An HTTP server for the API. -#[derive(Clone)] pub struct HttpApi { inner: ApiState, } impl HttpApi { - pub fn new(session: Arc, rust_log_reload_tx: Option>) -> Self { + pub fn new( + session: Arc, + rust_log_reload_tx: Option>, + line_rx: Option>, + ) -> Self { Self { - inner: Arc::new(Api::new(session, rust_log_reload_tx)), + inner: Arc::new(Api::new(session, rust_log_reload_tx, line_rx)), } } @@ -185,8 +189,14 @@ impl HttpApi { state.api_set_rust_log(new_value).map(axum::Json) } + async fn stream_logs(State(state): State) -> Result { + let s = state.api_log_lines_stream()?; + Ok(axum::body::Body::from_stream(s)) + } + let mut app = Router::new() .route("/", get(api_root)) + .route("/stream_logs", get(stream_logs)) .route("/rust_log", post(set_rust_log)) .route("/dht/stats", get(dht_stats)) .route("/dht/table", get(dht_table)) diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 76e6e254..767372a3 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -29,6 +29,7 @@ mod dht_utils; mod file_ops; pub mod http_api; pub mod http_api_client; +pub mod log_subscriber; mod peer_connection; mod peer_info_reader; mod session; diff --git a/crates/librqbit/src/log_subscriber.rs b/crates/librqbit/src/log_subscriber.rs new file mode 100644 index 00000000..b2112132 --- /dev/null +++ b/crates/librqbit/src/log_subscriber.rs @@ -0,0 +1,42 @@ +use std::io::LineWriter; + +use bytes::Bytes; +use tracing_subscriber::fmt::MakeWriter; + +pub struct Subscriber { + tx: tokio::sync::broadcast::Sender, +} + +pub struct Writer { + tx: tokio::sync::broadcast::Sender, +} + +impl Subscriber { + pub fn new() -> (Self, tokio::sync::broadcast::Receiver) { + let (tx, rx) = tokio::sync::broadcast::channel(100); + (Self { tx }, rx) + } +} + +impl<'a> MakeWriter<'a> for Subscriber { + type Writer = LineWriter; + + fn make_writer(&self) -> Self::Writer { + LineWriter::new(Writer { + tx: self.tx.clone(), + }) + } +} + +impl std::io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let len = buf.len(); + let arc = buf.to_vec().into(); + let _ = self.tx.send(arc); + Ok(len) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index e3eec02e..29cb24bf 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -38,6 +38,7 @@ reqwest = "0.11" serde = {version = "1", features=["derive"]} serde_json = "1" size_format = "1" +bytes = "1.5.0" [dev-dependencies] -futures = {version = "0.3"} \ No newline at end of file +futures = {version = "0.3"} diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 45bfe809..2bf63bb3 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -178,8 +178,13 @@ enum SubCommand { Download(DownloadOpts), } +struct InitLoggingResult { + rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender, + line_rx: tokio::sync::broadcast::Receiver, +} + // Init logging and make a channel to send new RUST_LOG values to. -fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender { +fn init_logging(opts: &Opts) -> InitLoggingResult { let default_rust_log = match opts.log_level.as_ref() { Some(level) => match level { LogLevel::Trace => "trace", @@ -204,6 +209,8 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender { use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + let (line_sub, line_rx) = librqbit::log_subscriber::Subscriber::new(); + #[cfg(feature = "tokio-console")] { let (console_layer, server) = console_subscriber::Builder::default() @@ -230,7 +237,14 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender { #[cfg(not(feature = "tokio-console"))] { - let layered = tracing_subscriber::registry().with(fmt::layer().with_filter(stderr_filter)); + let layered = tracing_subscriber::registry() + .with(fmt::layer().with_filter(stderr_filter)) + .with( + fmt::layer() + .with_ansi(false) + .with_writer(line_sub) + .with_filter(EnvFilter::builder().parse("info").unwrap()), + ); if let Some(log_file) = &opts.log_file { let log_file = log_file.clone(); let log_file = move || { @@ -276,7 +290,10 @@ fn init_logging(opts: &Opts) -> tokio::sync::mpsc::UnboundedSender { Ok(()) }, ); - reload_tx + InitLoggingResult { + rust_log_reload_tx: reload_tx, + line_rx, + } } fn _start_deadlock_detector_thread() { @@ -332,7 +349,7 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts) -> anyhow::Result<()> { - let logging_reload_tx = init_logging(&opts); + let log_config = init_logging(&opts); let mut sopts = SessionOptions { disable_dht: opts.disable_dht, @@ -427,7 +444,11 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { trace_span!("stats_printer"), stats_printer(session.clone()), ); - let http_api = HttpApi::new(session, Some(logging_reload_tx)); + let http_api = HttpApi::new( + session, + Some(log_config.rust_log_reload_tx), + Some(log_config.line_rx), + ); let http_api_listen_addr = opts.http_api_listen_addr; http_api .make_http_api_and_run(http_api_listen_addr, false) @@ -507,14 +528,16 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { trace_span!("stats_printer"), stats_printer(session.clone()), ); - let http_api = HttpApi::new(session.clone(), Some(logging_reload_tx)); + let http_api = HttpApi::new( + session.clone(), + Some(log_config.rust_log_reload_tx), + Some(log_config.line_rx), + ); let http_api_listen_addr = opts.http_api_listen_addr; librqbit_spawn( "http_api", error_span!("http_api"), - http_api - .clone() - .make_http_api_and_run(http_api_listen_addr, true), + http_api.make_http_api_and_run(http_api_listen_addr, true), ); let mut added = false; From 441387e4b502aaf39a518960f60ff99d01b1fa56 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 8 Dec 2023 19:28:48 +0000 Subject: [PATCH 2/3] Display logs in desktop app --- crates/librqbit/src/http_api.rs | 62 +++---- crates/librqbit/src/log_subscriber.rs | 4 +- crates/librqbit/webui/src/api-types.ts | 1 + .../webui/src/components/LogStream.tsx | 175 ++++++++++++++++++ .../webui/src/components/LogStreamModal.tsx | 37 ++++ .../webui/src/components/RootContent.tsx | 4 +- crates/librqbit/webui/src/http-api.ts | 1 + crates/rqbit/src/main.rs | 1 + desktop/src-tauri/Cargo.lock | 2 + desktop/src-tauri/src/config.rs | 9 + desktop/src-tauri/src/main.rs | 66 +++++-- desktop/src/api.tsx | 84 +++++---- desktop/src/configuration.tsx | 1 + desktop/src/configure.tsx | 9 + desktop/src/main.tsx | 14 +- desktop/src/rqbit-desktop.tsx | 32 ++-- 16 files changed, 391 insertions(+), 111 deletions(-) create mode 100644 crates/librqbit/webui/src/components/LogStream.tsx create mode 100644 crates/librqbit/webui/src/components/LogStreamModal.tsx diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 9ea6be0e..face7db6 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,51 +3,48 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use futures::StreamExt; use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc::UnboundedSender; -use tracing::info; +use tracing::{info, warn}; use axum::Router; use crate::api::Api; use crate::peer_connection::PeerConnectionOptions; -use crate::session::{AddTorrent, AddTorrentOptions, Session, SUPPORTED_SCHEMES}; +use crate::session::{AddTorrent, AddTorrentOptions, SUPPORTED_SCHEMES}; use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter; -type ApiState = Arc; +type ApiState = Api; use crate::api::Result; /// An HTTP server for the API. pub struct HttpApi { inner: ApiState, + opts: HttpApiOptions, +} + +#[derive(Debug, Default)] +pub struct HttpApiOptions { + pub cors_enable_all: bool, + pub read_only: bool, } impl HttpApi { - pub fn new( - session: Arc, - rust_log_reload_tx: Option>, - line_rx: Option>, - ) -> Self { + pub fn new(api: Api, opts: Option) -> Self { Self { - inner: Arc::new(Api::new(session, rust_log_reload_tx, line_rx)), + inner: api, + opts: opts.unwrap_or_default(), } } /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. - pub async fn make_http_api_and_run( - self, - addr: SocketAddr, - read_only: bool, - ) -> anyhow::Result<()> { + pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { let state = self.inner; async fn api_root() -> impl IntoResponse { @@ -207,7 +204,7 @@ impl HttpApi { .route("/torrents/:id/stats/v1", get(torrent_stats_v1)) .route("/torrents/:id/peer_stats", get(peer_stats)); - if !read_only { + if !self.opts.read_only { app = app .route("/torrents", post(torrents_post)) .route("/torrents/:id/pause", post(torrent_action_pause)) @@ -218,7 +215,6 @@ impl HttpApi { #[cfg(feature = "webui")] { - use tracing::warn; let webui_router = Router::new() .route( "/", @@ -248,23 +244,25 @@ impl HttpApi { }), ); - // This is to develop webui by just doing "open index.html && tsc --watch" - let cors_layer = std::env::var("CORS_DEBUG") - .ok() - .map(|_| { - use tower_http::cors::{AllowHeaders, AllowOrigin}; + app = app.nest("/web/", webui_router); + } - warn!("CorsLayer: allowing everything because CORS_DEBUG is set"); - tower_http::cors::CorsLayer::default() - .allow_origin(AllowOrigin::predicate(|_, _| true)) - .allow_headers(AllowHeaders::any()) - }) - .unwrap_or_default(); + let enable_cors = std::env::var("CORS_DEBUG").is_ok() || self.opts.cors_enable_all; - app = app.nest("/web/", webui_router).layer(cors_layer); - } + // This is to develop webui by just doing "open index.html && tsc --watch" + let cors_layer = if enable_cors { + use tower_http::cors::{AllowHeaders, AllowOrigin}; + + warn!("CorsLayer: allowing everything"); + tower_http::cors::CorsLayer::default() + .allow_origin(AllowOrigin::predicate(|_, _| true)) + .allow_headers(AllowHeaders::any()) + } else { + Default::default() + }; let app = app + .layer(cors_layer) .layer(tower_http::trace::TraceLayer::new_for_http()) .with_state(state) .into_make_service(); diff --git a/crates/librqbit/src/log_subscriber.rs b/crates/librqbit/src/log_subscriber.rs index b2112132..48ed8e79 100644 --- a/crates/librqbit/src/log_subscriber.rs +++ b/crates/librqbit/src/log_subscriber.rs @@ -11,8 +11,10 @@ pub struct Writer { tx: tokio::sync::broadcast::Sender, } +pub type LineRx = tokio::sync::broadcast::Receiver; + impl Subscriber { - pub fn new() -> (Self, tokio::sync::broadcast::Receiver) { + pub fn new() -> (Self, LineRx) { let (tx, rx) = tokio::sync::broadcast::channel(100); (Self { tx }, rx) } diff --git a/crates/librqbit/webui/src/api-types.ts b/crates/librqbit/webui/src/api-types.ts index cf6956b3..958055a9 100644 --- a/crates/librqbit/webui/src/api-types.ts +++ b/crates/librqbit/webui/src/api-types.ts @@ -117,6 +117,7 @@ export interface AddTorrentOptions { } export interface RqbitAPI { + getHttpBaseUrl: () => string | null; listTorrents: () => Promise; getTorrentDetails: (index: number) => Promise; getTorrentStats: (index: number) => Promise; diff --git a/crates/librqbit/webui/src/components/LogStream.tsx b/crates/librqbit/webui/src/components/LogStream.tsx new file mode 100644 index 00000000..0091011b --- /dev/null +++ b/crates/librqbit/webui/src/components/LogStream.tsx @@ -0,0 +1,175 @@ +import React, { useEffect, useState } from "react"; +import { ErrorWithLabel } from "../rqbit-web"; +import { ErrorComponent } from "./ErrorComponent"; + +interface LogStreamProps { + httpApiBase: string; + maxLines?: number; +} + +interface Line { + id: number; + content: string; +} + +const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => { + const merged = new Uint8Array(a1.length + a2.length); + merged.set(a1); + merged.set(a2, a1.length); + return merged; +}; + +const streamLogs = ( + httpApiBase: string, + addLine: (text: string) => void, + setError: (error: ErrorWithLabel) => void +): (() => void) => { + const controller = new AbortController(); + const signal = controller.signal; + + const cancel = () => { + console.log("cancelling fetch"); + controller.abort(); + }; + + const run = async () => { + let response = null; + try { + response = await fetch(httpApiBase + "/stream_logs", { signal }); + } catch (e: any) { + setError({ + text: "network error fetching logs", + details: { + text: e.toString(), + }, + }); + return null; + } + + if (!response.ok) { + let text = await response.text(); + setError({ + text: "error fetching logs", + details: { + statusText: response.statusText, + text, + }, + }); + } + + if (!response.body) { + setError({ + text: "error fetching logs: ReadableStream not supported.", + }); + throw new Error("ReadableStream not supported."); + } + + const reader = response.body.getReader(); + + let buffer = new Uint8Array(); + while (true) { + const { done, value } = await reader.read(); + + if (done) { + // Handle stream completion or errors + break; + } + + buffer = mergeBuffers(buffer, value); + + while (true) { + const newLineIdx = buffer.indexOf(10); + if (newLineIdx === -1) { + break; + } + let lineBytes = buffer.slice(0, newLineIdx); + let line = new TextDecoder().decode(lineBytes); + addLine(line); + buffer = buffer.slice(newLineIdx + 1); + } + } + }; + run(); + + return cancel; +}; + +const SplitByLevelRegexp = new RegExp( + /(.*?) +(INFO|WARN|TRACE|ERROR|DEBUG) +(.*)/ +); + +const LogLine = ({ line }: { line: string }) => { + line.split; + const getClassNameByLevel = (level: string) => { + switch (level) { + case "INFO": + return "text-success"; + case "WARN": + return "text-warning"; + case "ERROR": + return "text-danger"; + case "DEBUG": + return "text-primary"; + default: + return "text-secondary"; + } + }; + + const getContent = () => { + let match = line.match(SplitByLevelRegexp); + if (!match) { + return line; + } + const [beforeLevel, level, afterLevel] = match.slice(1); + return ( + <> + {beforeLevel} + {level} + {afterLevel} + + ); + }; + + return ( +

+ {getContent()} +

+ ); +}; + +export const LogStream: React.FC = ({ + httpApiBase, + maxLines, +}) => { + const [logLines, setLogLines] = useState([]); + const [error, setError] = useState(null); + const maxL = maxLines ?? 1000; + + const addLine = (text: string) => { + setLogLines((logLines: Line[]) => { + const nextLineId = logLines.length == 0 ? 0 : logLines[0].id + 1; + + let newLogLines = [ + { + id: nextLineId, + content: text, + }, + ...logLines.slice(0, maxL - 1), + ]; + return newLogLines; + }); + }; + + useEffect(() => { + return streamLogs(httpApiBase, addLine, setError); + }, [httpApiBase]); + + return ( +
+ + {logLines.map((line) => ( + + ))} +
+ ); +}; diff --git a/crates/librqbit/webui/src/components/LogStreamModal.tsx b/crates/librqbit/webui/src/components/LogStreamModal.tsx new file mode 100644 index 00000000..5787245a --- /dev/null +++ b/crates/librqbit/webui/src/components/LogStreamModal.tsx @@ -0,0 +1,37 @@ +import { useContext } from "react"; +import { Button, Modal } from "react-bootstrap"; +import { APIContext } from "../context"; +import { ErrorComponent } from "./ErrorComponent"; +import { LogStream } from "./LogStream"; + +interface Props { + show: boolean; + onClose: () => void; +} + +export const LogStreamModal: React.FC = ({ show, onClose }) => { + const api = useContext(APIContext); + const apiBase = api.getHttpBaseUrl(); + + return ( + + + rqbit server logs + + + {apiBase ? ( + + ) : ( + + )} + + + + + + ); +}; diff --git a/crates/librqbit/webui/src/components/RootContent.tsx b/crates/librqbit/webui/src/components/RootContent.tsx index 3a6a987c..1c24c946 100644 --- a/crates/librqbit/webui/src/components/RootContent.tsx +++ b/crates/librqbit/webui/src/components/RootContent.tsx @@ -1,7 +1,7 @@ -import { useContext } from "react"; +import { useContext, useState } from "react"; import { Container } from "react-bootstrap"; import { TorrentId, ErrorDetails as ApiErrorDetails } from "../api-types"; -import { AppContext } from "../context"; +import { APIContext, AppContext } from "../context"; import { TorrentsList } from "./TorrentsList"; import { ErrorComponent } from "./ErrorComponent"; import { Buttons } from "./Buttons"; diff --git a/crates/librqbit/webui/src/http-api.ts b/crates/librqbit/webui/src/http-api.ts index 7117966a..60a81a94 100644 --- a/crates/librqbit/webui/src/http-api.ts +++ b/crates/librqbit/webui/src/http-api.ts @@ -64,6 +64,7 @@ const makeRequest = async ( }; export const API: RqbitAPI & { getVersion: () => Promise } = { + getHttpBaseUrl: () => apiUrl, listTorrents: (): Promise => makeRequest("GET", "/torrents"), getTorrentDetails: (index: number): Promise => { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 2bf63bb3..feb2b317 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -241,6 +241,7 @@ fn init_logging(opts: &Opts) -> InitLoggingResult { .with(fmt::layer().with_filter(stderr_filter)) .with( fmt::layer() + .event_format(fmt::format().with_ansi(false).compact()) .with_ansi(false) .with_writer(line_sub) .with_filter(EnvFilter::builder().parse("info").unwrap()), diff --git a/desktop/src-tauri/Cargo.lock b/desktop/src-tauri/Cargo.lock index d4403aed..0575f8af 100644 --- a/desktop/src-tauri/Cargo.lock +++ b/desktop/src-tauri/Cargo.lock @@ -1876,6 +1876,7 @@ dependencies = [ "bincode", "bitvec", "byteorder", + "bytes", "dashmap", "futures", "hex 0.4.3", @@ -1903,6 +1904,7 @@ dependencies = [ "tokio-util", "tower-http", "tracing", + "tracing-subscriber", "url", "urlencoding", "uuid", diff --git a/desktop/src-tauri/src/config.rs b/desktop/src-tauri/src/config.rs index 6e4c8a00..fb4dd609 100644 --- a/desktop/src-tauri/src/config.rs +++ b/desktop/src-tauri/src/config.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigDht { pub disable: bool, pub disable_persistence: bool, @@ -26,6 +27,7 @@ impl Default for RqbitDesktopConfigDht { } #[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigTcpListen { pub disable: bool, pub min_port: u16, @@ -44,6 +46,7 @@ impl Default for RqbitDesktopConfigTcpListen { } #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigPersistence { pub disable: bool, pub filename: PathBuf, @@ -60,6 +63,7 @@ impl Default for RqbitDesktopConfigPersistence { #[serde_as] #[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigPeerOpts { #[serde_as(as = "serde_with::DurationSeconds")] pub connect_timeout: Duration, @@ -79,10 +83,12 @@ impl Default for RqbitDesktopConfigPeerOpts { #[serde_as] #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigHttpApi { pub disable: bool, pub listen_addr: SocketAddr, pub read_only: bool, + pub cors_enable_all: bool, } impl Default for RqbitDesktopConfigHttpApi { @@ -91,16 +97,19 @@ impl Default for RqbitDesktopConfigHttpApi { disable: Default::default(), listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 3030)), read_only: false, + cors_enable_all: true, } } } #[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfigUpnp { pub disable: bool, } #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct RqbitDesktopConfig { pub default_download_location: PathBuf, pub dht: RqbitDesktopConfigDht, diff --git a/desktop/src-tauri/src/main.rs b/desktop/src-tauri/src/main.rs index cb9dc79d..eedae27b 100644 --- a/desktop/src-tauri/src/main.rs +++ b/desktop/src-tauri/src/main.rs @@ -36,12 +36,10 @@ struct StateShared { type RustLogReloadTx = tokio::sync::mpsc::UnboundedSender; -impl StateShared {} - struct State { config_filename: String, shared: Arc>>, - rust_log_reload_tx: RustLogReloadTx, + init_logging: InitLogging, } fn read_config(path: &str) -> anyhow::Result { @@ -66,7 +64,7 @@ fn write_config(path: &str, config: &RqbitDesktopConfig) -> anyhow::Result<()> { } async fn api_from_config( - rust_log_reload_tx: &RustLogReloadTx, + init_logging: &InitLogging, config: &RqbitDesktopConfig, ) -> anyhow::Result { let session = Session::new_with_opts( @@ -97,12 +95,21 @@ async fn api_from_config( .await .context("couldn't set up librqbit session")?; - let api = Api::new(session.clone(), None); + let api = Api::new( + session.clone(), + Some(init_logging.reload_stdout_tx.clone()), + Some(init_logging.line_rx.resubscribe()), + ); if !config.http_api.disable { - let http_api_task = - librqbit::http_api::HttpApi::new(session.clone(), Some(rust_log_reload_tx.clone())) - .make_http_api_and_run(config.http_api.listen_addr, config.http_api.read_only); + let http_api_task = librqbit::http_api::HttpApi::new( + api.clone(), + Some(librqbit::http_api::HttpApiOptions { + cors_enable_all: config.http_api.cors_enable_all, + read_only: config.http_api.read_only, + }), + ) + .make_http_api_and_run(config.http_api.listen_addr); session.spawn(error_span!("http_api"), http_api_task); } @@ -110,7 +117,7 @@ async fn api_from_config( } impl State { - async fn new(rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender) -> Self { + async fn new(init_logging: InitLogging) -> Self { let config_filename = directories::ProjectDirs::from("com", "rqbit", "desktop") .expect("directories::ProjectDirs::from") .config_dir() @@ -120,19 +127,19 @@ impl State { .to_owned(); if let Ok(config) = read_config(&config_filename) { - let api = api_from_config(&rust_log_reload_tx, &config).await.ok(); + let api = api_from_config(&init_logging, &config).await.ok(); let shared = Arc::new(RwLock::new(Some(StateShared { config, api }))); return Self { config_filename, shared, - rust_log_reload_tx, + init_logging, }; } Self { config_filename, - rust_log_reload_tx, + init_logging, shared: Arc::new(RwLock::new(None)), } } @@ -162,7 +169,7 @@ impl State { api.session().stop().await; } - let api = api_from_config(&self.rust_log_reload_tx, &config).await?; + let api = api_from_config(&self.init_logging, &config).await?; if let Err(e) = write_config(&self.config_filename, &config) { error!("error writing config: {:#}", e); } @@ -294,12 +301,32 @@ fn get_version() -> &'static str { env!("CARGO_PKG_VERSION") } -fn init_logging() -> tokio::sync::mpsc::UnboundedSender { +struct InitLogging { + reload_stdout_tx: RustLogReloadTx, + line_rx: librqbit::log_subscriber::LineRx, +} + +fn init_logging() -> InitLogging { use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - let (stderr_filter, reload_stderr_filter) = - tracing_subscriber::reload::Layer::new(EnvFilter::builder().parse("info").unwrap()); + let (stderr_filter, reload_stderr_filter) = tracing_subscriber::reload::Layer::new( + EnvFilter::builder() + .with_default_directive("info".parse().unwrap()) + .from_env() + .unwrap(), + ); - let layered = tracing_subscriber::registry().with(fmt::layer().with_filter(stderr_filter)); + let (line_sub, line_rx) = librqbit::log_subscriber::Subscriber::new(); + + let layered = tracing_subscriber::registry() + .with(fmt::layer().with_filter(stderr_filter)) + .with( + fmt::layer() + .with_ansi(false) + .fmt_fields(tracing_subscriber::fmt::format::DefaultFields::new().delimited(",")) + .event_format(fmt::format().with_ansi(false)) + .with_writer(line_sub) + .with_filter(EnvFilter::builder().parse("info,librqbit=debug").unwrap()), + ); layered.init(); let (reload_tx, mut reload_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -321,7 +348,10 @@ fn init_logging() -> tokio::sync::mpsc::UnboundedSender { Ok(()) }, ); - reload_tx + InitLogging { + reload_stdout_tx: reload_tx, + line_rx, + } } async fn start() { diff --git a/desktop/src/api.tsx b/desktop/src/api.tsx index 6cbb11e9..d497f277 100644 --- a/desktop/src/api.tsx +++ b/desktop/src/api.tsx @@ -1,3 +1,4 @@ +import { RqbitDesktopConfig } from "./configuration"; import { AddTorrentResponse, ListTorrentsResponse, @@ -66,42 +67,49 @@ async function readFileAsBase64(file: File): Promise { }); } -export const API: RqbitAPI = { - listTorrents: async function (): Promise { - return await invokeAPI("torrents_list"); - }, - getTorrentDetails: async function (id: number): Promise { - return await invokeAPI("torrent_details", { id }); - }, - getTorrentStats: async function (id: number): Promise { - return await invokeAPI("torrent_stats", { id }); - }, - uploadTorrent: async function (data, opts): Promise { - if (data instanceof File) { - let contents = await readFileAsBase64(data); - return await invokeAPI( - "torrent_create_from_base64_file", - { - contents, - opts: opts ?? {}, - } - ); - } - return await invokeAPI("torrent_create_from_url", { - url: data, - opts: opts ?? {}, - }); - }, - pause: function (id: number): Promise { - return invokeAPI("torrent_action_pause", { id }); - }, - start: function (id: number): Promise { - return invokeAPI("torrent_action_start", { id }); - }, - forget: function (id: number): Promise { - return invokeAPI("torrent_action_forget", { id }); - }, - delete: function (id: number): Promise { - return invokeAPI("torrent_action_delete", { id }); - }, +export const makeAPI = (configuration: RqbitDesktopConfig): RqbitAPI => { + return { + getHttpBaseUrl: () => { + return configuration.http_api.listen_addr + ? `http://${configuration.http_api.listen_addr}` + : null; + }, + listTorrents: async function (): Promise { + return await invokeAPI("torrents_list"); + }, + getTorrentDetails: async function (id: number): Promise { + return await invokeAPI("torrent_details", { id }); + }, + getTorrentStats: async function (id: number): Promise { + return await invokeAPI("torrent_stats", { id }); + }, + uploadTorrent: async function (data, opts): Promise { + if (data instanceof File) { + let contents = await readFileAsBase64(data); + return await invokeAPI( + "torrent_create_from_base64_file", + { + contents, + opts: opts ?? {}, + } + ); + } + return await invokeAPI("torrent_create_from_url", { + url: data, + opts: opts ?? {}, + }); + }, + pause: function (id: number): Promise { + return invokeAPI("torrent_action_pause", { id }); + }, + start: function (id: number): Promise { + return invokeAPI("torrent_action_start", { id }); + }, + forget: function (id: number): Promise { + return invokeAPI("torrent_action_forget", { id }); + }, + delete: function (id: number): Promise { + return invokeAPI("torrent_action_delete", { id }); + }, + }; }; diff --git a/desktop/src/configuration.tsx b/desktop/src/configuration.tsx index 416eabd2..a0157a50 100644 --- a/desktop/src/configuration.tsx +++ b/desktop/src/configuration.tsx @@ -28,6 +28,7 @@ interface RqbitDesktopConfigHttpApi { disable: boolean; listen_addr: SocketAddr; read_only: boolean; + cors_enable_all: boolean; } interface RqbitDesktopConfigUpnp { diff --git a/desktop/src/configure.tsx b/desktop/src/configure.tsx index 872e1f17..88f943d6 100644 --- a/desktop/src/configure.tsx +++ b/desktop/src/configure.tsx @@ -295,6 +295,15 @@ export const ConfigModal: React.FC<{ help="If enabled, only GET requests will be allowed through the API" /> + + { return invoke("get_version"); @@ -22,13 +20,11 @@ Promise.all([get_version(), get_default_config(), get_current_config()]).then( ([version, defaultConfig, currentState]) => { ReactDOM.createRoot(document.getElementById("root") as HTMLElement).render( - - - + ); } diff --git a/desktop/src/rqbit-desktop.tsx b/desktop/src/rqbit-desktop.tsx index 0214c2f0..f7af6366 100644 --- a/desktop/src/rqbit-desktop.tsx +++ b/desktop/src/rqbit-desktop.tsx @@ -3,7 +3,10 @@ import { RqbitWebUI } from "./rqbit-webui-src/rqbit-web"; import { CurrentDesktopState, RqbitDesktopConfig } from "./configuration"; import { ConfigModal } from "./configure"; import { IconButton } from "./rqbit-webui-src/components/IconButton"; -import { BsSliders2 } from "react-icons/bs"; +import { BsBodyText, BsSliders2 } from "react-icons/bs"; +import { LogStreamModal } from "./rqbit-webui-src/components/LogStreamModal"; +import { APIContext } from "./rqbit-webui-src/context"; +import { makeAPI } from "./api"; export const RqbitDesktop: React.FC<{ version: string; @@ -15,21 +18,27 @@ export const RqbitDesktop: React.FC<{ currentState.config ?? defaultConfig ); let [configurationOpened, setConfigurationOpened] = useState(false); + let [logsOpened, setLogsOpened] = useState(false); return ( - <> + {configured && ( )} {configured && ( - { - setConfigurationOpened(true); - }} - > - - +
+ { + setConfigurationOpened(true); + }} + > + + + setLogsOpened(true)}> + + +
)} - + setLogsOpened(false)} /> +
); }; From 1a92d76dedb310229b7d075744cc8dc688215b91 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 8 Dec 2023 19:42:20 +0000 Subject: [PATCH 3/3] All working well now --- crates/librqbit/src/api.rs | 13 ++++----- crates/librqbit/src/log_subscriber.rs | 11 +++++--- .../webui/src/components/LogStream.tsx | 8 +++++- crates/rqbit/src/main.rs | 28 ++++++++++--------- desktop/src-tauri/src/main.rs | 13 +++++---- 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 6afb6906..042020f2 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -2,7 +2,6 @@ use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use buffers::ByteString; -use bytes::Bytes; use dht::{DhtStats, Id20}; use futures::Stream; use http::StatusCode; @@ -20,7 +19,7 @@ use crate::{ torrent_state::{ peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, ManagedTorrentHandle, - }, + }, log_subscriber::LineBroadcast, }; pub use crate::torrent_state::stats::{LiveStats, TorrentStats}; @@ -33,19 +32,19 @@ pub type Result = std::result::Result; pub struct Api { session: Arc, rust_log_reload_tx: Option>, - line_rx: Option>>, + line_broadcast: Option, } impl Api { pub fn new( session: Arc, rust_log_reload_tx: Option>, - line_rx: Option>, + line_broadcast: Option ) -> Self { Self { session, rust_log_reload_tx, - line_rx: line_rx.map(Arc::new), + line_broadcast } } @@ -141,9 +140,9 @@ impl Api { + 'static, > { Ok(self - .line_rx + .line_broadcast .as_ref() - .map(|rx| BroadcastStream::new(rx.resubscribe())) + .map(|sender| BroadcastStream::new(sender.subscribe())) .context("line_rx wasn't set")?) } diff --git a/crates/librqbit/src/log_subscriber.rs b/crates/librqbit/src/log_subscriber.rs index 48ed8e79..3817915b 100644 --- a/crates/librqbit/src/log_subscriber.rs +++ b/crates/librqbit/src/log_subscriber.rs @@ -11,12 +11,12 @@ pub struct Writer { tx: tokio::sync::broadcast::Sender, } -pub type LineRx = tokio::sync::broadcast::Receiver; +pub type LineBroadcast = tokio::sync::broadcast::Sender; impl Subscriber { - pub fn new() -> (Self, LineRx) { - let (tx, rx) = tokio::sync::broadcast::channel(100); - (Self { tx }, rx) + pub fn new() -> (Self, LineBroadcast) { + let (tx, _) = tokio::sync::broadcast::channel(100); + (Self { tx: tx.clone() }, tx) } } @@ -33,6 +33,9 @@ impl<'a> MakeWriter<'a> for Subscriber { impl std::io::Write for Writer { fn write(&mut self, buf: &[u8]) -> std::io::Result { let len = buf.len(); + if self.tx.receiver_count() == 0 { + return Ok(len); + } let arc = buf.to_vec().into(); let _ = self.tx.send(arc); Ok(len) diff --git a/crates/librqbit/webui/src/components/LogStream.tsx b/crates/librqbit/webui/src/components/LogStream.tsx index 0091011b..59025759 100644 --- a/crates/librqbit/webui/src/components/LogStream.tsx +++ b/crates/librqbit/webui/src/components/LogStream.tsx @@ -22,13 +22,16 @@ const mergeBuffers = (a1: Uint8Array, a2: Uint8Array): Uint8Array => { const streamLogs = ( httpApiBase: string, addLine: (text: string) => void, - setError: (error: ErrorWithLabel) => void + setError: (error: ErrorWithLabel | null) => void ): (() => void) => { const controller = new AbortController(); const signal = controller.signal; + let canceled = true; + const cancel = () => { console.log("cancelling fetch"); + canceled = true; controller.abort(); }; @@ -37,6 +40,9 @@ const streamLogs = ( try { response = await fetch(httpApiBase + "/stream_logs", { signal }); } catch (e: any) { + if (canceled) { + return; + } setError({ text: "network error fetching logs", details: { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index feb2b317..6e3436db 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -3,9 +3,9 @@ use std::{io::LineWriter, net::SocketAddr, path::PathBuf, sync::Arc, time::Durat use anyhow::Context; use clap::{Parser, ValueEnum}; use librqbit::{ - api::ApiAddTorrentResponse, http_api::HttpApi, http_api_client, librqbit_spawn, AddTorrent, + api::ApiAddTorrentResponse, http_api::{HttpApi, HttpApiOptions}, http_api_client, librqbit_spawn, AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, ManagedTorrentState, - PeerConnectionOptions, Session, SessionOptions, + PeerConnectionOptions, Session, SessionOptions, log_subscriber::LineBroadcast, Api, }; use size_format::SizeFormatterBinary as SF; use tracing::{error, error_span, info, trace_span, warn}; @@ -180,7 +180,7 @@ enum SubCommand { struct InitLoggingResult { rust_log_reload_tx: tokio::sync::mpsc::UnboundedSender, - line_rx: tokio::sync::broadcast::Receiver, + line_broadcast: LineBroadcast, } // Init logging and make a channel to send new RUST_LOG values to. @@ -209,7 +209,7 @@ fn init_logging(opts: &Opts) -> InitLoggingResult { use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - let (line_sub, line_rx) = librqbit::log_subscriber::Subscriber::new(); + let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new(); #[cfg(feature = "tokio-console")] { @@ -293,7 +293,7 @@ fn init_logging(opts: &Opts) -> InitLoggingResult { ); InitLoggingResult { rust_log_reload_tx: reload_tx, - line_rx, + line_broadcast, } } @@ -445,14 +445,17 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { trace_span!("stats_printer"), stats_printer(session.clone()), ); + let api = Api::new(session, Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast)); let http_api = HttpApi::new( - session, - Some(log_config.rust_log_reload_tx), - Some(log_config.line_rx), + api, + Some(HttpApiOptions{ + read_only: false, + cors_enable_all: false, + }), ); let http_api_listen_addr = opts.http_api_listen_addr; http_api - .make_http_api_and_run(http_api_listen_addr, false) + .make_http_api_and_run(http_api_listen_addr) .await .context("error running HTTP API") } @@ -529,16 +532,15 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { trace_span!("stats_printer"), stats_printer(session.clone()), ); + let api = Api::new(session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast)); let http_api = HttpApi::new( - session.clone(), - Some(log_config.rust_log_reload_tx), - Some(log_config.line_rx), + api, Some(HttpApiOptions { cors_enable_all: false, read_only: true }) ); let http_api_listen_addr = opts.http_api_listen_addr; librqbit_spawn( "http_api", error_span!("http_api"), - http_api.make_http_api_and_run(http_api_listen_addr, true), + http_api.make_http_api_and_run(http_api_listen_addr), ); let mut added = false; diff --git a/desktop/src-tauri/src/main.rs b/desktop/src-tauri/src/main.rs index eedae27b..d5af5a32 100644 --- a/desktop/src-tauri/src/main.rs +++ b/desktop/src-tauri/src/main.rs @@ -19,8 +19,9 @@ use librqbit::{ TorrentStats, }, dht::PersistentDhtConfig, - librqbit_spawn, AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, - SessionOptions, + librqbit_spawn, + log_subscriber::LineBroadcast, + AddTorrent, AddTorrentOptions, Api, ApiError, PeerConnectionOptions, Session, SessionOptions, }; use parking_lot::RwLock; use serde::Serialize; @@ -98,7 +99,7 @@ async fn api_from_config( let api = Api::new( session.clone(), Some(init_logging.reload_stdout_tx.clone()), - Some(init_logging.line_rx.resubscribe()), + Some(init_logging.line_broadcast.clone()), ); if !config.http_api.disable { @@ -303,7 +304,7 @@ fn get_version() -> &'static str { struct InitLogging { reload_stdout_tx: RustLogReloadTx, - line_rx: librqbit::log_subscriber::LineRx, + line_broadcast: LineBroadcast, } fn init_logging() -> InitLogging { @@ -315,7 +316,7 @@ fn init_logging() -> InitLogging { .unwrap(), ); - let (line_sub, line_rx) = librqbit::log_subscriber::Subscriber::new(); + let (line_sub, line_broadcast) = librqbit::log_subscriber::Subscriber::new(); let layered = tracing_subscriber::registry() .with(fmt::layer().with_filter(stderr_filter)) @@ -350,7 +351,7 @@ fn init_logging() -> InitLogging { ); InitLogging { reload_stdout_tx: reload_tx, - line_rx, + line_broadcast, } }