-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: unify queues in proxy #976
feat: unify queues in proxy #976
Conversation
b756f62
to
37c906e
Compare
bin/tx-prover/src/proxy/mod.rs
Outdated
// We use a new scope for each iteration to release the lock before sleeping | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this new scope as we are not acquiring a lock anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right. Removing the scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thank you! I left some comments inline.
bin/tx-prover/src/proxy/mod.rs
Outdated
fn new() -> Self { | ||
Self { | ||
tries: 0, | ||
request_id: rand::random::<u64>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we were getting request ID from the session. Is there a reason not to do it now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I changed it to use the context because it was fairly less complex this way. Using the session required to store the request_id in the headers and access it at almost every method in the life cycle of the request. This way the initialization is way easier (CTX::new
is implicitly called at the start of every request) and the access is much easier. And since it is something that was not useful to the worker nor the user, it sounded like the most correct option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. I think it is probably fine to keep as is, but could we increase the value to u128
(to make the probability of collisions truly negligible).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A very common approach would be to use uuid here, but anything of that size should work fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thank you! I left a few small comments inline. After these are addressed, we should be good to merge.
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Set an available worker | ||
/// | ||
/// This method will add a worker to the list of available workers. | ||
/// If the worker is already available, it will panic. | ||
pub async fn add_available_worker(&self, worker: Backend) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 59 needs to be updated. But also we could probably re-write this comment (and other similar related comments) as:
/// Adds the provided worker to the list of available workers.
///
/// # Panics
/// Panics if the provided worker is already in the list of available workers.
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Get an available worker | ||
/// | ||
/// This method will return the first available worker from the list of available workers, and | ||
/// remove it from the list. | ||
/// If no worker is available, it will return None. | ||
pub async fn get_available_worker(&self) -> Option<Backend> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment as above.
bin/tx-prover/src/proxy/mod.rs
Outdated
fn new() -> Self { | ||
Self { | ||
tries: 0, | ||
request_id: rand::random::<u64>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. I think it is probably fine to keep as is, but could we increase the value to u128
(to make the probability of collisions truly negligible).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left a couple of minor comments, mostly stylistic
bin/tx-prover/src/proxy/mod.rs
Outdated
|
||
const RESOURCE_EXHAUSTED_CODE: u16 = 8; | ||
// LoadBalancer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This would be written in all caps
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Rate limiter | ||
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1))); | ||
|
||
// Request queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the last comment
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Shared state. It is a map of workers to a vector of request IDs | ||
static QUEUES: Lazy<RwLock<HashMap<Backend, Vec<String>>>> = | ||
Lazy::new(|| RwLock::new(HashMap::new())); | ||
// RequestContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same stylistic comment here
bin/tx-prover/src/proxy/mod.rs
Outdated
fn new() -> Self { | ||
Self { | ||
tries: 0, | ||
request_id: rand::random::<u64>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A very common approach would be to use uuid here, but anything of that size should work fine
bin/tx-prover/src/proxy/mod.rs
Outdated
// Check if there is an available worker | ||
if let Some(worker) = self.get_available_worker().await { | ||
ctx.set_worker(worker); | ||
info!("Worker picked up the request with ID: {}", request_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to identify which worker picked up the request here for request traceability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok! Changing it.
// deletion. | ||
Self::remove_request_from_queue(request_id).await; | ||
// Mark the worker as available | ||
self.add_available_worker(ctx.worker.take().expect("Failed to get worker")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'd add an info!()
log here (or maybe better within the add_available_worker
function) that mentions that the worker became available. Basically I'm trying to think if we have enough request and traceability to be able to tell how the system was working if something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, left a couple of minor comments
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Note that the request is not removed from the queue here. It will be returned later in | ||
/// [Self::logging()] once the worker processes the it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove this part as the dequeue now happens here.
bin/tx-prover/src/proxy/mod.rs
Outdated
/// Gets an available worker and removes it from the list of available workers. | ||
/// | ||
/// If no worker is available, it will return None. | ||
pub async fn get_available_worker(&self) -> Option<Backend> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we rename this to pop_available_worker
just to make it clear that it's a get+remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
much better
a31360a
to
d11c8e8
Compare
closes #947