From b3d888159e1ff06527f96ab1b72de2ecae57a3b6 Mon Sep 17 00:00:00 2001 From: Vilsol Date: Wed, 15 May 2024 21:09:42 +0300 Subject: [PATCH] feat: rewrite rendering pipeline for parallel requests --- .cargo/config.toml | 2 ++ .dockerignore | 3 +- Cargo.lock | 24 ++++++++++++++++ Cargo.toml | 3 +- lint.sh | 4 +-- src/main.rs | 13 +++++++-- src/pdf_utils.rs | 6 ++-- src/renderer.rs | 70 ++++++++++++++++++++++++++++++++++------------ src/server.rs | 10 +++++-- src/types.rs | 9 ++++++ 10 files changed, 112 insertions(+), 32 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..bff29e6 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/.dockerignore b/.dockerignore index 2cf53b3..0d15cda 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,4 +4,5 @@ !src/ !build.rs !pdf_rendering.proto -!cfg \ No newline at end of file +!cfg +!.cargo \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4104cd2..292eaac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,8 +1155,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1922,6 +1924,7 @@ dependencies = [ "tonic-build", "tonic-health", "tonic-reflection", + "ulid", ] [[package]] @@ -3177,6 +3180,17 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "ulid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" +dependencies = [ + "getrandom", + "rand", + "web-time", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -3371,6 +3385,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index d6859f2..a0ebd61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "full"] } +tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "rt", "full"] } prost = "0.12.3" headless_chrome = "1.0.9" tonic = "0.11.0" @@ -26,6 +26,7 @@ lopdf = "0.32.0" serde_json = "1.0.116" serde = { version = "1.0.198", features = ["derive"] } prost-wkt-types = "0.5.1" +ulid = "1.1.2" [build-dependencies] tonic-build = "0.11.0" diff --git a/lint.sh b/lint.sh index 1257034..c984cac 100755 --- a/lint.sh +++ b/lint.sh @@ -2,5 +2,5 @@ set -ex -cargo fmt -cargo clippy \ No newline at end of file +cargo +nightly fmt +cargo +nightly clippy \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index cfdd772..614e355 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use tonic_health::ServingStatus; use crate::proto::pdf_rendering::pdf_rendering_service_server::PdfRenderingServiceServer; use crate::renderer::start_renderer; use crate::server::PDFServer; -use crate::types::InternalRequest; +use crate::types::{IDExtension, InternalRequest}; mod pdf_utils; mod proto; @@ -37,6 +37,9 @@ async fn main() -> Result<(), Box> { .write_style(WriteStyle::Always) .init(); + let metrics = tokio::runtime::Handle::current().metrics(); + info!("worker count: {}", metrics.num_workers()); + let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(proto::pdf_rendering::FILE_DESCRIPTOR_SET) .build() @@ -99,8 +102,12 @@ async fn main() -> Result<(), Box> { Ok(()) } -fn logging(req: Request<()>) -> Result, Status> { - info!("Received request: {:?}", req); +fn logging(mut req: Request<()>) -> Result, Status> { + let id = ulid::Ulid::new(); + + req.extensions_mut().insert(IDExtension { id }); + + info!("[{}] Received request: {:?}", id, req); Ok(req) } diff --git a/src/pdf_utils.rs b/src/pdf_utils.rs index 55aa743..28a857a 100644 --- a/src/pdf_utils.rs +++ b/src/pdf_utils.rs @@ -168,10 +168,8 @@ pub fn merge_pdfs(documents: Vec) -> std::io::Result> { //Set all bookmarks to the PDF Object tree then set the Outlines to the Bookmark content map. if let Some(n) = document.build_outline() { - if let Ok(x) = document.get_object_mut(catalog_object.0) { - if let Dictionary(ref mut dict) = x { - dict.set("Outlines", Reference(n)); - } + if let Ok(Dictionary(ref mut dict)) = document.get_object_mut(catalog_object.0) { + dict.set("Outlines", Reference(n)); } } diff --git a/src/renderer.rs b/src/renderer.rs index 2e068eb..668e17c 100644 --- a/src/renderer.rs +++ b/src/renderer.rs @@ -1,15 +1,16 @@ use crate::proto::pdf_rendering::pdf_options::PaperFormat; use crate::proto::pdf_rendering::render_source::Content; -use crate::proto::pdf_rendering::RenderOptions; -use crate::types::InternalRequest; +use crate::proto::pdf_rendering::{RenderData, RenderOptions}; +use crate::types::{InternalRequest, RendererResponse}; use anyhow::{anyhow, Result}; use headless_chrome::browser::default_executable; use headless_chrome::types::PrintToPdfOptions; -use headless_chrome::{Browser, LaunchOptionsBuilder}; +use headless_chrome::{Browser, LaunchOptionsBuilder, Tab}; use std::error::Error; use std::io; use std::sync::Arc; use std::time::Duration; +use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; impl PaperFormat { @@ -47,7 +48,7 @@ impl PaperFormat { } pub fn content_to_pdf( - browser: Browser, + tab: Arc, content: Content, options: Option, ) -> Result, Box> { @@ -126,8 +127,6 @@ pub fn content_to_pdf( ..Default::default() }; - let tab = browser.new_tab().expect("failed opening new browser tab"); - let pdf = match content { Content::Url(url) => tab .navigate_to(url.as_str())? @@ -186,21 +185,56 @@ pub async fn start_renderer(mut rx: Receiver) -> Result<(), Box .unwrap(); tokio::spawn(async move { - let browser = Browser::new(options).expect("failed instantiating browser"); + let browser = Arc::new(Browser::new(options).expect("failed instantiating browser")); + while let Some(cmd) = rx.recv().await { - for req in cmd.data { - let content = req - .clone() - .source - .expect("no source") - .content - .expect("no content"); - let options = req.clone().options; - let out = content_to_pdf(browser.clone(), content, options.clone()); - let _ = cmd.response.send(out).await; - } + handle_cmd(browser.clone(), cmd); } }); Ok(()) } + +pub fn handle_cmd(browser: Arc, cmd: InternalRequest) { + tokio::spawn(async move { + let (tx, mut rx) = mpsc::channel::(32); + + let data = cmd.data; + for (i, req) in data.iter().enumerate() { + let tab = browser.new_tab().expect("failed opening new browser tab"); + handle_req(tab, req, tx.clone(), i); + } + + let mut rendered = Vec::with_capacity(data.clone().len()); + + for _ in data.clone().iter() { + rendered.push(rx.recv().await.unwrap()); + } + + rendered.sort_by_key(|r| r.order); + + for out in rendered { + let _ = cmd.response.send(out.resp).await; + } + }); +} + +pub fn handle_req( + tab: Arc, + req: &RenderData, + tx: mpsc::Sender, + order: usize, +) { + let req2 = req.clone(); + tokio::spawn(async move { + let content = req2 + .clone() + .source + .expect("no source") + .content + .expect("no content"); + let options = req2.clone().options; + let out = content_to_pdf(tab, content, options.clone()); + let _ = tx.clone().send(RendererResponse { resp: out, order }).await; + }); +} diff --git a/src/server.rs b/src/server.rs index ba809eb..cf775b9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,9 +11,9 @@ use crate::proto::pdf_rendering::{ use crate::proto::status; use crate::proto::status::OperationStatus; use crate::s3::upload_to_s3; -use crate::types::{InternalRequest, InternalResponse}; +use crate::types::{IDExtension, InternalRequest, InternalResponse}; use config::Config; -use log::{error, info}; +use log::{debug, error, info}; use lopdf::Document; use prost_wkt_types::Empty; use tokio::sync::mpsc; @@ -32,6 +32,10 @@ impl PdfRenderingService for PDFServer { ) -> Result, Status> { let (tx, mut rx) = mpsc::channel::(32); + let id = request.extensions().get::().unwrap(); + + debug!("[{}] Rendering request: {:?}", id.id, request.get_ref()); + let data = match request.get_ref().clone().r#type.unwrap() { Type::Individual(req) => req.data.iter().map(|x| x.clone().data.unwrap()).collect(), Type::Combined(req) => req.data, @@ -57,7 +61,7 @@ impl PdfRenderingService for PDFServer { rendered.push(rx.recv().await); } - info!("Rendering success: {:?}", request.get_ref()); + info!("[{}] Rendering success", id.id); let output = match request.get_ref().clone().r#type.unwrap() { Type::Individual(req) => Ok(Self::individual_response( diff --git a/src/types.rs b/src/types.rs index 3913636..00c154c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -8,3 +8,12 @@ pub struct InternalRequest { } pub type InternalResponse = anyhow::Result, Box>; + +pub struct RendererResponse { + pub resp: InternalResponse, + pub order: usize, +} + +pub struct IDExtension { + pub id: ulid::Ulid, +}