Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bigtable: fix AccessToken issues #34213

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions storage-bigtable/src/access_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,41 +91,46 @@ impl AccessToken {
}

/// Call this function regularly to ensure the access token does not expire
pub async fn refresh(&self) {
pub fn refresh(&self) {
// Check if it's time to try a token refresh
{
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
let token_r = self.token.read().unwrap();
if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 {
return;
}
let token_r = self.token.read().unwrap();
if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 {
debug!("Token is not expired yet");
return;
}

#[allow(deprecated)]
if self
.refresh_active
.compare_and_swap(false, true, Ordering::Relaxed)
{
// Refresh already pending
return;
}
// Refresh already is progress
let refresh_progress = self.refresh_active.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
if refresh_progress.is_err() {
debug!("Token update is already in progress");
return;
}

info!("Refreshing token");
match time::timeout(
time::Duration::from_secs(5),
Self::get_token(&self.credentials, &self.scope),
)
.await
{
Ok(new_token) => match (new_token, self.token.write()) {
(Ok(new_token), Ok(mut token_w)) => *token_w = new_token,
(Ok(_new_token), Err(err)) => warn!("{}", err),
(Err(err), _) => warn!("{}", err),
},
Err(_) => {
warn!("Token refresh timeout")
let credentials = self.credentials.clone();
let scope = self.scope.clone();
let refresh_active = Arc::clone(&self.refresh_active);
let token = Arc::clone(&self.token);
tokio::spawn(async move {
match time::timeout(
time::Duration::from_secs(5),
Self::get_token(&credentials, &scope),
)
.await
{
Ok(new_token) => match new_token {
Ok(new_token) => {
let mut token_w = token.write().unwrap();
*token_w = new_token;
}
Err(err) => error!("Failed to fetch new token: {}", err),
},
Err(_timeout) => {
warn!("Token refresh timeout")
}
}
}
self.refresh_active.store(false, Ordering::Relaxed);
refresh_active.store(false, Ordering::Relaxed);
info!("Token refreshed");
});
}

/// Return an access token suitable for use in an HTTP authorization header
Expand Down
18 changes: 9 additions & 9 deletions storage-bigtable/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
Ok(rows)
}

async fn refresh_access_token(&self) {
fn refresh_access_token(&self) {
if let Some(ref access_token) = self.access_token {
access_token.refresh().await;
access_token.refresh();
}
}

Expand All @@ -434,7 +434,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
if rows_limit == 0 {
return Ok(vec![]);
}
self.refresh_access_token().await;
self.refresh_access_token();
let response = self
.client
.read_rows(ReadRowsRequest {
Expand Down Expand Up @@ -479,7 +479,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {

/// Check whether a row key exists in a `table`
pub async fn row_key_exists(&mut self, table_name: &str, row_key: RowKey) -> Result<bool> {
self.refresh_access_token().await;
self.refresh_access_token();

let response = self
.client
Expand Down Expand Up @@ -524,7 +524,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
if rows_limit == 0 {
return Ok(vec![]);
}
self.refresh_access_token().await;
self.refresh_access_token();
let response = self
.client
.read_rows(ReadRowsRequest {
Expand Down Expand Up @@ -558,7 +558,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
table_name: &str,
row_keys: &[RowKey],
) -> Result<Vec<(RowKey, RowData)>> {
self.refresh_access_token().await;
self.refresh_access_token();

let response = self
.client
Expand Down Expand Up @@ -594,7 +594,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
table_name: &str,
row_key: RowKey,
) -> Result<RowData> {
self.refresh_access_token().await;
self.refresh_access_token();

let response = self
.client
Expand Down Expand Up @@ -623,7 +623,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {

/// Delete one or more `table` rows
async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
self.refresh_access_token().await;
self.refresh_access_token();

let mut entries = vec![];
for row_key in row_keys {
Expand Down Expand Up @@ -669,7 +669,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
family_name: &str,
row_data: &[(&RowKey, RowData)],
) -> Result<()> {
self.refresh_access_token().await;
self.refresh_access_token();

let mut entries = vec![];
for (row_key, row_data) in row_data {
Expand Down