-
Notifications
You must be signed in to change notification settings - Fork 344
/
debugpack.rs
143 lines (126 loc) · 4.63 KB
/
debugpack.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::{
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use rusqlite::{backup, params, Connection};
use serde::{Deserialize, Serialize};
use smol::channel::Sender;
use structopt::StructOpt;
use crate::config::CommonOpt;
#[derive(Debug, StructOpt, Deserialize, Serialize, Clone)]
pub struct DebugPackOpt {
#[structopt(flatten)]
pub common: CommonOpt,
#[structopt(long)]
pub export_to: String, // path of file to backup DB to
}
pub struct DebugPack {
conn: Arc<Mutex<Connection>>,
send_log: Sender<String>,
send_timeseries: Sender<(String, f64)>,
}
pub static START_TIME: Lazy<Instant> = Lazy::new(Instant::now);
impl DebugPack {
pub fn new(db_path: &str) -> anyhow::Result<Self> {
// open database & create tables if not exist
let conn = Connection::open(db_path)?;
conn.execute(
"create table if not exists timeseries (
timestamp timestamp,
key text,
value real)",
[],
)?;
conn.execute(
"create table if not exists loglines (
timestamp timestamp,
line text)",
[],
)?;
conn.execute(
"create index if not exists loglines_idx on loglines (timestamp)",
[],
)?;
conn.execute(
"create index if not exists timeseries_idx on loglines (timestamp)",
[],
)?;
conn.execute(
"DELETE FROM loglines WHERE rowid NOT IN (SELECT rowid FROM loglines ORDER BY timestamp DESC LIMIT 2000) OR datetime(timestamp, '+1 day') < datetime()",
params![],
)?;
conn.execute(
"delete from timeseries where datetime(timestamp, '+1 day') < datetime()",
params![],
)?;
let (send_log, recv_log) = smol::channel::bounded(10);
let db_path2 = db_path.to_string();
std::thread::spawn(move || {
let conn = Connection::open(db_path2).unwrap();
while let Ok(next) = recv_log.recv_blocking() {
if let Err(err) = conn.execute(
"insert into loglines (timestamp, line) values (datetime(), ?1)",
params![next],
) {
log::error!("cannot write logline: {}", err)
}
}
});
let (send_timeseries, recv_timeseries) = smol::channel::bounded(10);
let db_path2 = db_path.to_string();
std::thread::spawn(move || {
let conn = Connection::open(db_path2).unwrap();
while let Ok((key, value)) = recv_timeseries.recv_blocking() {
if let Err(err) = conn.execute(
"insert into timeseries (timestamp, key, value) values (datetime(), ?1, ?2)",
params![key, value],
) {
log::error!("cannot write logline: {}", err)
}
}
});
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
send_log,
send_timeseries,
})
}
pub fn add_logline(&self, logline: &str) {
let _ = self.send_log.try_send(logline.into());
}
pub fn add_timeseries(&self, key: &str, value: f64) {
let _ = self.send_timeseries.try_send((key.to_string(), value));
}
pub fn backup(&self, dest: &str) -> anyhow::Result<()> {
let mut dst = Connection::open(dest)?;
let src = self.conn.lock();
let backup = backup::Backup::new(&src, &mut dst)?;
backup.run_to_completion(100, Duration::from_millis(1), None)?;
Ok(())
}
pub async fn get_loglines(
&self,
after: SystemTime,
) -> anyhow::Result<Vec<(SystemTime, String)>> {
let conn = self.conn.clone();
smol::unblock(move || {
let conn = conn.lock();
let mut stmt = conn.prepare("select cast(strftime('%s', timestamp) as integer), line from loglines where timestamp > datetime(?1, 'unixepoch') limit 2000")?;
let after_timestamp = after.duration_since(SystemTime::UNIX_EPOCH)?.as_secs();
let rows = stmt.query_map(params![after_timestamp], |row| {
let timestamp: i64 = row.get(0)?;
let line: String = row.get(1)?;
let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp as u64);
Ok((timestamp, line))
})?;
let mut results = Vec::new();
for row_result in rows {
results.push(row_result?);
}
Ok(results)
})
.await
}
}