Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

feat: Read ADM settings data from a Google Storage bucket. #331

Merged
merged 10 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 92 additions & 2 deletions src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use url::Url;

use super::{
tiles::{AdmTile, Tile},
AdmAdvertiserFilterSettings, DEFAULT,
AdmAdvertiserFilterSettings, AdmSettings, DEFAULT,
};
use crate::{
adm::settings::PathMatching,
Expand Down Expand Up @@ -48,12 +48,16 @@ pub struct AdmFilter {
pub filter_set: HashMap<String, AdmAdvertiserFilterSettings>,
/// Ignored (not included but also not reported to Sentry) Advertiser names
pub ignore_list: HashSet<String>,
/// All countries set for inclusion in at least one of the
/// All countries set for inclusion in at least one of the advertiser regions
/// [crate::adm::AdmAdvertiserFilterSettings]
pub all_include_regions: HashSet<String>,
/// Temporary list of advertisers with legacy images built into firefox
/// for pre 91 tile support.
pub legacy_list: HashSet<String>,
pub source: String,
pub source_url: Option<url::Url>,
pub last_updated: Option<chrono::DateTime<chrono::Utc>>,
pub refresh_rate: std::time::Duration,
}

/// Parse &str into a `Url`
Expand Down Expand Up @@ -98,6 +102,14 @@ fn check_url(url: Url, species: &'static str, filter: &[Vec<String>]) -> Handler

/// Filter a given tile data set provided by ADM and validate the various elements
impl AdmFilter {
/// Convenience check: are these settings backed by a Google Storage bucket?
///
/// Returns `true` only when `source_url` is set and its scheme is `gs`;
/// a missing URL or any other scheme yields `false`.
pub fn is_cloud(&self) -> bool {
    match &self.source_url {
        Some(url) => url.scheme() == "gs",
        None => false,
    }
}

/// Report the error directly to sentry
fn report(&self, error: &HandlerError, tags: &Tags) {
// trace!(&error, &tags);
Expand All @@ -107,6 +119,84 @@ impl AdmFilter {
l_sentry::report(&merged_tags, sentry::event_from_error(error));
}

/// Check to see if the bucket has been modified since the last time we updated.
///
/// Returns `Ok(false)` without contacting the bucket when the settings are
/// not bucket based (see `is_cloud`). Otherwise the object's metadata is read
/// and `Ok(true)` is returned when either no update has been recorded yet
/// (`last_updated` is `None`) or the object's `updated` timestamp is not
/// older than `last_updated`.
///
/// # Errors
///
/// Returns an internal `HandlerError` when the bucket URL has no host or the
/// object metadata could not be read.
pub async fn requires_update(&self) -> HandlerResult<bool> {
    // don't update non-bucket versions (for now)
    if !self.is_cloud() {
        return Ok(false);
    }
    let bucket = match &self.source_url {
        Some(url) => url,
        None => return Ok(false),
    };
    let host = bucket
        .host()
        .ok_or_else(|| {
            HandlerError::internal(&format!("Missing bucket Host {:?}", self.source))
        })?
        .to_string();
    // `Url::path()` keeps the leading '/', which is a URL separator and not
    // part of the object name inside the bucket.
    let object_name = bucket.path().trim_start_matches('/');

    let obj: cloud_storage::Object = cloud_storage::Object::read(&host, object_name)
        .await
        .map_err(|e| {
            HandlerError::internal(&format!(
                "Could not read bucket {:?}, {:?}",
                self.source, e
            ))
        })?;
    // if the bucket is older than when we last checked, do nothing.
    // With no recorded update time we always ask for an update.
    Ok(self
        .last_updated
        .map_or(true, |updated| updated <= obj.updated))
}

/// Try to update the ADM filter data from the remote bucket.
pub async fn update(&mut self) -> HandlerResult<()> {
if let Some(bucket) = &self.source_url {
let adm_settings = AdmSettings::from_settings_bucket(bucket)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
for (adv, setting) in adm_settings.advertisers {
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
trace!("Processing records for {:?}", &adv);
// DEFAULT included but sans special processing -- close enough
for country in &setting.include_regions {
if !self.all_include_regions.contains(country) {
self.all_include_regions.insert(country.clone());
}
}
// map the settings to the URL we're going to be checking
self.filter_set.insert(adv.to_lowercase(), setting);
}
self.last_updated = Some(chrono::Utc::now());
}
Ok(())
}

// background settings updater.
pub async fn spawn_updater(&self) {
if !self.is_cloud() {
return;
}
let mut s = self.clone();
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
actix_rt::spawn(async move {
loop {
if s.requires_update().await.expect("Failed to detect update") {
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
s.update().await.expect("Failed to update");
actix_rt::time::delay_for(s.refresh_rate).await;
}
}
});
}

/// Check the advertiser URL
fn check_advertiser(
&self,
Expand Down
154 changes: 123 additions & 31 deletions src/adm/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
};

use config::ConfigError;

use serde::{ser::SerializeSeq, Deserialize, Deserializer, Serialize, Serializer};

use super::AdmFilter;
Expand Down Expand Up @@ -174,7 +173,104 @@ where
seq.end()
}

pub(crate) type AdmSettings = HashMap<String, AdmAdvertiserFilterSettings>;
/// ADM settings: the per-advertiser filter settings, plus the bucket they
/// were (or will be) loaded from, if any.
#[derive(Debug, Default, Clone)]
pub struct AdmSettings {
/// The `gs://` bucket URL these settings refer to. Set when the settings
/// string is a bucket URL or when the settings were fetched from a bucket;
/// `None` otherwise.
bucket: Option<url::Url>,
/// Filter settings keyed by advertiser name.
pub advertisers: HashMap<String, AdmAdvertiserFilterSettings>,
}

/// Serialize the settings as a plain map of advertiser name to filter
/// settings. The `bucket` field is not part of the serialized form.
impl Serialize for AdmSettings {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Iterating by reference yields `(&String, &Settings)` pairs, which
        // serialize identically to owned pairs without cloning the map.
        serializer.collect_map(&self.advertisers)
    }
}

/// Create AdmSettings from a string serialized JSON format
impl TryFrom<String> for AdmSettings {
type Error = ConfigError;

fn try_from(settings_str: String) -> Result<Self, Self::Error> {
// don't try to serialize bucket values quite yet.
if settings_str.starts_with("gs://") {
return Ok(Self {
bucket: Some(settings_str.parse::<url::Url>().map_err(|err| {
ConfigError::Message(format!(
"Invalid bucket url: {:?} {:?}",
settings_str, err
))
})?),
..Default::default()
});
}
let adm_settings: HashMap<String, AdmAdvertiserFilterSettings> =
serde_json::from_str(&settings_str).expect("Invalid ADM Settings JSON string");
for (adv, filter_setting) in &adm_settings {
if filter_setting
.include_regions
.iter()
.any(|region| region != &region.to_uppercase())
{
return Err(ConfigError::Message(format!(
"Advertiser {:?} include_regions must be uppercase",
adv
)));
}
if filter_setting.advertiser_urls.iter().any(|filter| {
if let Some(ref paths) = filter.paths {
return paths.iter().any(|path| match path.matching {
PathMatching::Prefix => !path.value.ends_with('/'),
PathMatching::Exact => !path.value.starts_with('/'),
});
}
false
}) {
return Err(ConfigError::Message(format!("Advertiser {:?} advertiser_urls contain invalid prefix PathFilter (missing trailing '/')", adv)));
}
}
Ok(AdmSettings {
advertisers: adm_settings,
..Default::default()
})
}
}

impl AdmSettings {
    /// Try to fetch the ADM settings from a Google Storage bucket url.
    ///
    /// The URL host is used as the bucket name and the URL path (without its
    /// leading '/') as the object name. The downloaded object must be UTF-8
    /// text accepted by `AdmSettings::try_from(String)`. On success the
    /// returned settings have `bucket` set to `settings_bucket`.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::Message` when the URL scheme is not `gs`, the
    /// URL has no host, the download fails, the content is not valid UTF-8,
    /// or the content fails to parse/validate.
    pub async fn from_settings_bucket(
        settings_bucket: &url::Url,
    ) -> Result<AdmSettings, ConfigError> {
        let settings_str = settings_bucket.as_str();
        if settings_bucket.scheme() != "gs" {
            return Err(ConfigError::Message(format!(
                "Improper bucket URL: {:?}",
                settings_str
            )));
        }
        let bucket_name = settings_bucket
            .host()
            .ok_or_else(|| {
                ConfigError::Message(format!(
                    "Invalid adm settings bucket name {}",
                    settings_str
                ))
            })?
            .to_string();
        // `Url::path()` keeps the leading '/', which is a URL separator and
        // not part of the object name inside the bucket.
        let path = settings_bucket.path().trim_start_matches('/');
        let contents = cloud_storage::Object::download(&bucket_name, path)
            .await
            .map_err(|e| ConfigError::Message(format!("Could not download settings: {:?}", e)))?;
        let settings_json = String::from_utf8(contents)
            .map_err(|e| ConfigError::Message(format!("Could not read ADM Settings: {:?}", e)))?;
        let mut reply = AdmSettings::try_from(settings_json)?;
        reply.bucket = Some(settings_bucket.clone());
        Ok(reply)
    }
}

/// Attempt to read the AdmSettings as either a path to a JSON file, or as a JSON string.
///
Expand Down Expand Up @@ -226,32 +322,7 @@ impl TryFrom<&mut Settings> for AdmSettings {
})?;
}
}
let adm_settings: AdmSettings =
serde_json::from_str(&settings_str).expect("Invalid ADM Settings JSON string");
for (adv, filter_setting) in &adm_settings {
if filter_setting
.include_regions
.iter()
.any(|region| region != &region.to_uppercase())
{
return Err(ConfigError::Message(format!(
"Advertiser {:?} include_regions must be uppercase",
adv
)));
}
if filter_setting.advertiser_urls.iter().any(|filter| {
if let Some(ref paths) = filter.paths {
return paths.iter().any(|path| match path.matching {
PathMatching::Prefix => !path.value.ends_with('/'),
PathMatching::Exact => !path.value.starts_with('/'),
});
}
false
}) {
return Err(ConfigError::Message(format!("Advertiser {:?} advertiser_urls contain invalid prefix PathFilter (missing trailing '/')", adv)));
}
}
Ok(adm_settings)
AdmSettings::try_from(settings_str)
}
}

Expand Down Expand Up @@ -283,6 +354,7 @@ impl TryFrom<&mut Settings> for AdmSettings {
impl From<&mut Settings> for HandlerResult<AdmFilter> {
fn from(settings: &mut Settings) -> Self {
let mut filter_map: HashMap<String, AdmAdvertiserFilterSettings> = HashMap::new();
let refresh_rate = settings.adm_refresh_rate_secs;
let ignore_list = settings
.adm_ignore_advertisers
.clone()
Expand All @@ -294,8 +366,20 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
.unwrap_or_else(|| "[]".to_owned())
.to_lowercase();
let mut all_include_regions = HashSet::new();
for (adv, setting) in
AdmSettings::try_from(settings).map_err(|e| HandlerError::internal(&e.to_string()))?
let source = settings.adm_settings.clone();
let source_url = match source.parse::<url::Url>() {
Ok(v) => Some(v),
Err(e) => {
warn!(
"Source may be path or unparsable URL: {:?} {:?}",
&source, e
);
None
}
};
for (adv, setting) in AdmSettings::try_from(settings)
.map_err(|e| HandlerError::internal(&e.to_string()))?
.advertisers
{
trace!("Processing records for {:?}", &adv);
// DEFAULT included but sans special processing -- close enough
Expand All @@ -316,6 +400,13 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
ignore_list,
all_include_regions,
legacy_list,
last_updated: match &source.starts_with("gs://") {
true => Some(chrono::Utc::now()),
false => None,
},
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
source,
source_url,
refresh_rate: std::time::Duration::from_secs(refresh_rate),
})
}
}
Expand Down Expand Up @@ -378,10 +469,11 @@ mod tests {
let mut settings = Settings::with_env_and_config_file(&None, true).unwrap();
let mut adm_settings = adm_settings();
adm_settings
.advertisers
.get_mut("Dunder Mifflin")
.expect("No Dunder Mifflin tile")
.include_regions = vec!["MX".to_owned()];
settings.adm_settings = json!(adm_settings).to_string();
settings.adm_settings = json!(adm_settings.advertisers).to_string();
let filter = HandlerResult::<AdmFilter>::from(&mut settings).unwrap();
assert!(
filter.all_include_regions
Expand Down
15 changes: 12 additions & 3 deletions src/adm/tiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,22 @@ pub async fn get_tiles(
metrics.incr_with_tags("filter.adm.empty_response", Some(tags));
}

// update the filters if needed.
if settings.adm_live_update && state.filter.read().unwrap().requires_update().await? {
jrconlin marked this conversation as resolved.
Show resolved Hide resolved
let mut mfilter = state.filter.write().unwrap();
(*mfilter).update().await?;
}
let filtered: Vec<Tile> = response
.tiles
.into_iter()
.filter_map(|tile| {
state
.filter
.filter_and_process(tile, location, &device_info, tags, metrics)
state.filter.read().unwrap().filter_and_process(
tile,
location,
&device_info,
tags,
metrics,
)
})
.take(settings.adm_max_tiles as usize)
.collect();
Expand Down
Loading