feat: display documentation generation jobs (#8)
woutersl committed Sep 5, 2024
1 parent b5434cb commit 394d5c3
Showing 9 changed files with 515 additions and 83 deletions.
408 changes: 330 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -27,6 +27,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
cookie = { version = "0.18", features = ["secure", "percent-encode"] }
data-encoding = "2.1"
flate2 = "1.0"
http-body = "1"
mime = "0.3"
opendal = { version = "0.49", features = ["services-fs", "services-s3"] }
quick-xml = "0.36"
17 changes: 16 additions & 1 deletion src/application.rs
@@ -10,14 +10,15 @@ use std::sync::Arc;
use log::info;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::{Pool, Sqlite};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

use crate::model::auth::{Authentication, RegistryUserToken, RegistryUserTokenWithSecret};
use crate::model::cargo::{
CrateUploadData, CrateUploadResult, OwnersQueryResult, RegistryUser, SearchResults, YesNoMsgResult, YesNoResult,
};
use crate::model::config::Configuration;
use crate::model::deps::DepsAnalysis;
use crate::model::docs::{DocGenJob, DocGenTrigger};
use crate::model::docs::{DocGenJob, DocGenJobUpdate, DocGenTrigger};
use crate::model::packages::CrateInfo;
use crate::model::stats::{DownloadStats, GlobalStats};
use crate::model::{CrateVersion, JobCrate, RegistryInformation};
@@ -454,6 +455,20 @@ impl Application {
        .await
    }

    /// Adds a listener to job updates
    pub async fn get_doc_gen_job_updates(&self, auth_data: &AuthData) -> Result<UnboundedReceiver<DocGenJobUpdate>, ApiError> {
        let mut connection: sqlx::pool::PoolConnection<Sqlite> = self.service_db_pool.acquire().await?;
        in_transaction(&mut connection, |transaction| async move {
            let app = self.with_transaction(transaction);
            let _principal = app.authenticate(auth_data).await?;
            Ok::<_, ApiError>(())
        })
        .await?;
        let (sender, receiver) = unbounded_channel();
        self.service_docs_generator.add_update_listener(sender);
        Ok(receiver)
    }

    /// Force the re-generation for the documentation of a package
    pub async fn regen_crate_version_doc(
        &self,
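The new `Application::get_doc_gen_job_updates` above authenticates the caller, registers an unbounded channel with the docs generator, and hands the receiving end back. A minimal sketch of a caller draining that receiver (hypothetical helper, not part of this commit; the module paths and the `Debug` impl on `DocGenJobState` are assumptions):

```rust
use crate::application::Application;
use crate::utils::apierror::ApiError;
use crate::utils::axum::auth::AuthData;

// Hypothetical helper: log every update until the docs generator drops
// its sending half of the channel.
async fn watch_doc_gen_jobs(application: &Application, auth_data: &AuthData) -> Result<(), ApiError> {
    let mut receiver = application.get_doc_gen_job_updates(auth_data).await?;
    while let Some(update) = receiver.recv().await {
        // DocGenJobState is assumed to implement Debug.
        log::info!("doc gen job {} -> {:?}", update.job_id, update.state);
    }
    Ok(())
}
```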
3 changes: 2 additions & 1 deletion src/main.rs
@@ -103,7 +103,8 @@ async fn main_serve_app(application: Arc<Application>, cookie_key: Key) -> Resul
.route("/", put(routes::api_v1_create_global_token))
.route("/:token_id", delete(routes::api_v1_revoke_global_token)),
)
.route("/jobs/docgen", get(routes::api_v1_get_doc_gen_jobs)),
.route("/jobs/docgen", get(routes::api_v1_get_doc_gen_jobs))
.route("/jobs/docgen/updates", get(routes::api_v1_get_doc_gen_job_updates)),
)
.nest(
"/crates",
15 changes: 15 additions & 0 deletions src/model/docs.rs
@@ -120,3 +120,18 @@ pub struct DocGenJob {
    /// The output log, if any
    pub output: String,
}

/// An update to a documentation generation job
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocGenJobUpdate {
    /// The unique identifier of the associated job
    #[serde(rename = "jobId")]
    pub job_id: i64,
    /// The new state for the job
    pub state: DocGenJobState,
    /// The update timestamp
    #[serde(rename = "lastUpdate")]
    pub last_update: NaiveDateTime,
    /// The appended output, if any
    pub output: String,
}
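Because of the two `#[serde(rename = ...)]` attributes, clients receive camelCase keys: an update serializes roughly as `{"jobId": 42, "state": …, "lastUpdate": "2024-09-05T12:00:00", "output": ""}`. The values shown are illustrative; the exact representation of `state` depends on how `DocGenJobState` derives `Serialize`.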
18 changes: 17 additions & 1 deletion src/routes.rs
@@ -13,11 +13,13 @@ use axum::body::{Body, Bytes};
use axum::extract::{Path, Query, State};
use axum::http::header::{HeaderName, SET_COOKIE};
use axum::http::{header, HeaderValue, Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{BoxError, Json};
use cookie::Key;
use futures::Stream;
use futures::{Stream, StreamExt};
use serde::Deserialize;
use tokio::fs::File;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::io::ReaderStream;

use crate::application::Application;
@@ -35,6 +37,7 @@ use crate::utils::apierror::{error_invalid_request, error_not_found, specialize,
use crate::utils::axum::auth::{AuthData, AxumStateForCookies};
use crate::utils::axum::embedded::{EmbeddedResources, WebappResource};
use crate::utils::axum::extractors::Base64;
use crate::utils::axum::sse::{Event, ServerSentEventStream};
use crate::utils::axum::{response, response_error, response_ok, ApiResult};
use crate::utils::token::generate_token;

@@ -385,6 +388,19 @@ pub async fn api_v1_get_doc_gen_jobs(auth_data: AuthData, State(state): State<Ar
    response(state.application.get_doc_gen_jobs(&auth_data).await)
}

/// Gets a stream of updates for documentation generation jobs
pub async fn api_v1_get_doc_gen_job_updates(
    auth_data: AuthData,
    State(state): State<Arc<AxumState>>,
) -> Result<Response, (StatusCode, Json<ApiError>)> {
    let receiver = match state.application.get_doc_gen_job_updates(&auth_data).await {
        Ok(r) => r,
        Err(e) => return Err(response_error(e)),
    };
    let stream = ServerSentEventStream::new(UnboundedReceiverStream::new(receiver).map(Event::from_data));
    Ok(stream.into_response())
}

/// Gets the known users
pub async fn api_v1_get_users(auth_data: AuthData, State(state): State<Arc<AxumState>>) -> ApiResult<Vec<RegistryUser>> {
    response(state.application.get_users(&auth_data).await)
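For context, a sketch of a client tailing the new endpoint (not part of this commit; it assumes the `reqwest` crate with its `stream` feature, that the route ends up mounted under `/api/v1` as the nesting in main.rs suggests, and that a token is passed via the Authorization header — adjust to the actual deployment):

```rust
use futures::StreamExt;

// Hypothetical client: print the raw SSE frames emitted by /jobs/docgen/updates.
async fn tail_doc_gen_updates(base_url: &str, token: &str) -> Result<(), reqwest::Error> {
    let response = reqwest::Client::new()
        .get(format!("{base_url}/api/v1/jobs/docgen/updates"))
        .header("Authorization", token)
        .send()
        .await?;
    let mut body = response.bytes_stream();
    while let Some(chunk) = body.next().await {
        // Each chunk carries one or more `data: {...}` frames holding a
        // serialized DocGenJobUpdate.
        print!("{}", String::from_utf8_lossy(&chunk?));
    }
    Ok(())
}
```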
30 changes: 28 additions & 2 deletions src/services/docs.rs
@@ -6,18 +6,20 @@

use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use chrono::Local;
use flate2::bufread::GzDecoder;
use log::{error, info};
use sqlx::{Pool, Sqlite};
use tar::Archive;
use tokio::process::Command;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::interval;

use crate::model::config::Configuration;
use crate::model::docs::{DocGenJob, DocGenJobState, DocGenTrigger};
use crate::model::docs::{DocGenJob, DocGenJobState, DocGenJobUpdate, DocGenTrigger};
use crate::model::JobCrate;
use crate::services::database::Database;
use crate::services::storage::Storage;
@@ -33,6 +35,9 @@ pub trait DocsGenerator {

    /// Queues a job for documentation generation
    fn queue<'a>(&'a self, spec: &'a JobCrate, trigger: &'a DocGenTrigger) -> FaillibleFuture<'a, DocGenJob>;

    /// Adds a listener to job updates
    fn add_update_listener(&self, listener: UnboundedSender<DocGenJobUpdate>);
}

/// Gets the documentation generation service
@@ -45,6 +50,7 @@ pub fn get_docs_generator(
        configuration,
        service_db_pool,
        service_storage,
        listeners: Arc::new(Mutex::new(Vec::new())),
    });
    // launch workers
    let _handle = tokio::spawn({
@@ -65,6 +71,8 @@ struct DocsGeneratorImpl {
    service_db_pool: Pool<Sqlite>,
    /// The storage layer
    service_storage: Arc<dyn Storage + Send + Sync>,
    /// The active listeners
    listeners: Arc<Mutex<Vec<UnboundedSender<DocGenJobUpdate>>>>,
}

impl DocsGenerator for DocsGeneratorImpl {
@@ -93,6 +101,11 @@ impl DocsGenerator for DocsGeneratorImpl {
            Ok(job)
        })
    }

    /// Adds a listener to job updates
    fn add_update_listener(&self, listener: UnboundedSender<DocGenJobUpdate>) {
        self.listeners.lock().unwrap().push(listener);
    }
}

impl DocsGeneratorImpl {
@@ -104,6 +117,19 @@ impl DocsGeneratorImpl {
            database.update_docgen_job(job_id, state, output.unwrap_or_default()).await
        })
        .await?;

        // send updates
        let now = Local::now().naive_local();
        self.listeners.lock().unwrap().retain_mut(|sender| {
            sender
                .send(DocGenJobUpdate {
                    job_id,
                    state,
                    last_update: now,
                    output: output.unwrap_or_default().to_string(),
                })
                .is_ok()
        });
        Ok(())
    }

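The update path above keeps a shared Vec of senders and prunes any whose receiving end has been dropped, so disconnected SSE clients are cleaned up on the next notification. The same fan-out pattern in isolation (a standalone sketch with hypothetical names, not the actual service types):

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

/// Standalone sketch of the listener fan-out used by the docs generator.
struct Notifier<T> {
    listeners: Arc<Mutex<Vec<UnboundedSender<T>>>>,
}

impl<T: Clone> Notifier<T> {
    fn new() -> Self {
        Self { listeners: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Registers a listener and returns its receiving end.
    fn subscribe(&self) -> UnboundedReceiver<T> {
        let (sender, receiver) = unbounded_channel();
        self.listeners.lock().unwrap().push(sender);
        receiver
    }

    /// Sends an update to every listener; senders whose receiver was dropped are pruned.
    fn notify(&self, update: &T) {
        self.listeners
            .lock()
            .unwrap()
            .retain_mut(|sender| sender.send(update.clone()).is_ok());
    }
}
```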
1 change: 1 addition & 0 deletions src/utils/axum/mod.rs
@@ -7,6 +7,7 @@
pub mod auth;
pub mod embedded;
pub mod extractors;
pub mod sse;

use axum::http::StatusCode;
use axum::Json;
105 changes: 105 additions & 0 deletions src/utils/axum/sse.rs
@@ -0,0 +1,105 @@
/*******************************************************************************
* Copyright (c) 2021 Cénotélie Opérations SAS (cenotelie.fr)
******************************************************************************/

//! API for Server-Sent Events

use std::convert::Infallible;
use std::fmt::{Display, Formatter};
use std::pin::Pin;
use std::task::{Context, Poll};

use axum::body::{Body, Bytes, HttpBody};
use axum::http::{header, HeaderValue, Response};
use axum::response::IntoResponse;
use futures::Stream;
use http_body::Frame;
use serde::Serialize;

/// A Server-Sent Event
#[allow(clippy::struct_field_names, dead_code)]
pub struct Event<T> {
    /// The event type, to be serialized in the `event` field
    pub event_type: Option<String>,
    /// The event unique id, if any
    pub id: Option<String>,
    /// The payload
    pub data: T,
}

impl<T> Event<T> {
    /// Produces an event from a payload
    pub fn from_data(data: T) -> Event<T> {
        Self {
            event_type: None,
            id: None,
            data,
        }
    }
}

impl<T: Serialize> Display for Event<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(event_type) = self.event_type.as_deref() {
            writeln!(f, "event: {event_type}")?;
        }
        if let Some(id) = self.id.as_deref() {
            writeln!(f, "id: {id}")?;
        }
        let data = serde_json::to_string(&self.data).map_err(|_| std::fmt::Error)?;
        writeln!(f, "data: {data}\n")
    }
}

/// A stream of Server-Sent Events to be sent by axum
pub struct ServerSentEventStream<S>(S);

impl<S, T> ServerSentEventStream<S>
where
    S: Send + Stream<Item = Event<T>>,
    T: Serialize + Send + Unpin,
{
    /// Encapsulates the original stream
    pub fn new(stream: S) -> ServerSentEventStream<S> {
        ServerSentEventStream(stream)
    }
}

impl<S, T> HttpBody for ServerSentEventStream<S>
where
    S: Send + Stream<Item = Event<T>> + Unpin,
    T: Serialize + Send + Unpin,
{
    type Data = Bytes;

    type Error = Infallible;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let inner = Pin::new(&mut self.get_mut().0);
        match inner.poll_next(cx) {
            Poll::Ready(Some(event)) => {
                let data = event.to_string().into_bytes();
                Poll::Ready(Some(Ok(Frame::data(Bytes::from(data)))))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<S, T> IntoResponse for ServerSentEventStream<S>
where
    S: Send + Stream<Item = Event<T>> + Unpin + 'static,
    T: Serialize + Send + Unpin,
{
    fn into_response(self) -> axum::response::Response {
        let mut response = Response::new(Body::new(self));
        response
            .headers_mut()
            .append(header::CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
        response
            .headers_mut()
            .append(header::CACHE_CONTROL, HeaderValue::from_static("no-store"));
        response
    }
}
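As a quick illustration of the framing produced by the Display implementation above (hypothetical payload type, not part of the commit):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Payload {
    value: u32,
}

// Assumes Event from the module above is in scope.
fn demo() -> String {
    let event = Event {
        event_type: Some("update".to_string()),
        id: Some("42".to_string()),
        data: Payload { value: 1 },
    };
    // Renders as:
    // event: update
    // id: 42
    // data: {"value":1}
    // (plus a trailing blank line that terminates the SSE event)
    event.to_string()
}
```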
