Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
fix: reduce the work done while holding the updater write lock (#504)
Browse files Browse the repository at this point in the history
also fix a bug in the updater's handling of all_include_regions

DISCO-2203 & DISCO-2204

Closes #502
Closes #503
  • Loading branch information
pjenvey authored Jan 16, 2023
1 parent c046a7f commit cf71155
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 96 deletions.
121 changes: 59 additions & 62 deletions src/adm/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,41 +112,41 @@ pub fn spawn_updater(
return Ok(());
}
}
let mfilter = filter.clone();
let mfilter = Arc::clone(filter);
rt::spawn(async move {
let mut tags = crate::tags::Tags::default();
loop {
{
// Do the check inside of a scope so that the read lock can be released right away.
let should_update = { mfilter.read().await.requires_update(&storage_client).await };
match should_update {
Ok(true) => {
let mut filter = mfilter.write().await;
match filter.update(&storage_client).await {
Ok(_) => {
metrics.incr("filter.adm.update.ok").ok();
}
Err(e) => {
filter.report(&e, &mut tags);
metrics.incr("filter.adm.update.error").ok();
}
}
}
Ok(false) => {
metrics.incr("filter.adm.update.check.skip").ok();
}
Err(e) => {
mfilter.read().await.report(&e, &mut tags);
metrics.incr("filter.adm.update.check.error").ok();
}
}
}
updater(&mfilter, &storage_client, &metrics).await;
rt::time::sleep(refresh_rate).await;
}
});
Ok(())
}

/// Update `AdmFilter` from the Cloud Storage settings if they've been updated
async fn updater(
filter: &Arc<RwLock<AdmFilter>>,
storage_client: &cloud_storage::Client,
metrics: &Arc<StatsdClient>,
) {
// Do the check before matching so that the read lock can be released right away.
let result = filter.read().await.fetch_new_settings(storage_client).await;
match result {
Ok(Some(new_settings)) => {
filter.write().await.update(new_settings);
trace!("AdmFilter updated from cloud storage");
metrics.incr("filter.adm.update.ok").ok();
}
Ok(None) => {
metrics.incr("filter.adm.update.check.skip").ok();
}
Err(e) => {
trace!("AdmFilter update failed: {:?}", e);
metrics.incr("filter.adm.update.check.error").ok();
l_sentry::report(&e, &e.tags);
}
}
}

/// Filter a given tile data set provided by ADM and validate the various elements
impl AdmFilter {
/// convenience function to determine if settings are cloud ready.
Expand All @@ -166,14 +166,15 @@ impl AdmFilter {
l_sentry::report(error, &merged_tags);
}

/// check to see if the bucket has been modified since the last time we updated.
pub async fn requires_update(
/// Check if the bucket has been modified since the last time we updated,
/// returning new `AdmAdvertiserSettings` if so.
pub async fn fetch_new_settings(
&self,
storage_client: &cloud_storage::Client,
) -> HandlerResult<bool> {
) -> HandlerResult<Option<AdmAdvertiserSettings>> {
// don't update non-bucket versions (for now)
if !self.is_cloud() {
return Ok(false);
return Ok(None);
}
if let Some(bucket) = &self.source_url {
let host = bucket
Expand All @@ -182,44 +183,40 @@ impl AdmFilter {
HandlerError::internal(&format!("Missing bucket Host {:?}", self.source))
})?
.to_string();
let obj = storage_client
.object()
.read(&host, bucket.path().trim_start_matches('/'))
.await?;
let path = bucket.path().trim_start_matches('/');
let obj = storage_client.object().read(&host, path).await?;
if let Some(updated) = self.last_updated {
// if the bucket is older than when we last checked, do nothing.
return Ok(updated <= obj.updated);
// if the object is older than the last update, do nothing
if obj.updated < updated {
return Ok(None);
}
};
return Ok(true);

let bytes = storage_client.object().download(&host, path).await?;
let contents = String::from_utf8(bytes).map_err(|e| {
HandlerErrorKind::General(format!("Could not read ADM Settings: {:?}", e))
})?;
let new_settings = serde_json::from_str(&contents).map_err(|e| {
HandlerErrorKind::General(format!("Could not read ADM Settings: {:?}", e))
})?;
return Ok(Some(new_settings));
}
Ok(false)
Ok(None)
}

/// Try to update the ADM filter data from the remote bucket.
pub async fn update(&mut self, storage_client: &cloud_storage::Client) -> HandlerResult<()> {
if let Some(bucket) = &self.source_url {
let advertiser_filters =
AdmFilter::advertisers_from_settings_bucket(storage_client, bucket)
.await
.map_err(|e| {
HandlerError::internal(&format!(
"Invalid bucket data in {:?}: {:?}",
self.source, e
))
})?;
self.advertiser_filters.adm_advertisers.clear();
for (adv, setting) in advertiser_filters.adm_advertisers {
self.all_include_regions.clear();
for country in setting.keys() {
self.all_include_regions.insert(country.clone());
}
self.advertiser_filters
.adm_advertisers
.insert(adv.to_lowercase(), setting);
/// Clear and update the ADM filter data from new `AdmAdvertiserSettings`
pub fn update(&mut self, settings: AdmAdvertiserSettings) {
self.all_include_regions.clear();
self.advertiser_filters.adm_advertisers.clear();
for (adv, setting) in settings.adm_advertisers {
for country in setting.keys() {
self.all_include_regions.insert(country.clone());
}
self.last_updated = Some(chrono::Utc::now());
self.advertiser_filters
.adm_advertisers
.insert(adv.to_lowercase(), setting);
}
Ok(())
self.last_updated = Some(chrono::Utc::now());
}

/// Check the advertiser URL
Expand Down
34 changes: 1 addition & 33 deletions src/adm/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,38 +361,6 @@ impl AdmFilter {
);
Value::Object(adm_settings).to_string()
}

/// Try to fetch the ADM settings from a Google Storage bucket url.
pub async fn advertisers_from_settings_bucket(
cloud_storage: &cloud_storage::Client,
settings_bucket: &url::Url,
) -> Result<AdmAdvertiserSettings, ConfigError> {
let settings_str = settings_bucket.as_str();
if settings_bucket.scheme() != "gs" {
return Err(ConfigError::Message(format!(
"Improper bucket URL: {:?}",
settings_str
)));
}
let bucket_name = settings_bucket
.host()
.ok_or_else(|| {
ConfigError::Message(format!("Invalid adm settings bucket name {}", settings_str))
})?
.to_string();
let path = settings_bucket.path().trim_start_matches('/');
let contents = cloud_storage
.object()
.download(&bucket_name, path)
.await
.map_err(|e| ConfigError::Message(format!("Could not download settings: {:?}", e)))?;
serde_json::from_str(
&String::from_utf8(contents).map_err(|e| {
ConfigError::Message(format!("Could not read ADM Settings: {:?}", e))
})?,
)
.map_err(|e| ConfigError::Message(format!("Could not read ADM Settings: {:?}", e)))
}
}

/// Attempt to read the AdmSettings as either a path to a JSON file, or as a JSON string.
Expand Down Expand Up @@ -537,7 +505,7 @@ impl From<&mut Settings> for HandlerResult<AdmFilter> {
ignore_list,
legacy_list,
all_include_regions,
last_updated: source.starts_with("gs://").then(chrono::Utc::now),
last_updated: None,
source: Some(source),
source_url,
refresh_rate: std::time::Duration::from_secs(refresh_rate),
Expand Down
6 changes: 5 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ impl Server {
let mut partner_filter = HandlerResult::<AdmFilter>::from(&mut settings)?;
// try to update from the bucket if possible.
if partner_filter.is_cloud() {
partner_filter.update(&storage_client).await?
let advertiser_settings = partner_filter
.fetch_new_settings(&storage_client)
.await?
.expect("Expected AdmAdvertiserSettings for is_cloud AdmFilter");
partner_filter.update(advertiser_settings);
}
let refresh_rate = partner_filter.refresh_rate;
let is_cloud = partner_filter.is_cloud();
Expand Down

0 comments on commit cf71155

Please sign in to comment.