-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
streaming-rpc.js
114 lines (95 loc) · 3.36 KB
/
streaming-rpc.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// this example demonstrates how to consume a streaming rpc service.
/*eslint-disable strict, no-console*/
var protobuf = require("..");
// Describe the schema as a JSON descriptor: one service (Greeter) whose single
// method streams in both directions, plus the request (Hello) and response
// (World) messages, each carrying one string field with id 1.
var descriptor = {
    nested: {
        Greeter: {
            methods: {
                SayHello: {
                    requestType: "Hello",
                    requestStream: true,
                    responseType: "World",
                    responseStream: true
                }
            }
        },
        Hello: {
            fields: {
                name: { type: "string", id: 1 }
            }
        },
        World: {
            fields: {
                message: { type: "string", id: 1 }
            }
        }
    }
};

// Load the definition (including its service) into a reflection root:
var root = protobuf.Root.fromJSON(descriptor);
// Resolve the reflected service and message types by name from the root:
var Greeter = root.lookup("Greeter");
var Hello = root.lookup("Hello");
var World = root.lookup("World");
// Provide a stream-aware RPC implementation.
// The factory keeps the "ended" flag private to the returned rpcImpl function.
function makeStreamingRpcImpl() {
    var isEnded = false;

    return function myRPCImpl(method, requestData, callback) {
        // A missing request buffer signals that the service has been ended client-side;
        // from then on (and for that call itself) nothing is sent anymore.
        if (isEnded || !requestData) {
            isEnded = true;
            return;
        }

        // In a real-world scenario, the client would now send requestData to a server using
        // some sort of transport layer (e.g. http), wait for responseData and then call the
        // callback with it.
        performRequestOverTransportChannel(requestData, function(responseData) {
            callback(null, responseData);
        });
    };
}

// API documentation: Service#create
var greeter = Greeter.create(
    /* rpcImpl */ makeStreamingRpcImpl(),
    /* requestDelimited? */ true,
    /* responseDelimited? */ true
);
// Exemplary server-side code for the sake of this example.
// Simulates a round trip: after a random delay (below 250 ms) the request is decoded and
// answered, and after a second random delay (below 250 ms) the length-delimited, encoded
// response is handed to `callback` as its only argument.
function performRequestOverTransportChannel(requestData, callback) {

    function handleRequest() {
        // 1. server decodes the request
        var request = Hello.decodeDelimited(requestData);

        // 2. server handles the request and creates a response
        var response = { message: "Hello " + request.name };

        function sendResponse() {
            // 3. server encodes and sends the response
            var responseData = World.encodeDelimited(response).finish();
            callback(responseData);
        }

        setTimeout(sendResponse, /* simulated delay */ Math.random() * 250);
    }

    setTimeout(handleRequest, /* simulated delay */ Math.random() * 250);
}
// Listen for events, using one named handler per event:

// Logs the message of each response together with the name of the method it belongs to.
function onData(response, method) {
    console.log("data in " + method.name + ":", response.message);
}

// Logs that the service has ended.
function onEnd() {
    console.log("end");
}

// Logs an error together with the name of the method it occurred in.
function onError(err, method) {
    console.log("error in " + method.name + ":", err);
}

greeter.on("data", onData);
greeter.on("end", onEnd);
greeter.on("error", onError);
// Call methods, either with a plain object ...
var plainRequest = { name: "one" };
greeter.sayHello(plainRequest);

// ... or with a runtime message:
var runtimeRequest = Hello.create({ name: "two" });
greeter.sayHello(runtimeRequest);

// Listen to and emit your own events if you like:
function onStatus(code, text) {
    console.log("custom status:", code, text);
}
greeter.on("status", onStatus);
greeter.emit("status", 200, "OK");
// And, if applicable, end the service when you are done.

// Callback for the call made after the service has ended; it is expected to get an error.
function reportExpectedFailure(err) {
    console.error("this should fail: " + err.message);
}

function endServiceThenCallAgain() {
    // Signals rpcImpl that the service has been ended client-side by calling it with a null
    // buffer. Likewise, rpcImpl can also end the stream by calling its callback with an
    // explicit null buffer.
    greeter.end();

    // Calling a method on the ended service is expected to fail.
    greeter.sayHello({ name: "three" }, reportExpectedFailure);
}

// 501 ms lies just past the two simulated transport delays (each below 250 ms).
setTimeout(endServiceThenCallAgain, 501);