Skip to content

Commit

Permalink
feat: rewrite rendering pipeline for parallel requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Vilsol committed May 15, 2024
1 parent 1a2094e commit b3d8881
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
!src/
!build.rs
!pdf_rendering.proto
!cfg
!cfg
!.cargo
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "full"] }
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "rt", "full"] }
prost = "0.12.3"
headless_chrome = "1.0.9"
tonic = "0.11.0"
Expand All @@ -26,6 +26,7 @@ lopdf = "0.32.0"
serde_json = "1.0.116"
serde = { version = "1.0.198", features = ["derive"] }
prost-wkt-types = "0.5.1"
ulid = "1.1.2"

[build-dependencies]
tonic-build = "0.11.0"
Expand Down
4 changes: 2 additions & 2 deletions lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

set -ex

cargo fmt
cargo clippy
cargo +nightly fmt
cargo +nightly clippy
13 changes: 10 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic_health::ServingStatus;
use crate::proto::pdf_rendering::pdf_rendering_service_server::PdfRenderingServiceServer;
use crate::renderer::start_renderer;
use crate::server::PDFServer;
use crate::types::InternalRequest;
use crate::types::{IDExtension, InternalRequest};

mod pdf_utils;
mod proto;
Expand All @@ -37,6 +37,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.write_style(WriteStyle::Always)
.init();

let metrics = tokio::runtime::Handle::current().metrics();
info!("worker count: {}", metrics.num_workers());

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::pdf_rendering::FILE_DESCRIPTOR_SET)
.build()
Expand Down Expand Up @@ -99,8 +102,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}

fn logging(req: Request<()>) -> Result<Request<()>, Status> {
info!("Received request: {:?}", req);
fn logging(mut req: Request<()>) -> Result<Request<()>, Status> {
let id = ulid::Ulid::new();

req.extensions_mut().insert(IDExtension { id });

info!("[{}] Received request: {:?}", id, req);
Ok(req)
}

Expand Down
6 changes: 2 additions & 4 deletions src/pdf_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,8 @@ pub fn merge_pdfs(documents: Vec<Document>) -> std::io::Result<Vec<u8>> {

//Set all bookmarks to the PDF Object tree then set the Outlines to the Bookmark content map.
if let Some(n) = document.build_outline() {
if let Ok(x) = document.get_object_mut(catalog_object.0) {
if let Dictionary(ref mut dict) = x {
dict.set("Outlines", Reference(n));
}
if let Ok(Dictionary(ref mut dict)) = document.get_object_mut(catalog_object.0) {
dict.set("Outlines", Reference(n));
}
}

Expand Down
70 changes: 52 additions & 18 deletions src/renderer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use crate::proto::pdf_rendering::pdf_options::PaperFormat;
use crate::proto::pdf_rendering::render_source::Content;
use crate::proto::pdf_rendering::RenderOptions;
use crate::types::InternalRequest;
use crate::proto::pdf_rendering::{RenderData, RenderOptions};
use crate::types::{InternalRequest, RendererResponse};
use anyhow::{anyhow, Result};
use headless_chrome::browser::default_executable;
use headless_chrome::types::PrintToPdfOptions;
use headless_chrome::{Browser, LaunchOptionsBuilder};
use headless_chrome::{Browser, LaunchOptionsBuilder, Tab};
use std::error::Error;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;

impl PaperFormat {
Expand Down Expand Up @@ -47,7 +48,7 @@ impl PaperFormat {
}

pub fn content_to_pdf(
browser: Browser,
tab: Arc<Tab>,
content: Content,
options: Option<RenderOptions>,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -126,8 +127,6 @@ pub fn content_to_pdf(
..Default::default()
};

let tab = browser.new_tab().expect("failed opening new browser tab");

let pdf = match content {
Content::Url(url) => tab
.navigate_to(url.as_str())?
Expand Down Expand Up @@ -186,21 +185,56 @@ pub async fn start_renderer(mut rx: Receiver<InternalRequest>) -> Result<(), Box
.unwrap();

tokio::spawn(async move {
let browser = Browser::new(options).expect("failed instantiating browser");
let browser = Arc::new(Browser::new(options).expect("failed instantiating browser"));

while let Some(cmd) = rx.recv().await {
for req in cmd.data {
let content = req
.clone()
.source
.expect("no source")
.content
.expect("no content");
let options = req.clone().options;
let out = content_to_pdf(browser.clone(), content, options.clone());
let _ = cmd.response.send(out).await;
}
handle_cmd(browser.clone(), cmd);
}
});

Ok(())
}

pub fn handle_cmd(browser: Arc<Browser>, cmd: InternalRequest) {
tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel::<RendererResponse>(32);

let data = cmd.data;
for (i, req) in data.iter().enumerate() {
let tab = browser.new_tab().expect("failed opening new browser tab");
handle_req(tab, req, tx.clone(), i);
}

let mut rendered = Vec::with_capacity(data.clone().len());

for _ in data.clone().iter() {
rendered.push(rx.recv().await.unwrap());
}

rendered.sort_by_key(|r| r.order);

for out in rendered {
let _ = cmd.response.send(out.resp).await;
}
});
}

pub fn handle_req(
tab: Arc<Tab>,
req: &RenderData,
tx: mpsc::Sender<RendererResponse>,
order: usize,
) {
let req2 = req.clone();
tokio::spawn(async move {
let content = req2
.clone()
.source
.expect("no source")
.content
.expect("no content");
let options = req2.clone().options;
let out = content_to_pdf(tab, content, options.clone());
let _ = tx.clone().send(RendererResponse { resp: out, order }).await;
});
}
10 changes: 7 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::proto::pdf_rendering::{
use crate::proto::status;
use crate::proto::status::OperationStatus;
use crate::s3::upload_to_s3;
use crate::types::{InternalRequest, InternalResponse};
use crate::types::{IDExtension, InternalRequest, InternalResponse};
use config::Config;
use log::{error, info};
use log::{debug, error, info};
use lopdf::Document;
use prost_wkt_types::Empty;
use tokio::sync::mpsc;
Expand All @@ -32,6 +32,10 @@ impl PdfRenderingService for PDFServer {
) -> Result<Response<RenderingResponse>, Status> {
let (tx, mut rx) = mpsc::channel::<InternalResponse>(32);

let id = request.extensions().get::<IDExtension>().unwrap();

debug!("[{}] Rendering request: {:?}", id.id, request.get_ref());

let data = match request.get_ref().clone().r#type.unwrap() {
Type::Individual(req) => req.data.iter().map(|x| x.clone().data.unwrap()).collect(),
Type::Combined(req) => req.data,
Expand All @@ -57,7 +61,7 @@ impl PdfRenderingService for PDFServer {
rendered.push(rx.recv().await);
}

info!("Rendering success: {:?}", request.get_ref());
info!("[{}] Rendering success", id.id);

let output = match request.get_ref().clone().r#type.unwrap() {
Type::Individual(req) => Ok(Self::individual_response(
Expand Down
9 changes: 9 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,12 @@ pub struct InternalRequest {
}

pub type InternalResponse = anyhow::Result<Vec<u8>, Box<dyn Error + Send + Sync>>;

pub struct RendererResponse {
pub resp: InternalResponse,
pub order: usize,
}

pub struct IDExtension {
pub id: ulid::Ulid,
}

0 comments on commit b3d8881

Please sign in to comment.