forked from DataDog/dd-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction.py
221 lines (173 loc) · 7.6 KB
/
transaction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# stdlib
from datetime import datetime, timedelta
import logging
from operator import attrgetter
import sys
import time
# project
from checks.check_status import ForwarderStatus
from util import get_tornado_ioloop, plural
log = logging.getLogger(__name__)
FLUSH_LOGGING_PERIOD = 20
FLUSH_LOGGING_INITIAL = 5
class Transaction(object):
def __init__(self):
self._id = None
self._error_count = 0
self._next_flush = datetime.utcnow()
self._size = None
def get_id(self):
return self._id
def set_id(self, new_id):
assert self._id is None
self._id = new_id
def inc_error_count(self):
self._error_count = self._error_count + 1
def get_error_count(self):
return self._error_count
def get_size(self):
if self._size is None:
self._size = sys.getsizeof(self)
return self._size
def get_next_flush(self):
return self._next_flush
def compute_next_flush(self,max_delay):
# Transactions are replayed, try to send them faster for newer transactions
# Send them every MAX_WAIT_FOR_REPLAY at most
td = timedelta(seconds=self._error_count * 20)
if td > max_delay:
td = max_delay
newdate = datetime.utcnow() + td
self._next_flush = newdate.replace(microsecond=0)
def time_to_flush(self,now = datetime.utcnow()):
return self._next_flush < now
def flush(self):
raise NotImplementedError("To be implemented in a subclass")
class TransactionManager(object):
"""Holds any transaction derived object list and make sure they
are all commited, without exceeding parameters (throttling, memory consumption) """
def __init__(self, max_wait_for_replay, max_queue_size, throttling_delay):
self._MAX_WAIT_FOR_REPLAY = max_wait_for_replay
self._MAX_QUEUE_SIZE = max_queue_size
self._THROTTLING_DELAY = throttling_delay
self._flush_without_ioloop = False # useful for tests
self._transactions = [] # List of all non commited transactions
self._total_count = 0 # Maintain size/count not to recompute it everytime
self._total_size = 0
self._flush_count = 0
self._transactions_received = 0
self._transactions_flushed = 0
# Global counter to assign a number to each transaction: we may have an issue
# if this overlaps
self._counter = 0
self._trs_to_flush = None # Current transactions being flushed
self._last_flush = datetime.utcnow() # Last flush (for throttling)
# Track an initial status message.
ForwarderStatus().persist()
def get_transactions(self):
return self._transactions
def print_queue_stats(self):
log.debug("Queue size: at %s, %s transaction(s), %s KB" %
(time.time(), self._total_count, (self._total_size/1024)))
def get_tr_id(self):
self._counter = self._counter + 1
return self._counter
def append(self,tr):
# Give the transaction an id
tr.set_id(self.get_tr_id())
# Check the size
tr_size = tr.get_size()
log.debug("New transaction to add, total size of queue would be: %s KB" %
((self._total_size + tr_size) / 1024))
if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
log.warn("Queue is too big, removing old transactions...")
new_trs = sorted(self._transactions,key=attrgetter('_next_flush'), reverse = True)
for tr2 in new_trs:
if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
self._transactions.remove(tr2)
self._total_count = self._total_count - 1
self._total_size = self._total_size - tr2.get_size()
log.warn("Removed transaction %s from queue" % tr2.get_id())
# Done
self._transactions.append(tr)
self._total_count += 1
self._transactions_received += 1
self._total_size = self._total_size + tr_size
log.debug("Transaction %s added" % (tr.get_id()))
self.print_queue_stats()
def flush(self):
if self._trs_to_flush is not None:
log.debug("A flush is already in progress, not doing anything")
return
to_flush = []
# Do we have something to do ?
now = datetime.utcnow()
for tr in self._transactions:
if tr.time_to_flush(now):
to_flush.append(tr)
count = len(to_flush)
should_log = self._flush_count + 1 <= FLUSH_LOGGING_INITIAL or (self._flush_count + 1) % FLUSH_LOGGING_PERIOD == 0
if count > 0:
if should_log:
log.info("Flushing %s transaction%s during flush #%s" % (count,plural(count), str(self._flush_count + 1)))
else:
log.debug("Flushing %s transaction%s during flush #%s" % (count,plural(count), str(self._flush_count + 1)))
self._trs_to_flush = to_flush
self.flush_next()
else:
if should_log:
log.info("No transaction to flush during flush #%s" % str(self._flush_count + 1))
else:
log.debug("No transaction to flush during flush #%s" % str(self._flush_count + 1))
if self._flush_count + 1 == FLUSH_LOGGING_INITIAL:
log.info("First flushes done, next flushes will be logged every %s flushes." % FLUSH_LOGGING_PERIOD)
self._flush_count += 1
ForwarderStatus(
queue_length=self._total_count,
queue_size=self._total_size,
flush_count=self._flush_count,
transactions_received=self._transactions_received,
transactions_flushed=self._transactions_flushed).persist()
def flush_next(self):
if len(self._trs_to_flush) > 0:
td = self._last_flush + self._THROTTLING_DELAY - datetime.utcnow()
# Python 2.7 has this built in, python < 2.7 don't...
if hasattr(td,'total_seconds'):
delay = td.total_seconds()
else:
delay = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10.0**6
if delay <= 0:
tr = self._trs_to_flush.pop()
self._last_flush = datetime.utcnow()
log.debug("Flushing transaction %d" % tr.get_id())
try:
tr.flush()
except Exception,e :
log.exception(e)
self.tr_error(tr)
self.flush_next()
else:
# Wait a little bit more
tornado_ioloop = get_tornado_ioloop()
if tornado_ioloop._running:
tornado_ioloop.add_timeout(time.time() + delay,
lambda: self.flush_next())
elif self._flush_without_ioloop:
# Tornado is no started (ie, unittests), do it manually: BLOCKING
time.sleep(delay)
self.flush_next()
else:
self._trs_to_flush = None
def tr_error(self,tr):
tr.inc_error_count()
tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY)
log.warn("Transaction %d in error (%s error%s), it will be replayed after %s" %
(tr.get_id(), tr.get_error_count(), plural(tr.get_error_count()),
tr.get_next_flush()))
def tr_success(self,tr):
log.debug("Transaction %d completed" % tr.get_id())
self._transactions.remove(tr)
self._total_count -= 1
self._total_size -= tr.get_size()
self._transactions_flushed += 1
self.print_queue_stats()