Tame rendezvous windowing
When programming Web applications, or other distributed systems, I often find myself doing "RPC windowing." That is, I have 1,000 RPCs to make but want to be polite and not blast them all at once. The obvious solution is to set a window (say, 5) and commit to having at most that many RPCs outstanding at any given time.
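The window rule itself does not depend on tame. As a minimal sketch in standard C++, the bookkeeping reduces to two counters and the same admission test that ex4.T applies to `nsent` and `nrecv`. The `RpcWindow` class and its method names are hypothetical and are not part of tame or sfslite.

```cpp
#include <cassert>

// Hypothetical helper (not part of tame or sfslite): tracks how many
// requests are in flight and admits a new one only while requests remain
// to be sent and fewer than `limit` are outstanding.
class RpcWindow {
 public:
  RpcWindow(int total, int limit) : total_(total), limit_(limit) {}

  // Same test as ex4.T: nsent < ntot && nsent - nrecv < window_sz.
  bool can_launch() const {
    return sent_ < total_ && sent_ - received_ < limit_;
  }
  // Call only when can_launch() is true; returns the new request's id.
  int launch() { return sent_++; }
  // Record that one outstanding request has finished.
  void complete() { ++received_; }
  bool done() const { return received_ == total_; }
  int outstanding() const { return sent_ - received_; }

 private:
  int total_, limit_;
  int sent_ = 0, received_ = 0;
};
```

A caller launches while `can_launch()` holds and otherwise waits for a completion, so with `limit` set to 5 no more than five requests are ever in flight.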
Here is a sample program (ex4.T).
```cpp
#include "tame.h"
#include "arpc.h"
#include "parseopt.h"
#include "ex_prot.h"

tamed static void
dostuff (const str &h, int port, cbb cb)
{
  // declare all of your "stack" variables here
  tvars {
    int fd (99999);
    ptr<axprt_stream> x;
    ptr<aclnt> cli;
    vec<int> res;
    vec<clnt_stat> errs;
    int ntot (40), window_sz (5), id;
    int nsent (0), nrecv (0);
    bool err (false);
    rendezvous_t<int> rv;
    bool ret (true);
    int return_id;
  }

  // Call tcpconnect, and block until it returns; when it does return,
  // assign the local variable 'fd' to the result.
  twait { tcpconnect (h, port, mkevent(fd)); }

  if (fd < 0) {
    warn ("%s:%d: connection failed: %m\n", h.cstr(), port);
    err = true;
  } else {
    res.setsize (ntot);
    errs.setsize (ntot);
    x = axprt_stream::alloc (fd);
    cli = aclnt::alloc (x, ex_prog_1);

    // Now do the pipelined/windowed RPCs
    while (nrecv < ntot) {
      if (nsent < ntot && nsent - nrecv < window_sz) {
        // Launch new calls since there is room in the window!
        cli->call (EX_RANDOM, NULL, &res[nsent],
                   mkevent (rv, nsent, errs[nsent]));
        nsent++;
      } else {
        // Harvest: wait for any outstanding call; 'id' receives the
        // value of nsent that was bound when that call was launched.
        twait (rv, id);
        if (errs[id]) {
          err = true;
          warn << "RPC error: " << errs[id] << "\n";
        } else {
          warn << "Success " << id << ": " << res[id] << "\n";
        }
        nrecv++;
      }
    }
    warn << "All done...\n";
  }
  (*cb) (!err);
}

static void finish (bool rc)
{
  exit (rc ? 0 : -1);
}

int
main (int argc, char *argv[])
{
  int port;
  if (argc != 3 || !convertint (argv[2], &port))
    fatal << "usage: ex4 <hostname> <port>\n";
  dostuff (argv[1], port, wrap (finish));
  amain ();
}
```
The details of establishing an RPC-over-TCP connection are exactly as before. The difference comes with how the asynchronous calls are fired off, and later joined.
```cpp
cli->call (EX_RANDOM, NULL, &res[nsent],
           mkevent (rv, nsent, errs[nsent]));
```
Asynchronous calls launched outside of a twait {...} block are first associated with a rendezvous. A rendezvous_t is a simple object that coordinates launches and joins for a given group of events (see libtame/tame_core.h for the class definition of rendezvous_t). A rendezvous_t is also parameterized by the types of the values used to tell its events apart when they are eventually joined. In this example, rv is a rendezvous_t<int>, and each RPC is labelled by the value of nsent at the moment it was launched. Labels therefore run from 0 to ntot - 1, with at most window_sz of them outstanding at once, so a distinct label is associated with each call and, consequently, with each callback.

As before, the callback argument is built with the mkevent(...) syntax, but with extra arguments that name the rendezvous the new event belongs to and the values to bind to it (here, nsent). The last argument, errs[nsent], is where the result goes: when cli->call eventually "returns" by triggering its event, the RPC status is stored in slot nsent of the vector errs.
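To make the three roles in `mkevent (rv, nsent, errs[nsent])` concrete, here is a toy, single-threaded analogue in standard C++. `ToyRendezvous`, `make_event`, and `try_wait` are hypothetical names; libtame's rendezvous_t relies on the tame preprocessor and an event loop, and its real interface is the one in libtame/tame_core.h. The sketch models only the bookkeeping: a bound label, a result slot, and a queue of fired labels.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

// Toy, single-threaded analogue of a rendezvous (hypothetical; not libtame's
// rendezvous_t). Each event carries a bound label of type T. When the event
// fires, its result is written to a caller-provided slot and the label is
// queued, so a later "wait" can tell which event completed.
template <typename T>
class ToyRendezvous {
 public:
  // Similar in spirit to mkevent (rv, label, slot). In this sketch the
  // rendezvous must outlive every event it hands out (events capture `this`).
  template <typename R>
  std::function<void(R)> make_event(T label, R* slot) {
    ++pending_;
    return [this, label, slot](R value) {
      *slot = value;            // deliver the result into its slot
      ready_.push_back(label);  // remember which event fired
    };
  }

  // Non-blocking stand-in for twait (rv, id): returns false if no event has
  // fired yet, where a real event loop would suspend the caller instead.
  bool try_wait(T* label) {
    if (ready_.empty()) return false;
    *label = ready_.front();
    ready_.pop_front();
    --pending_;
    return true;
  }

  int pending() const { return pending_; }  // created but not yet harvested

 private:
  std::deque<T> ready_;
  int pending_ = 0;
};
```

In this toy, fired events are harvested in the order they triggered, not the order they were created, and each label leads straight to its result slot.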
Once the window is full (or all ntot RPCs have been launched), control continues to the twait statement in the else branch:
```cpp
while (nrecv < ntot) {
  if (nsent < ntot && nsent - nrecv < window_sz) {
    // Launch new calls since there is room in the window!
    cli->call (EX_RANDOM, NULL, &res[nsent],
               mkevent (rv, nsent, errs[nsent]));
    nsent++;
  } else {
    // Harvest
    twait (rv, id);
    if (errs[id]) {
      err = true;
      warn << "RPC error: " << errs[id] << "\n";
    } else {
      warn << "Success " << id << ": " << res[id] << "\n";
    }
    nrecv++;
  }
}
```
The condition of the while loop is true as long as some calls remain to be received. Thus, assuming that window_sz and ntot are greater than 0, control reaches the twait statement at least once. As the comments suggest, control appears to block at the twait statement until one of the outstanding RPCs completes. When control passes the twait statement, the closure variable id is set to the value that was bound to the joined call when it was launched. If that call was launched when nsent == 2, then id will be 2. In this way, the code after twait can respond to the specific RPC that returned, finding its result in res[id] and its status in errs[id].
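Because completions can arrive in any order, the bound id is what ties each reply back to its slot. The following standard C++ simulation is hypothetical (no network, no tame): it drives the same windowed loop but assumes the newest outstanding request always finishes first, and uses the harvested id to file each result.

```cpp
#include <cassert>
#include <vector>

// Hypothetical simulation: requests complete newest-first (LIFO) instead of
// in launch order, mimicking a server that answers out of order. The id of
// each harvested request is what files its result into the right slot, much
// like `id` after twait (rv, id) in ex4.T.
struct Harvest {
  std::vector<int> res;    // res[id] holds the result for request id
  std::vector<int> order;  // ids in the order they completed
};

Harvest windowed_out_of_order(int ntot, int window_sz) {
  assert(window_sz > 0);  // otherwise the harvest branch would find nothing
  Harvest h;
  h.res.assign(ntot, -1);
  std::vector<int> in_flight;  // ids launched but not yet harvested
  int nsent = 0, nrecv = 0;
  while (nrecv < ntot) {
    if (nsent < ntot && nsent - nrecv < window_sz) {
      in_flight.push_back(nsent++);  // launch: room left in the window
    } else {
      int id = in_flight.back();     // "completed" call: newest first
      in_flight.pop_back();
      h.res[id] = id * id;           // stand-in for the RPC's reply
      h.order.push_back(id);
      nrecv++;
    }
  }
  return h;
}
```

With ntot = 10 and window_sz = 3, completions arrive as 2, 3, 4, 5, 6, 7, 8, 9, 1, 0, yet every result lands in its own slot because the loop indexes by id rather than by arrival position.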
In the above example, the declaration of the rendezvous_t and all references to it are in the same function, but this is not required. New events can be added to a rendezvous, and completed ones harvested via twait, from anywhere the rendezvous is in scope. Of course, rendezvous_t objects are regular C++ objects, so they can be passed to functions, declared extern, and so on.
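As an illustration of that point, the sketch below passes a rendezvous by reference to one helper that adds events and to another that harvests them. It is standard C++, and `MiniRendezvous`, `add_event`, and `harvest_one` are hypothetical stand-ins that only track fired labels; they are not libtame's class or API.

```cpp
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical toy (not libtame's API): a rendezvous that only tracks which
// labels have fired. As an ordinary C++ object it can be passed by reference
// to helper functions that add events or harvest them.
struct MiniRendezvous {
  std::queue<int> ready;  // labels of events that have fired
  int outstanding = 0;    // events created but not yet harvested
};

// Helper outside the owner's scope: registers an event labelled `id` and
// returns the callback to trigger later. `rv` must outlive the callback.
std::function<void()> add_event(MiniRendezvous& rv, int id) {
  ++rv.outstanding;
  return [&rv, id] { rv.ready.push(id); };
}

// Another helper that joins one fired event; returns -1 if none is ready
// (a real twait would suspend the caller instead of returning).
int harvest_one(MiniRendezvous& rv) {
  if (rv.ready.empty()) return -1;
  int id = rv.ready.front();
  rv.ready.pop();
  --rv.outstanding;
  return id;
}
```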
It is recommended that every event created with mkevent eventually be twaited on, and I have trouble thinking of a case in which one would not want to twait on all outstanding calls. However, programs should still work as expected if zombie calls are left over: the rendezvous is deallocated only once all of its callbacks have been called.
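One way to obtain the lifetime rule just described is reference counting. The following is a hypothetical sketch using `std::shared_ptr`; libtame's actual mechanism may differ, and `RvState` and `make_event` are illustrative names only. Each one-shot event holds a strong reference to the rendezvous state and drops it when it fires, so the state survives its creator's scope until the last outstanding event has been triggered.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical rendezvous state shared by all of its events.
struct RvState {
  int fired = 0;  // how many events have triggered so far
};

using Event = std::function<void()>;

// One-shot event: holds a strong reference to the state until it fires,
// then releases it. Firing it a second time does nothing.
Event make_event(std::shared_ptr<RvState> rv) {
  return [rv]() mutable {
    if (!rv) return;  // already fired
    rv->fired++;
    rv.reset();       // drop this event's reference to the state
  };
}
```

The design choice here is that ownership flows from the outstanding events, not from the scope that created the rendezvous, which is what lets leftover "zombie" events fire safely after that scope has exited.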