-
Notifications
You must be signed in to change notification settings - Fork 7
/
ipc_server_with_database.rs
404 lines (341 loc) · 13.2 KB
/
ipc_server_with_database.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
#![allow(clippy::unused_io_amount)]
//! A fully functioning example of a web server, an on-disk database, and an http client, with
//! subprocesses communicating via a unix socket.
//!
//! We start a web server subprocess, which in turn "opens a connection" to a database server,
//! which is just sqlite running in another subprocess. The server has a /read and a /write
//! endpoint. The write endpoint writes a message to the db, and the read endpoint returns all the
//! messages in the db.
//!
//! We run two clients that write to the database, then have a third client read back the messages
//! from it. Additionally, the read client also makes a request to a real https server to
//! demonstrate DNS and HTTPS support.
use extrasafe::builtins::{danger_zone::Threads, Networking, SystemIO};
use extrasafe::SafetyContext;
use warp::Filter;
use std::os::unix::net::UnixDatagram;
use std::os::unix::process::CommandExt;
use std::sync::{Arc, Mutex};
/// Wire format for requests sent to the database process over the unix socket.
///
/// `run_db` decodes every incoming datagram into one of these variants before acting on it.
#[derive(Debug, Clone, PartialEq, Eq)]
enum DBMsg {
    /// Send a list of all the messages in the db back to the requester.
    List,
    /// Write the contained message to the db.
    Write(String),
}
/// Handle to the webserver's unix datagram socket, reference-counted and mutex-guarded so that
/// every request handler can share the single connection to the db.
type DbConn = Arc<Mutex<UnixDatagram>>;
/// Re-executes the current binary as a child process running the subcommand `cmd`.
///
/// The child is started with `argv[0]` set to `<cmd[0]>-subprocess` and receives the arguments
/// `run_main -- --sub <cmd...>`; `main` looks for `--sub` to dispatch to the matching `run_*`
/// function.
///
/// # Panics
///
/// Panics if `cmd` is empty, if the path of the current executable cannot be determined, or if
/// the child process fails to start.
fn run_subprocess(cmd: &[&str]) -> std::process::Child {
    let subcommand = cmd
        .first()
        .expect("run_subprocess requires a subcommand name");
    let exe_path = std::env::current_exe().expect("failed to determine current executable path");

    // `Command::new` accepts any `AsRef<OsStr>`, so hand it the path as-is instead of forcing it
    // through `to_str()`, which would panic on a non-UTF-8 executable path.
    std::process::Command::new(&exe_path)
        .arg0(format!("{}-subprocess", subcommand))
        .args(["run_main", "--", "--sub"])
        .args(cmd)
        .spawn()
        .unwrap_or_else(|e| {
            panic!(
                "subcommand `{}` failed to start: {:?}",
                cmd.join(" "),
                e
            )
        })
}
/// Builds a warp filter that matches any request and extracts a fresh clone of the shared db
/// connection handle, so route handlers can receive a `DbConn` as an argument.
fn with_db(
    db: DbConn,
) -> impl Filter<Extract = (DbConn,), Error = std::convert::Infallible> + Clone {
    // Each invocation hands out another reference to the same underlying socket.
    let hand_out_conn = move || Arc::clone(&db);
    warp::any().map(hand_out_conn)
}
/// Runs the HTTP front end: a warp server on 127.0.0.1:5576 that forwards requests to the db
/// process over a unix datagram socket.
///
/// * `db_socket_path` - path of the socket the db process is listening on.
/// * `our_socket_path` - path this process binds its own socket to; the db replies to the address
///   it received the request from.
///
/// Routes:
/// * `POST /write` sends `write <body>` to the db and answers `ok`.
/// * `GET /read` sends `list` to the db and answers with the db's reply.
///
/// Blocks on serving requests.
///
/// # Panics
///
/// Panics if any socket, runtime, or extrasafe setup step fails. A request handler panics if the
/// request body or the db reply is not valid UTF-8, or if a socket operation fails.
fn run_webserver(db_socket_path: &str, our_socket_path: &str) {
    // we open the socket ahead of time and share it among all webserver http threads (just like
    // with a real db connection we could have a pool of them instead of a single one)
    println!("webserver thread connecting to db unix socket");
    let socket = UnixDatagram::bind(our_socket_path).expect("failed to create unix dg socket");
    socket
        .connect(db_socket_path)
        .expect("failed to connect to db socket");
    let db_socket: DbConn = Arc::new(Mutex::new(socket));

    // set up runtime
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Bind the listening socket before the extrasafe context is applied.
    let listener = std::net::TcpListener::bind("127.0.0.1:5576").unwrap();

    // extrasafe context
    SafetyContext::new()
        .enable(Networking::nothing().allow_running_tcp_servers())
        .unwrap()
        .apply_to_current_thread()
        .unwrap();

    // set up server routes
    let routes = warp::path("write")
        .and(warp::post())
        .and(warp::body::bytes())
        .and(with_db(db_socket.clone()))
        .map(|param: bytes::Bytes, db_conn: DbConn| {
            println!("webserver got write request");
            let conn = db_conn.lock().unwrap();
            let s = std::str::from_utf8(&param).unwrap();
            conn.send(format!("write {}", s).as_bytes())
                .expect("failed to send write message to db");
            "ok"
        })
        .or(warp::path("read")
            .and(warp::get())
            .and(with_db(db_socket))
            .map(|db_conn: DbConn| {
                println!("webserver got read request");
                // The lock is held across send + recv so the reply we read is for our request.
                let conn = db_conn.lock().unwrap();
                println!("sending list command to db");
                conn.send("list".as_bytes())
                    .expect("failed to send read message to db");
                println!("waiting for response from db");
                // The fixed-size buffer caps a reply at 100 bytes.
                let mut buf: [u8; 100] = [0; 100];
                let count = conn
                    .recv(&mut buf)
                    .expect("failed to read response from db");
                println!("got response from db");
                // Only the first `count` bytes belong to the reply; slicing to that length is
                // exact, unlike stripping NUL bytes off the end of the whole buffer.
                String::from_utf8(buf[..count].to_vec()).unwrap()
            }));

    let svc = warp::service(routes);
    let make_svc = hyper::service::make_service_fn(move |_| {
        let warp_svc = svc.clone();
        async move { Ok::<_, std::convert::Infallible>(warp_svc) }
    });

    // https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html#method.from_std
    // requires a runtime to be active when converting the std::net::TcpListener to a tokio
    // listener
    let _in_runtime = runtime.enter();
    let server = hyper::Server::from_tcp(listener).unwrap();
    println!("Server about to start listening...");

    // block on server
    runtime.block_on(server.serve(make_svc)).unwrap();
}
/// Decodes one datagram payload into a db request.
///
/// Recognises exactly `list` and `write <message>` (a single space separates the keyword from the
/// message, which may be empty). Returns `None` for anything else.
fn parse_db_msg(raw: &str) -> Option<DBMsg> {
    if raw == "list" {
        return Some(DBMsg::List);
    }
    // `strip_prefix` cannot panic, unlike slicing at a fixed byte offset, and it also rejects
    // inputs such as `writeXfoo` that merely start with the keyword.
    raw.strip_prefix("write ")
        .map(|body| DBMsg::Write(body.to_string()))
}

/// Runs the database server: binds a unix datagram socket at `socket_path`, opens a sqlite
/// database in a fresh temporary directory, and then serves requests forever.
///
/// Each datagram is either `list`, answered by sending all stored messages joined with newlines
/// back to the sender's address, or `write <message>`, which inserts the message.
///
/// # Panics
///
/// Panics if socket, database, or extrasafe setup fails, if a datagram is not valid UTF-8 or not
/// a known request, or if a database or socket operation fails while serving.
fn run_db(socket_path: &str) {
    // bind socket connection to get data packets on
    let sock = UnixDatagram::bind(socket_path).unwrap();

    // open sqlite database
    let dir = tempfile::tempdir().unwrap();
    let mut path = dir.path().to_path_buf();
    path.push("testdb.sql3");
    let db = rusqlite::Connection::open(&path).unwrap();

    // Enabling either of these and then running a transaction will create the journal/wal files,
    // so that we don't have to enable opening files in our db thread after initialization.
    db.pragma_update(None, "locking_mode", "exclusive").unwrap();
    db.pragma_update(None, "journal_mode", "wal").unwrap();
    db.execute("CREATE TABLE messages ( msg TEXT NOT NULL );", [])
        .unwrap();
    let mut get_rows = db.prepare("SELECT msg FROM messages;").unwrap();
    let mut insert_row = db.prepare("INSERT INTO messages VALUES (?)").unwrap();

    // after opening connection socket and db file, set extrasafe context
    SafetyContext::new()
        .enable(
            Networking::nothing()
                .allow_connect()
                .yes_really()
                .allow_running_unix_servers(),
        )
        .unwrap()
        .enable(
            SystemIO::nothing()
                .allow_read()
                .allow_write()
                .allow_metadata()
                .allow_ioctl()
                .allow_close(),
        )
        .unwrap()
        .enable(Threads::nothing().allow_sleep().yes_really())
        .unwrap()
        .apply_to_current_thread()
        .unwrap();

    println!("database opened at {:?}", &path);
    println!("db server waiting for messages");

    loop {
        println!("db server waiting for unix socket message");
        // The fixed-size buffer caps a request at 100 bytes.
        let mut buf: [u8; 100] = [0; 100];
        let (count, return_addr) = sock
            .recv_from(&mut buf)
            .expect("failed reading request to db server");
        let request = String::from_utf8(buf[..count].to_vec())
            .expect("unix socket message was not valid utf8");
        println!("db got unix socket message: '{}'", request);

        let msg = parse_db_msg(&request)
            .unwrap_or_else(|| panic!("unknown message recieved in db: {}", request));

        match msg {
            DBMsg::List => {
                let messages: Vec<String> = get_rows
                    .query_map([], |row| row.get(0))
                    .unwrap()
                    .map(Result::unwrap)
                    .collect();
                // Reply to whichever socket the request came from.
                sock.send_to_addr(messages.join("\n").as_bytes(), &return_addr)
                    .expect("failed writing response from db server");
            }
            DBMsg::Write(s) => {
                insert_row.execute([s]).unwrap();
            }
        }
    }
}
/// Client that POSTs `msg` to the webserver's `/write` endpoint and checks that the server
/// answers `ok`.
///
/// Panics if the runtime or extrasafe context cannot be set up, if the request fails, or if the
/// response body is not `ok`.
fn run_client_write(msg: &str) {
    // Single-threaded tokio runtime to drive the request.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Restrict this thread to acting as a TCP client.
    let networking = Networking::nothing().allow_start_tcp_clients();
    SafetyContext::new()
        .enable(networking)
        .unwrap()
        .apply_to_current_thread()
        .unwrap();

    println!("about to make request with msg {}", msg);

    // The request future needs to own its body.
    let body = msg.to_string();
    let request = async move {
        let response = reqwest::Client::new()
            .post("http://127.0.0.1:5576/write")
            .body(body)
            .send()
            .await
            .unwrap_or_else(|e| panic!("Error writing to server db: {:?}", e));
        let text = response.text().await.unwrap();
        assert_eq!(text, "ok");
    };
    runtime.block_on(request);
}
/// Client that first fetches `https://example.org/` (to show DNS and TLS work under the
/// extrasafe context) and then GETs the webserver's `/read` endpoint, asserting that it returns
/// the two messages written earlier, in order.
///
/// Panics if setup fails, if either request fails, or if the returned text is unexpected.
fn run_client_read() {
    // Runtime and HTTP client are created before the safety context is applied.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();
    let client = reqwest::Client::new();

    let networking = Networking::nothing()
        // Necessary for DNS
        .allow_start_udp_servers()
        .yes_really()
        .allow_start_tcp_clients();
    // For some reason only if we make two requests with a client does it use multiple threads,
    // so we only need them in the reader thread rather than the writer.
    let threads = Threads::nothing().allow_create();

    // Which file-access rules apply depends on whether the landlock feature is enabled.
    #[cfg(not(feature = "landlock"))]
    let system_io = SystemIO::nothing()
        .allow_open_readonly()
        .allow_read()
        .allow_metadata()
        .allow_close();
    #[cfg(feature = "landlock")]
    let system_io = SystemIO::nothing()
        .allow_dns_files()
        .allow_ssl_files();

    SafetyContext::new()
        .enable(networking)
        .unwrap()
        .enable(threads)
        .unwrap()
        .enable(system_io)
        .unwrap()
        .apply_to_current_thread()
        .unwrap();

    runtime.block_on(async {
        // Show that we can resolve dns and do ssl. Data returned isn't checked or used anywhere,
        // we just get it.
        let external = client.get("https://example.org/").send().await.unwrap();
        if let Err(e) = external.text().await {
            panic!("failed getting example.org response: {:?}", e);
        }

        println!("about to make read request to webserver");
        let text = match client.get("http://127.0.0.1:5576/read").send().await {
            Ok(response) => response.text().await.unwrap(),
            Err(e) => panic!("Error reading from server db: {:?}", e),
        };
        assert_eq!(text, "hello\nextrasafe");
        println!("got response: {}", text);
    });
}
#[cfg(target_env = "musl")]
fn main() {
    // On musl targets the example does nothing except explain why it was skipped.
    let notice = "without building sqlite from source we can't link to sqlite, but it would be annoying to build it in CI.";
    println!("{}", notice);
}
/// Entry point for both the orchestrating parent process and its subprocesses.
///
/// When invoked as `prog [other options] --sub <subcommand> <subargs...>` it runs the named
/// subcommand (`db`, `webserver`, `read_client`, or `write_client`) and returns.
///
/// Otherwise it acts as the parent: it spawns the db and webserver subprocesses, runs two write
/// clients and one read client sequentially, asserts that each client exits successfully, and
/// finally shuts the db and webserver down.
#[cfg(not(target_env = "musl"))]
fn main() {
    /// Owns a long-running subprocess and kills and reaps it when dropped, so the child is
    /// cleaned up even when a later assertion panics.
    struct KillOnDrop(std::process::Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Never panic in drop: this may already be running during unwinding.
            if let Err(e) = self.0.kill() {
                eprintln!("failed to kill subprocess {}: {:?}", self.0.id(), e);
            }
            let _ = self.0.wait();
        }
    }

    /// Runs a client subprocess to completion and panics unless it exits successfully.
    fn run_client_to_completion(label: &str, cmd: &[&str]) {
        let status = run_subprocess(cmd)
            .wait()
            .unwrap_or_else(|e| panic!("{} failed to finish: {:?}", label, e));
        assert!(
            status.success(),
            "{} exited unsuccessfully: {:?}",
            label,
            status
        );
    }

    let args: Vec<String> = std::env::args().collect();
    println!("main args: {:?}", args);

    // If args is "example_prog [possible other options] --sub subcommand subargs...", run the
    // subcommand. Only the argument directly after `--sub` is treated as the subcommand name, so
    // a subarg that happens to equal a subcommand name (e.g. a message of "db") is not misread.
    if let Some(sub_idx) = args.iter().position(|s| s == "--sub") {
        let sub_args = &args[sub_idx + 1..];
        match sub_args.first().map(String::as_str) {
            Some("db") => {
                let socket_path = sub_args.get(1).expect("`db` requires a socket path");
                run_db(socket_path);
            }
            Some("webserver") => {
                let db_socket_path = sub_args
                    .get(1)
                    .expect("`webserver` requires the db socket path");
                let our_socket_path = sub_args
                    .get(2)
                    .expect("`webserver` requires its own socket path");
                run_webserver(db_socket_path, our_socket_path);
            }
            Some("read_client") => run_client_read(),
            Some("write_client") => {
                let msg = sub_args
                    .get(1)
                    .expect("`write_client` requires a message");
                run_client_write(msg);
            }
            other => eprintln!("unknown or missing subcommand after --sub: {:?}", other),
        }
        return;
    }

    // otherwise, spawn db, spawn webserver as subprocesses which communicate over a unix socket.
    // then spawn write subprocesses and read subprocesses sequentially, waiting for them to exit
    // each time.
    let dir = tempfile::TempDir::new().unwrap();
    let db_sock = dir.path().join("db.sock");
    let web_sock = dir.path().join("web.sock");
    let db_sock = db_sock.to_str().expect("temp dir path is not valid UTF-8");
    let web_sock = web_sock.to_str().expect("temp dir path is not valid UTF-8");

    // -- Spawn database, spawn http server, waiting a bit for each to finish getting ready.
    // The guards are declared after `dir`, so they are dropped (and the children killed) before
    // the temporary directory holding the sockets is removed.
    let _db_child = KillOnDrop(run_subprocess(&["db", db_sock]));
    std::thread::sleep(std::time::Duration::from_millis(100));

    let _webserver_child = KillOnDrop(run_subprocess(&["webserver", db_sock, web_sock]));
    std::thread::sleep(std::time::Duration::from_millis(100));

    // -- write "hello" to db
    run_client_to_completion("client1", &["write_client", "hello"]);

    // -- write "extrasafe" to db
    run_client_to_completion("client2", &["write_client", "extrasafe"]);

    // -- read back, check messages are there in order
    run_client_to_completion("client3", &["read_client"]);

    // `_webserver_child` and `_db_child` are killed here as their guards go out of scope.
}