Overview
========
A "pipe" method is a special kind of method that returns multiple values. The server can send as many responses and errors as it wants, all with the same ID as the original pipe call.
Since JSON-RPC is bidirectional, the same thing could be done by sending notifications back to the client, and using some application-level mechanism to tie the notifications to the original method call, but this can get clumsy.
The motivation for building this wasn't so much that it's generically useful--although I believe that it is--but that I'm migrating a product (TuneUp) from a custom JSON-RPC-like mechanism to JSON-RPC, and it makes extensive use of pipes in its current implementation.
Protocol
========
At the protocol level, pipes look just like regular method calls, except that they may have any number of responses or errors to a single request, instead of exactly one. There is no way to distinguish between pipe calls and method calls; it's up to the application (on both sides). This is somewhat analogous to the fact that a Python function that yields looks just like a regular function; it's up to the caller to know that it has to iterate over responses instead of expecting a single response.
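To make that concrete, here is a rough sketch of what the traffic for one pipe call might look like, and how a client could group responses by ID. The method name, the values, and the responses_for helper are all made up for illustration, and the message shape assumes JSON-RPC 1.0-style "result"/"error"/"id" members; none of this is taken from bjsonrpc's code.

```python
import json

# Hypothetical request: a single call with id 7.
request = {"method": "tick", "params": [1, 3], "id": 7}

# Hypothetical replies: several messages that all reuse the request's id.
incoming = [
    '{"result": 1700000000.0, "error": null, "id": 7}',
    '{"result": 1700000001.0, "error": null, "id": 7}',
    '{"result": null, "error": "clock unavailable", "id": 7}',
]


def responses_for(request_id, raw_messages):
    """Collect every (result, error) pair whose id matches request_id."""
    matched = []
    for raw in raw_messages:
        message = json.loads(raw)
        if message.get("id") == request_id:
            matched.append((message.get("result"), message.get("error")))
    return matched


replies = responses_for(request["id"], incoming)
print(len(replies))
```

Nothing in the messages says "this is a pipe"; the client only gets more than one reply because it keeps listening for that ID.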
Needless to say, multiple responses are an extension to JSON-RPC, which means any application protocol that uses pipes is not standard JSON-RPC (unless for some reason every pipe call happens to have exactly one response or error, in which case there's no good reason to use them). If others find it useful, it might be worth proposing the extension to be added to the standard, but at this point I haven't done so.
Pipes do not depend on bidirectional JSON-RPC. In fact, that's one of the motivations for adding them: it's easier to modify most libraries to handle pipes than to add bidirectionality.
Pipes should work fine over HTTP, although I haven't tried that yet (because I don't have a need).
Completion
==========
There is no way to mark that a pipe is done. The server side simply stops sending responses. If the client is still expecting them, it will wait forever. (The client side also has to close the pipe, or the Request object stays alive. And there is no way for a client to "drop" a pipe before the server is done, except by dropping the entire connection.)
There are two reasonable ways for the client to know that a pipe is done.
Pre-Determined completion
-------------------------
If the client knows in advance how many responses to expect, no completion message is needed. For example, the client often sends an array of parameters and gets exactly one response or error per parameter. The client can then close the pipe once it has received a response for each.
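A minimal sketch of the one-response-per-parameter pattern follows. StubPipe is a hypothetical stand-in for whatever c.pipe.<name>(...) returns (it only mimics the value property and close()), and the sketch assumes responses arrive in the same order as the parameters.

```python
from collections import deque


class StubPipe(object):
    """Hypothetical stand-in for the object returned by a pipe call."""

    def __init__(self, canned_responses):
        self._responses = deque(canned_responses)
        self.closed = False

    @property
    def value(self):
        # A real pipe would block here until the next response arrives.
        return self._responses.popleft()

    def close(self):
        self.closed = True


def one_response_per_param(pipe, params):
    """Read exactly len(params) responses, then close the pipe."""
    try:
        return [(param, pipe.value) for param in params]
    finally:
        pipe.close()


params = ["a.mp3", "b.mp3", "c.mp3"]
pipe = StubPipe(["ok", "ok", "not found"])
results = one_response_per_param(pipe, params)
```

The try/finally is there so the pipe gets closed even if reading one of the responses raises.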
Completion message
------------------
If the number of responses isn't known in advance, you have to build some kind of mechanism in at the application level to let the client know that the pipe is done.
(In TuneUp, this is generally done by including an optional "status" member in each response, with an optional "completed" boolean in the status; if completed is true, the pipe is done. The status also often includes "processed" and "total" to allow the client to, e.g., show a progress bar.)
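Here is a sketch of that kind of application-level completion flag. The "status", "completed", "processed", and "total" names follow the description above; the scan method, the "result" key, and read_until_completed are invented for the example and are not TuneUp's or bjsonrpc's actual code.

```python
def scan(paths):
    """Hypothetical pipe method: one response per path, with a status."""
    total = len(paths)
    for processed, path in enumerate(paths, 1):
        yield {
            "result": path.upper(),  # placeholder for the real work
            "status": {
                "processed": processed,
                "total": total,
                "completed": processed == total,
            },
        }


def read_until_completed(next_response):
    """Call next_response() until a response says the pipe is completed."""
    results = []
    while True:
        response = next_response()
        results.append(response["result"])
        # Both "status" and "completed" are optional, so default to False.
        if response.get("status", {}).get("completed", False):
            return results


# Stand-in for reading r.value repeatedly on a real pipe.
responses = scan(["a", "b", "c"])
results = read_until_completed(lambda: next(responses))
```

Note that this particular scheme has nothing to flag when there are zero items, so a real service would need a separate final message for that case.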
Use
===
Server
------
To define a pipe method on the server side, just create a generator function instead of a normal function:
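    import time
    from bjsonrpc.handlers import BaseHandler

    class ServerHandler(BaseHandler):
        def time(self):
            # Regular method: exactly one response.
            return time.time()

        def tick(self, delay, count=1):
            # Pipe method: each yield is sent as its own response.
            for i in range(count):
                yield time.time()
                # Assumes delay is in milliseconds, as tick(1000, 5) suggests.
                time.sleep(delay / 1000.0)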
Client
------
To call a pipe on the client side, you must use the "pipe" async call, which returns a Response object just as for a normal "method" call, but each time you read the value property, it gives you a new response (or blocks for a new one). For example:
    r = c.pipe.tick(1000, 5)
    for i in range(5):
        print("Tick: %s" % r.value)
    r.close()
Note the close() at the end. Even if the server side closes a pipe (by returning from the generator), the client side has no way of knowing about that, so if you don't close it explicitly, it will stick around until it's garbage collected (probably when the connection is closed). (Normal call, method, and notify calls are still "auto-closing", but pipe is not.)
Of course you can use contextlib.closing to handle that:
    import contextlib

    with contextlib.closing(c.pipe.tick(1000, 5)) as r:
        for i in range(5):
            print("Tick: %s" % r.value)
A pipe can also be used as an iterator:
    import itertools

    for t in itertools.islice(c.pipe.tick(1000, 5), 5):
        print("Tick: %s" % t.value)
(However, as compact and readable as that looks, note that it doesn't handle closing the pipe. You could write an islice_then_close function if you were doing this kind of thing frequently.)
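One possible shape for such an islice_then_close helper is sketched below. It only assumes the pipe is iterable and has a close() method; IterablePipe is a hypothetical stand-in so the example can run without a server.

```python
import itertools


def islice_then_close(pipe, count):
    """Yield at most count items from pipe, then close it."""
    try:
        for item in itertools.islice(pipe, count):
            yield item
    finally:
        # Runs when the slice is exhausted, on error, or when this
        # generator is itself closed or garbage collected.
        pipe.close()


class IterablePipe(object):
    """Hypothetical stand-in: an iterator with a close() method."""

    def __init__(self, items):
        self._items = iter(items)
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._items)

    next = __next__  # Python 2 spelling of the iterator protocol

    def close(self):
        self.closed = True


pipe = IterablePipe([10, 20, 30, 40])
ticks = list(islice_then_close(pipe, 3))
```

Because the helper is itself a generator, the pipe is only closed once the caller has consumed (or abandoned) the slice, not at the moment the helper is called.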
Implementation
==============
To make bjsonrpc handle this, Connection.dispatch_item_single no longer calls a monolithic _dispatch_method function that just returns a dict to send the client; instead, it calls _find_method, which can return:
* A generator function, in which case dispatch_item_single iterates the
generator, calling _send_response once for each entry,
* A regular function, in which case dispatch_item_single calls it and
calls _send_response with the result,
* An error string, in which case dispatch_item_single calls _send_error
with the result, or
* None, in which case dispatch_item_single does nothing.
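The four cases above can be sketched roughly as follows. This is not the actual bjsonrpc code: the dispatch function and its send_response/send_error callbacks are hypothetical simplifications, and the generator check via inspect.isgeneratorfunction is just one way the distinction could be made.

```python
import inspect


def dispatch(found, args, send_response, send_error):
    """Handle the four possible results of looking up a method."""
    if found is None:
        return  # nothing to do
    if isinstance(found, str):
        send_error(found)  # lookup produced an error string
    elif inspect.isgeneratorfunction(found):
        # Pipe method: one response per yielded value.
        for value in found(*args):
            send_response(value)
    else:
        # Regular method: exactly one response.
        send_response(found(*args))


def double(x):
    return 2 * x


def countdown(n):
    while n > 0:
        yield n
        n -= 1


sent, errors = [], []
dispatch(double, [21], sent.append, errors.append)
dispatch(countdown, [3], sent.append, errors.append)
dispatch("method not found", [], sent.append, errors.append)
dispatch(None, [], sent.append, errors.append)
```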
This means Request no longer holds a single response, but instead a queue of responses. Each time you read the value, the first one is pulled off and returned. (This means an ordinary method call's value can only be checked once, which is a change from stock bjsonrpc. It also might be a bit less efficient, because we've got Queue's thread-safety on top of the explicit thread stuff.)
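As a rough illustration of the queue idea (not the real Request class), the sketch below uses a hypothetical ResponseQueue whose value property pops the next response, with a thread standing in for the connection that delivers responses.

```python
import queue
import threading


class ResponseQueue(object):
    """Hypothetical holder: responses go in one end, reads pop the other."""

    def __init__(self):
        self._responses = queue.Queue()

    def push(self, response):
        # Called by whatever receives responses from the connection.
        self._responses.put(response)

    @property
    def value(self):
        # Blocks until a response is available, then consumes it.
        return self._responses.get()


holder = ResponseQueue()


def fake_server():
    for n in range(3):
        holder.push(n * 10)


worker = threading.Thread(target=fake_server)
worker.start()
received = [holder.value for _ in range(3)]
worker.join()
```

Each read consumes a response, which is why a value can only be checked once; a fourth read here would block forever, since nothing else is ever pushed.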
There is no equivalent to the simple sync "call" mechanism for pipe methods, because it doesn't make much sense--it would have to return an iterator, which means you'd get the exact same thing as with "method". If you do use call on a pipe, you'll just get the first value, and all the rest will be lost.
Note that most services that implement pipes will probably want to use bjsonrpc's 'threaded' option. It isn't strictly necessary, but the way bjsonrpc works, any service that takes a long time to respond to a method, or needs to do anything truly asynchronous, pretty much needs threads, and it's hard to imagine a good use for pipes that is neither long-lasting nor asynchronous.
Future
======
It might be nice if the pipe were a context manager on its own, like a file, but I haven't added that.
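If I did add it, it would probably amount to two small methods. The sketch below is hypothetical (ClosingMixin and StubPipe are made-up names, and nothing like this exists in the code yet); it just shows the shape, assuming the pipe object already has close().

```python
class ClosingMixin(object):
    """Hypothetical mixin: makes any object with close() usable in 'with'."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions from the with-body


class StubPipe(ClosingMixin):
    """Stand-in for a pipe; only tracks whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


with StubPipe() as pipe:
    closed_inside = pipe.closed
closed_after = pipe.closed
```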
It wouldn't be that hard to add a "close" mechanism at the protocol level. The main reason I haven't done so is that it's not necessary for my use case, as TuneUp's pre-existing JSON-RPC-esque mechanism had pipes that were closed at the application level.
There's also no reason the protocol couldn't signal pipes as different from method calls, e.g., by sending "pipe" instead of "method", but again, it wasn't necessary for my use case. Also, the analogy with Python generator functions seemed compelling to me. You don't call a generator function by saying g.generate(), you just say g().