forked from uroni/urbackup_backend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PipeThrottler.cpp
83 lines (67 loc) · 1.63 KB
/
PipeThrottler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include "PipeThrottler.h"
#include "Server.h"
#include "Interface/Mutex.h"
#include "stringtools.h"
// Debug-logging hook. As written it expands to nothing: the trailing `//x` is
// an ordinary comment, and comments are removed before macro replacement.
// Dropping the `//` makes DLOG(x) expand to its argument again.
#define DLOG(x) //x
// Builds a throttler whose limit is `bps` (addBytes treats a limit of 0 as
// "throttling disabled"). The byte counter and the start of the accounting
// window both begin at zero; the mutex protecting that state comes from Server.
PipeThrottler::PipeThrottler(size_t bps)
	: throttle_bps(bps),
	  curr_bytes(0),
	  lastresettime(0)
{
	mutex = Server->createMutex();
}
// Hands the mutex obtained in the constructor back to Server for destruction.
PipeThrottler::~PipeThrottler()
{
	Server->destroy(mutex);
}
bool PipeThrottler::addBytes(size_t new_bytes, bool wait)
{
IScopedLock lock(mutex);
if(throttle_bps==0) return true;
int64 ctime=Server->getTimeMS();
if(ctime-lastresettime>1000)
{
lastresettime=ctime;
curr_bytes=0;
}
curr_bytes+=new_bytes;
int64 passed_time=ctime-lastresettime;
size_t maxRateTime=(size_t)(((_i64)curr_bytes*1000)/throttle_bps);
if(passed_time>0)
{
size_t bps=(size_t)(((_i64)curr_bytes*1000)/passed_time);
if(bps>throttle_bps)
{
unsigned int sleepTime=(unsigned int)(maxRateTime-passed_time);
if(sleepTime>0)
{
if(wait)
{
DLOG(Server->Log("Throttler: Sleeping for " + nconvert(sleepTime)+ "ms", LL_DEBUG));
Server->wait(sleepTime);
if(Server->getTimeMS()-lastresettime>1000)
{
curr_bytes=0;
lastresettime=Server->getTimeMS();
}
}
return false;
}
}
}
else if(curr_bytes>=throttle_bps)
{
if(wait)
{
DLOG(Server->Log("Throttler: Sleeping for " + nconvert(maxRateTime)+ "ms", LL_DEBUG));
Server->wait(static_cast<unsigned int>(maxRateTime));
}
curr_bytes=0;
lastresettime=Server->getTimeMS();
return false;
}
return true;
}
// Replaces the configured limit with `bps` while holding the throttler's
// mutex. The bytes already counted in the current window are left untouched.
void PipeThrottler::changeThrottleLimit(size_t bps)
{
	IScopedLock guard(mutex);
	throttle_bps = bps;
}