-
Notifications
You must be signed in to change notification settings - Fork 0
/
historydb.py
260 lines (227 loc) · 8.89 KB
/
historydb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import sqlite3, time
from pythreader import Primitive, synchronized
class _ScannerRecord(object):
def __init__(self, server, location, t, nfiles, nnew, error):
self.Server = server
self.Location = location
self.T = t
self.NFiles = nfiles
self.NNew = nnew
self.Error = error
class _HistoryDB(Primitive):
    """SQLite-backed history of file events, scanner runs and config values.

    Every public method opens its own connection to ``FileName`` through
    ``dbconn()`` and closes it before returning, so no connection is kept
    between calls.  Methods that touch the database are wrapped with
    ``@synchronized`` (from pythreader).

    Tables (created on construction if missing):
        config(name, value)
        file_log(filename, t, event, info, size, elapsed)
        scanner_log(server, location, t, nfiles, nnew, error)
    """

    def __init__(self, filename):
        """Remember the database path and make sure the schema exists.

        Args:
            filename: path handed to ``sqlite3.connect``.
        """
        Primitive.__init__(self)
        self.FileName = filename
        self.createTables()

    class DBConnectionGuard(object):
        """Context manager that yields a sqlite3 connection and closes it on exit.

        The connection is opened when the guard is constructed.  Leaving the
        ``with`` block closes it without committing, so writers must call
        ``commit()`` themselves before the block ends.
        """

        def __init__(self, filename):
            self.DBConnection = sqlite3.connect(filename)

        def __enter__(self):
            return self.DBConnection

        def __exit__(self, exc_type, exc_value, traceback):
            # Returns None, so an exception raised inside the block propagates.
            self.DBConnection.close()
            self.DBConnection = None

    def dbconn(self):
        """Return a new ``DBConnectionGuard`` for this database file."""
        return _HistoryDB.DBConnectionGuard(self.FileName)

    def fetch_iter(self, c):
        """Yield rows from cursor *c* one at a time until it is exhausted."""
        row = c.fetchone()
        while row:
            yield row
            row = c.fetchone()

    @synchronized
    def createTables(self):
        """Create the tables and the index used by this class if they are missing."""
        with self.dbconn() as conn:
            c = conn.cursor()
            # The original DDL read "value, text," which declared an untyped
            # "value" column plus a stray column named "text".  Databases
            # created with that schema keep working: only name/value are used.
            c.execute("""
                create table if not exists config(
                    name text,
                    value text,
                    primary key (name)
                )
                """)
            c.execute("""
                create table if not exists file_log(
                    filename text,
                    t float,
                    event text,
                    info text,
                    size bigint,
                    elapsed float,
                    primary key (filename, t))
                """)
            c.execute("""
                create index if not exists file_log_fn_event_inx on file_log(filename, event)
                """)
            c.execute("""
                create table if not exists scanner_log(
                    server text,
                    location text,
                    t float,
                    nfiles int,
                    nnew int,
                    error text,
                    primary key(server, location, t)
                )
                """)
            # Harmless when the DDL was already auto-committed.
            conn.commit()

    @synchronized
    def getConfig(self):
        """Return the whole ``config`` table as a ``{name: value}`` dict."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("select name, value from config")
            return {name: value for name, value in c.fetchall()}

    @synchronized
    def setConfig(self, name, value):
        """Store ``str(value)`` under *name*, replacing any existing value.

        Done on a single connection: try the update first and insert only
        when no row was touched.
        """
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("update config set value=? where name=?", (str(value), name))
            if c.rowcount == 0:
                c.execute("insert into config(name, value) values(?, ?)", (name, str(value)))
            conn.commit()

    @synchronized
    def add_scanner_record(self, server, location, t, n, nnew, error=None):
        """Append one row to ``scanner_log``.

        Args:
            server, location: identify the scanned place.
            t: timestamp of the scan.
            n: number of files seen (``nfiles`` column).
            nnew: number of new files (``nnew`` column).
            error: optional error text; stored as NULL when omitted, which is
                what the table contained before this parameter existed.
        """
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute(
                "insert into scanner_log(server, location, t, nfiles, nnew, error) values(?,?,?,?,?,?)",
                (server, location, t, n, nnew, error)
            )
            conn.commit()

    @synchronized
    def scannerHistorySince(self, t=0):
        """Return ``_ScannerRecord`` objects with ``t >= t``, ordered by server, location, t."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""select server, location, t, nfiles, nnew, error
                            from scanner_log
                            where t >= ?
                            order by server, location, t""", (t,))
            return [_ScannerRecord(*row) for row in c.fetchall()]

    @synchronized
    def fileQueued(self, filename):
        """Log a ``queued`` event for *filename*."""
        self.addFileRecord(filename, "queued", "")

    @synchronized
    def fileSucceeded(self, filename, size, elapsed):
        """Log a ``done`` event for *filename* together with its size and elapsed time."""
        self.addFileRecord(filename, "done", "", size=size, elapsed=elapsed)

    @synchronized
    def fileFailed(self, filename, reason):
        """Log a ``failed`` event for *filename*; *reason* goes into the ``info`` column."""
        self.addFileRecord(filename, "failed", reason)

    @synchronized
    def addFileRecord(self, filename, event, info, size=None, elapsed=None):
        """Insert one ``file_log`` row stamped with the current ``time.time()``."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""
                insert into file_log(filename, t, event, info, size, elapsed) values(?,?,?,?,?,?)
                """, (filename, time.time(), event, info, size, elapsed))
            conn.commit()

    @synchronized
    def fileDone(self, fn):
        """Return True when at least one ``done`` event is logged for *fn*."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("select 1 from file_log where filename=? and event='done' limit 1", (fn,))
            return c.fetchone() is not None

    @synchronized
    def removeFileDone(self, fn):
        """Delete every ``done`` event logged for *fn*."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("delete from file_log where filename=? and event='done'", (fn,))
            conn.commit()

    @synchronized
    def historySince(self, t):
        """Return ``(filename, t, event, info)`` rows with ``t >= t`` (no explicit ordering)."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("select filename, t, event, info from file_log where t >= ?", (t,))
            return c.fetchall()

    @synchronized
    def doneHistory(self, t):
        """Return ``(filename, event, t, size, elapsed)`` rows of ``done`` events with ``t >= t``, ordered by t."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""select filename, event, t, size, elapsed
                            from file_log
                            where t >= ?
                                and event = 'done'
                            order by t
                            """, (t,))
            return c.fetchall()

    @synchronized
    def purgeOldRecords(self, before):
        """Delete ``file_log`` and ``scanner_log`` rows whose ``t`` is strictly less than *before*."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("delete from file_log where t < ?", (before,))
            c.execute("delete from scanner_log where t < ?", (before,))
            conn.commit()

    @synchronized
    def eventCounts(self, event_types, bin, since_t=None):
        """Count ``file_log`` events per type and time bin.

        Args:
            event_types: iterable of event names to include.
            bin: bin width, in the same units as the ``t`` column; each
                timestamp is mapped to ``round(t/bin)*bin``.
            since_t: only rows with ``t > since_t`` are counted; ``None`` (or
                any falsy value) means 0.

        Returns:
            List of ``(event, bin_time, count)`` ordered by event, bin_time.
        """
        # Bind the event names as parameters instead of splicing them into
        # the SQL text; only the "?" placeholders are formatted in.
        events = [str(e) for e in event_types]
        placeholders = ",".join("?" * len(events))
        since_t = since_t or 0
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""select event, round(t/?)*? as tt, count(*)
                            from file_log
                            where event in (%s) and t > ?
                            group by event, tt
                            order by event, tt""" % (placeholders,),
                      [bin, bin] + events + [since_t])
            return c.fetchall()

    @synchronized
    def getEvents(self, event_types, since_t=None):
        """Return ``(filename, event, t, size, elapsed)`` rows for the given event types.

        Args:
            event_types: iterable of event names to include.
            since_t: only rows with ``t > since_t`` are returned; ``None`` (or
                any falsy value) means 0.

        Returns:
            List of row tuples ordered by t.
        """
        # Same parameter binding as in eventCounts.
        events = [str(e) for e in event_types]
        placeholders = ",".join("?" * len(events))
        since_t = since_t or 0
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""select filename, event, t, size, elapsed
                            from file_log
                            where event in (%s) and t > ?
                            order by t""" % (placeholders,),
                      events + [since_t])
            return c.fetchall()

    @synchronized
    def historyForFile(self, filename):
        """Return ``[(t, event, info), ...]`` for *filename*, ordered by t."""
        with self.dbconn() as conn:
            c = conn.cursor()
            c.execute("""
                select t, event, info
                    from file_log
                    where filename = ?
                    order by t
                    """, (filename,))
            return c.fetchall()

    @synchronized
    def historyByFile(self, filename=None, window=3*24*3600):
        """Group recent ``file_log`` rows by file.

        Args:
            filename: restrict the result to this file; ``None`` means all files.
            window: how far back to look from ``time.time()``, in seconds.

        Returns:
            List of ``(filename, [(t, event, info), ...])`` tuples.  Each
            inner list is ordered by t; the outer list is ordered by the
            timestamp of each file's last event, most recent first.
        """
        t0 = time.time() - window
        records = []
        with self.dbconn() as conn:
            c = conn.cursor()
            if filename is None:
                c.execute("""
                    select filename, t, event, info
                        from file_log
                        where t >= ?
                        order by filename, t
                        """, (t0,))
            else:
                c.execute("""
                    select filename, t, event, info
                        from file_log
                        where filename = ? and t >= ?
                        order by filename, t
                        """, (filename, t0))
            # Rows arrive sorted by filename, so equal names are adjacent and
            # a single pass is enough.  "events is None" marks "no group yet",
            # so a NULL filename in the first row cannot be mistaken for a
            # continuation of a non-existent group.
            current_name = None
            events = None
            for fn, t, event, info in self.fetch_iter(c):
                if events is not None and fn == current_name:
                    events.append((t, event, info))
                else:
                    current_name = fn
                    events = [(t, event, info)]
                    records.append((fn, events))
        # Most recently active file first (timestamp of the last event in each group).
        return sorted(records, key=lambda rec: rec[1][-1][0], reverse=True)
def open(path):
    """Build and return the history database object for the SQLite file at *path*.

    Note: within this module the name shadows the builtin ``open``.
    """
    history_db = _HistoryDB(path)
    return history_db