Skip to content

Commit

Permalink
[identity] fix add_reserved_usernames RPC
Browse files Browse the repository at this point in the history
Summary:
Since there aren't that many users in the users table currently, it's cheaper to just get all the usernames from DDB and then compare with the list from keyserver using HashSets.

Also fixed the BatchWriteRequests -- DynamoDB didn't like that we were sending more than 25 items per request, so I broke up the usernames list into chunks of 25. Also, I removed the retry logic for unprocessed items because we should rarely have those.

Test Plan:
1. Inserted 1000 new users with usernames into my local MariaDB users table.
2. Ran the cron job that calls the AddReservedUsernames RPC.
3. Confirmed that all the new usernames were in the prod DynamoDB reserved usernames table. Existing users (e.g. ashoat) were skipped.
4. Cleared the DDB table.

Reviewers: jon, bartek

Reviewed By: bartek

Subscribers: ashoat, tomek

Differential Revision: https://phab.comm.dev/D8631
  • Loading branch information
vdhanan committed Jul 26, 2023
1 parent 4fa8840 commit f7bc1ce
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 58 deletions.
17 changes: 5 additions & 12 deletions services/identity/src/client_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,18 +830,11 @@ impl IdentityClientService for ClientService {
&message.signature,
)?;

let mut filtered_usernames = Vec::new();

for username in usernames {
if !self
.client
.username_taken(username.clone())
.await
.map_err(handle_db_error)?
{
filtered_usernames.push(username);
}
}
let filtered_usernames = self
.client
.filter_out_taken_usernames(usernames)
.await
.map_err(handle_db_error)?;

self
.client
Expand Down
89 changes: 43 additions & 46 deletions services/identity/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use constant_time_eq::constant_time_eq;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -485,6 +485,23 @@ impl DatabaseClient {
Ok(result.is_some())
}

/// Returns the entries of `usernames` that are not already present among the
/// usernames fetched by `get_all_usernames`.
///
/// Both lists are turned into hash sets before comparing, so duplicate
/// entries in `usernames` are collapsed and the order of the returned vector
/// is unspecified.
///
/// Any error from `get_all_usernames` is propagated unchanged.
pub async fn filter_out_taken_usernames(
  &self,
  usernames: Vec<String>,
) -> Result<Vec<String>, Error> {
  // Fetch the existing usernames once and index them for O(1) membership checks.
  let taken: HashSet<String> =
    self.get_all_usernames().await?.into_iter().collect();

  // De-duplicate the candidates the same way a set difference would.
  let candidates: HashSet<String> = usernames.into_iter().collect();

  let remaining = candidates
    .into_iter()
    .filter(|candidate| !taken.contains(candidate))
    .collect();

  Ok(remaining)
}

async fn get_user_from_user_info(
&self,
user_info: String,
Expand Down Expand Up @@ -631,25 +648,25 @@ impl DatabaseClient {
.map_err(|e| Error::AwsSdk(e.into()))
}

pub async fn get_users(&self) -> Result<Vec<String>, Error> {
async fn get_all_usernames(&self) -> Result<Vec<String>, Error> {
let scan_output = self
.client
.scan()
.table_name(USERS_TABLE)
.projection_expression(USERS_TABLE_PARTITION_KEY)
.projection_expression(USERS_TABLE_USERNAME_ATTRIBUTE)
.send()
.await
.map_err(|e| Error::AwsSdk(e.into()))?;

let mut result = Vec::new();
if let Some(attributes) = scan_output.items {
for mut attribute in attributes {
let id = parse_string_attribute(
USERS_TABLE_PARTITION_KEY,
attribute.remove(USERS_TABLE_PARTITION_KEY),
)
.map_err(Error::Attribute)?;
result.push(id);
if let Ok(username) = parse_string_attribute(
USERS_TABLE_USERNAME_ATTRIBUTE,
attribute.remove(USERS_TABLE_USERNAME_ATTRIBUTE),
) {
result.push(username);
}
}
}
Ok(result)
Expand Down Expand Up @@ -683,49 +700,29 @@ impl DatabaseClient {
&self,
usernames: Vec<String>,
) -> Result<(), Error> {
let mut write_requests = vec![];

for username in usernames {
let item: HashMap<String, AttributeValue> = vec![(
RESERVED_USERNAMES_TABLE_PARTITION_KEY.to_string(),
AttributeValue::S(username),
)]
.into_iter()
.collect();

let write_request = WriteRequest::builder()
.put_request(PutRequest::builder().set_item(Some(item)).build())
.build();

write_requests.push(write_request);
}

loop {
let output = self
// A single call to BatchWriteItem can consist of up to 25 operations
for usernames_chunk in usernames.chunks(25) {
let write_requests = usernames_chunk
.iter()
.map(|username| {
let put_request = PutRequest::builder()
.item(
RESERVED_USERNAMES_TABLE_PARTITION_KEY,
AttributeValue::S(username.to_string()),
)
.build();

WriteRequest::builder().put_request(put_request).build()
})
.collect();

self
.client
.batch_write_item()
.request_items(RESERVED_USERNAMES_TABLE, write_requests)
.send()
.await
.map_err(|e| Error::AwsSdk(e.into()))?;

let unprocessed_items_map = match output.unprocessed_items() {
Some(map) => map,
None => break,
};

let unprocessed_requests =
match unprocessed_items_map.get(RESERVED_USERNAMES_TABLE) {
Some(requests) => requests,
None => break,
};

info!(
"{} unprocessed items, retrying...",
unprocessed_requests.len()
);

write_requests = unprocessed_requests.to_vec();
}

info!("Batch write item to reserved usernames table succeeded");
Expand Down

0 comments on commit f7bc1ce

Please sign in to comment.