diff --git a/src/adm/tiles.rs b/src/adm/tiles.rs index b87587f1..4fe3cf38 100644 --- a/src/adm/tiles.rs +++ b/src/adm/tiles.rs @@ -104,7 +104,7 @@ impl Tile { pub fn from_adm_tile(tile: AdmTile) -> Self { // Generate a base response tile from the ADM provided tile structure. // NOTE: the `image_size` is still required to be determined, and is - // provided by `StoreImage.store()` + // provided by `ImageStore.store()` Self { id: tile.id, name: tile.name, diff --git a/src/server/img_storage.rs b/src/server/img_storage.rs index f0326a61..6583ad97 100644 --- a/src/server/img_storage.rs +++ b/src/server/img_storage.rs @@ -1,18 +1,22 @@ //! Fetch and store a given remote image into Google Storage for CDN caching -use std::{env, io::Cursor, time::Duration}; +use std::{env, io::Cursor, sync::Arc}; use actix_http::http::HeaderValue; -use actix_web::http::uri; -use actix_web::web::Bytes; -use cloud_storage::{Bucket, Object}; +use actix_web::{http::uri, web::Bytes}; +use cadence::{CountedExt, StatsdClient}; +use chrono::{DateTime, Duration, Utc}; +use cloud_storage::Bucket; +use dashmap::DashMap; use image::{io::Reader as ImageReader, ImageFormat}; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; -use crate::error::{HandlerError, HandlerErrorKind, HandlerResult}; -use crate::settings::Settings; -use crate::tags::Tags; +use crate::{ + error::{HandlerError, HandlerErrorKind, HandlerResult}, + settings::Settings, + tags::Tags, +}; /// These values generally come from the Google console for Cloud Storage. #[derive(Clone, Debug, Deserialize, Serialize)] @@ -51,7 +55,7 @@ pub struct StorageSettings { /// The external CDN host cdn_host: String, /// The bucket TTL is determined by the policy set for the given bucket when it's created. - bucket_ttl: u64, + bucket_ttl: Option, /// The max time to live for cached data, ~ 15 days. cache_ttl: u64, /// Max dimensions for an image @@ -91,7 +95,7 @@ impl Default for StorageSettings { project_name: "topsites-nonprod".to_owned(), bucket_name: "moz-topsites-stage-cdn".to_owned(), cdn_host: "https://cdn.stage.topsites.nonprod.cloudops.mozgcp.net/".to_owned(), - bucket_ttl: 86400 * 15, + bucket_ttl: None, cache_ttl: 86400 * 15, metrics: ImageMetricSettings::default(), request_timeout: 3, @@ -102,8 +106,8 @@ impl Default for StorageSettings { /// Image storage container #[derive(Clone)] -pub struct StoreImage { - // No `Default` stated for `StoreImage` because we *ALWAYS* want a timeout +pub struct ImageStore { + // No `Default` stated for `ImageStore` because we *ALWAYS* want a timeout // for the `reqwest::Client` // // bucket isn't really needed here, since `Object` stores and manages itself, @@ -111,18 +115,31 @@ pub struct StoreImage { // // bucket: Option, settings: StorageSettings, + // `Settings::tiles_ttl` + tiles_ttl: u32, + cadence_metrics: StatsdClient, req: reqwest::Client, + /// `StoredImage`s already fetched/uploaded + stored_images: Arc>, } /// Stored image information, suitable for determining the URL to present to the CDN -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct StoredImage { pub url: uri::Uri, - pub object: Option, pub image_metrics: ImageMetrics, + expiry: DateTime, } -#[derive(Clone, Debug, Deserialize, Default, Serialize, PartialEq)] +impl StoredImage { + /// Whether this image should be refetched and checked against the Cloud + /// Storage Bucket + fn expired(&self) -> bool { + self.expiry <= Utc::now() + } +} + +#[derive(Copy, Clone, Debug, Deserialize, Default, Serialize, PartialEq)] pub struct ImageMetrics { pub width: u32, pub height: u32, @@ -130,18 +147,21 @@ pub struct ImageMetrics { } /// Store a given image into Google Storage -impl StoreImage { +impl ImageStore { /// Connect and optionally create a new Google Storage bucket based off [Settings] pub async fn create( settings: &Settings, + cadence_metrics: &StatsdClient, client: &reqwest::Client, ) -> HandlerResult> { let sset = StorageSettings::from(settings); - Self::check_bucket(&sset, client).await + Self::check_bucket(&sset, settings.tiles_ttl, cadence_metrics, client).await } pub async fn check_bucket( settings: &StorageSettings, + tiles_ttl: u32, + cadence_metrics: &StatsdClient, client: &reqwest::Client, ) -> HandlerResult> { if env::var("SERVICE_ACCOUNT").is_err() @@ -179,7 +199,10 @@ impl StoreImage { Ok(Some(Self { // bucket: Some(bucket), settings: settings.clone(), + tiles_ttl, + cadence_metrics: cadence_metrics.clone(), req: client.clone(), + stored_images: Default::default(), })) } @@ -210,15 +233,20 @@ impl StoreImage { /// Store an image fetched from the passed `uri` into Google Cloud Storage /// - /// This will absolutely fetch and store the img into the bucket. - /// We don't do any form of check to see if it matches what we got before. - /// If you have "Storage Legacy Bucket Writer" previous content is overwritten. - /// (e.g. set the path to be the SHA1 of the bytes or whatever.) - + /// This will fetch and store the img into the bucket if necessary (fetch + /// results are cached for a short time). pub async fn store(&self, uri: &uri::Uri) -> HandlerResult { + if let Some(stored_image) = self.stored_images.get(uri) { + if !stored_image.expired() { + return Ok(stored_image.clone()); + } + } let (image, content_type) = self.fetch(uri).await?; let metrics = self.validate(uri, &image, &content_type).await?; - self.upload(image, &content_type, metrics).await + let stored_image = self.upload(image, &content_type, metrics).await?; + self.stored_images + .insert(uri.to_owned(), stored_image.clone()); + Ok(stored_image) } /// Generate a unique hash based on the content of the image @@ -229,10 +257,14 @@ impl StoreImage { /// Fetch the bytes for an image based on a URI pub(crate) async fn fetch(&self, uri: &uri::Uri) -> HandlerResult<(Bytes, String)> { trace!("fetching... {:?}", &uri); + self.cadence_metrics.incr("image.fetch").ok(); + let res = self .req .get(&uri.to_string()) - .timeout(Duration::from_secs(self.settings.request_timeout)) + .timeout(std::time::Duration::from_secs( + self.settings.request_timeout, + )) .send() .await? .error_for_status()?; @@ -339,19 +371,21 @@ impl StoreImage { ); // check to see if image has already been stored. + self.cadence_metrics.incr("image.object.check").ok(); if let Ok(exists) = cloud_storage::Object::read_with(&self.settings.bucket_name, &image_path, &self.req) .await { trace!("Found existing image in bucket: {:?}", &exists.media_link); - return Ok(StoredImage { - url: format!("{}/{}", &self.settings.cdn_host, &image_path).parse()?, - object: Some(exists), + return Ok(self.new_image( + format!("{}/{}", &self.settings.cdn_host, &image_path).parse()?, image_metrics, - }); + exists.time_created, + )); } // store new data to the googles + self.cadence_metrics.incr("image.object.create").ok(); match cloud_storage::Object::create_with_params( &self.settings.bucket_name, image.to_vec(), @@ -365,14 +399,11 @@ impl StoreImage { Ok(mut object) => { object.content_disposition = Some("inline".to_owned()); object.cache_control = Some(format!("public, max-age={}", self.settings.cache_ttl)); + self.cadence_metrics.incr("image.object.update").ok(); object.update().await?; let url = format!("{}/{}", &self.settings.cdn_host, &image_path); trace!("Stored to {:?}: {:?}", &object.self_link, &url); - Ok(StoredImage { - url: url.parse()?, - object: Some(object), - image_metrics, - }) + Ok(self.new_image(url.parse()?, image_metrics, object.time_created)) } Err(e) => { if let cloud_storage::Error::Other(ref json) = e { @@ -388,18 +419,44 @@ impl StoreImage { // 412 Precondition Failed: the image already exists, so we // can continue on trace!("Store Precondition Failed (412), image already exists, continuing"); + self.cadence_metrics + .incr("image.object.already_exists") + .ok(); let url = format!("{}/{}", &self.settings.cdn_host, &image_path); - return Ok(StoredImage { - url: url.parse()?, - object: None, + return Ok(self.new_image( + url.parse()?, image_metrics, - }); + // approximately (close enough) + Utc::now(), + )); } } Err(e.into()) } } } + + fn new_image( + &self, + url: uri::Uri, + image_metrics: ImageMetrics, + time_created: DateTime, + ) -> StoredImage { + // Images should not change (any image modification should result in a + // new url from upstream). However, poll it every `Settings::tiles_ttl` + // anyway, just in case + let mut expiry = Utc::now() + Duration::seconds(self.tiles_ttl.into()); + if let Some(bucket_ttl) = self.settings.bucket_ttl { + // Take `StorageSettings::bucket_ttl` into account in the rare case + // it's set the image to expire earlier than now + `tiles_ttl` + expiry = std::cmp::min(expiry, time_created + Duration::seconds(bucket_ttl as i64)); + } + StoredImage { + url, + image_metrics, + expiry, + } + } } #[cfg(test)] @@ -408,6 +465,7 @@ mod tests { use crate::settings::test_settings; use actix_http::http::Uri; + use cadence::{NopMetricSink, SpyMetricSink}; use rand::Rng; fn set_env() { @@ -427,13 +485,28 @@ mod tests { StorageSettings { project_name: project, bucket_name: bucket, - bucket_ttl: 86400 * 15, + bucket_ttl: None, cache_ttl: 86400 * (15 + 1), cdn_host: cdn, ..Default::default() } } + fn test_store() -> ImageStore { + let settings = test_storage_settings(); + let timeout = std::time::Duration::from_secs(settings.request_timeout); + ImageStore { + settings, + tiles_ttl: 15 * 60, + cadence_metrics: StatsdClient::builder("", NopMetricSink).build(), + req: reqwest::Client::builder() + .connect_timeout(timeout) + .build() + .unwrap(), + stored_images: Default::default(), + } + } + fn test_image_buffer(height: u32, width: u32) -> Bytes { let mut rng = rand::thread_rng(); // generate a garbage image. @@ -475,15 +548,22 @@ mod tests { let test_settings = test_storage_settings(); let client = reqwest::Client::builder() - .connect_timeout(Duration::from_secs(test_settings.request_timeout)) + .connect_timeout(std::time::Duration::from_secs( + test_settings.request_timeout, + )) .build() .unwrap(); - let bucket = StoreImage::check_bucket(&test_settings, &client) - .await - .unwrap() - .unwrap(); + let img_store = ImageStore::check_bucket( + &test_settings, + 15 * 60, + &StatsdClient::builder("", NopMetricSink).build(), + &client, + ) + .await + .unwrap() + .unwrap(); let target = src_img.parse::().unwrap(); - bucket.store(&target).await.expect("Store failed"); + img_store.store(&target).await.expect("Store failed"); Ok(()) } @@ -492,17 +572,8 @@ mod tests { set_env(); let test_valid_image = test_image_buffer(96, 96); let test_uri: Uri = "https://example.com/test.jpg".parse().unwrap(); - let test_settings = test_storage_settings(); - let timeout = Duration::from_secs(test_settings.request_timeout); - let bucket = StoreImage { - settings: test_settings, - req: reqwest::Client::builder() - .connect_timeout(timeout) - .build() - .unwrap(), - }; - - let result = bucket + let img_store = test_store(); + let result = img_store .validate(&test_uri, &test_valid_image, "image/jpg") .await .unwrap(); @@ -517,15 +588,8 @@ mod tests { set_env(); let test_valid_image = test_image_buffer(96, 100); let test_uri: Uri = "https://example.com/test.jpg".parse().unwrap(); - let bucket = StoreImage { - settings: test_storage_settings(), - req: reqwest::Client::builder() - .connect_timeout(Duration::from_secs(3)) - .build() - .unwrap(), - }; - - assert!(bucket + let img_store = test_store(); + assert!(img_store .validate(&test_uri, &test_valid_image, "image/jpg") .await .is_err()); @@ -542,4 +606,59 @@ mod tests { setting.storage = test_val.to_owned(); let _store_set: StorageSettings = (&setting).into(); } + + #[tokio::test] + async fn test_image_caching() -> Result<(), ()> { + if std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_err() { + print!("Skipping test: No credentials found."); + return Ok(()); + } + let src_img = "https://evilonastick.com/test/128px.jpg"; + + let test_settings = test_storage_settings(); + let tiles_ttl = 2; + let client = reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs( + test_settings.request_timeout, + )) + .build() + .unwrap(); + let (rx, sink) = SpyMetricSink::new(); + let img_store = ImageStore::check_bucket( + &test_settings, + tiles_ttl, + &StatsdClient::builder("contile", sink).build(), + &client, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(rx.len(), 0); + + let target = src_img.parse::().unwrap(); + img_store.store(&target).await.expect("Store failed"); + assert_eq!(rx.len(), 2); + + img_store.store(&target).await.expect("Store failed"); + assert_eq!(rx.len(), 2); + + tokio::time::delay_for(std::time::Duration::from_secs(tiles_ttl.into())).await; + img_store.store(&target).await.expect("Store failed"); + assert_eq!(rx.len(), 4); + let spied_metrics: Vec = rx + .iter() + .take(4) + .map(|x| String::from_utf8(x).unwrap()) + .collect(); + assert_eq!( + spied_metrics, + vec![ + "contile.image.fetch:1|c", + "contile.image.object.check:1|c", + "contile.image.fetch:1|c", + "contile.image.object.check:1|c", + ] + ); + Ok(()) + } } diff --git a/src/server/mod.rs b/src/server/mod.rs index a0886b72..e5e47223 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -12,7 +12,7 @@ use crate::{ adm::{spawn_updater, AdmFilter}, error::{HandlerError, HandlerResult}, metrics::metrics_from_opts, - server::{img_storage::StoreImage, location::location_config_from_settings}, + server::{img_storage::ImageStore, location::location_config_from_settings}, settings::Settings, web::{dockerflow, handlers, middleware}, }; @@ -37,7 +37,7 @@ pub struct ServerState { pub tiles_cache: cache::TilesCache, pub settings: Settings, pub filter: Arc>, - pub img_store: Option, + pub img_store: Option, pub excluded_dmas: Option>, pub start_up: Instant, } @@ -116,7 +116,7 @@ impl Server { .build()?; spawn_updater(&filter, req.clone()); let tiles_cache = cache::TilesCache::new(TILES_CACHE_INITIAL_CAPACITY); - let img_store = StoreImage::create(&settings, &req).await?; + let img_store = ImageStore::create(&settings, &metrics, &req).await?; let excluded_dmas = if let Some(exclude_dmas) = &settings.exclude_dma { serde_json::from_str(exclude_dmas).map_err(|e| { HandlerError::internal(&format!("Invalid exclude_dma field: {:?}", e))