-
Notifications
You must be signed in to change notification settings - Fork 389
/
Receiver.h
221 lines (175 loc) · 6.88 KB
/
Receiver.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/ReceiverThread.h>
#include <wdt/WdtBase.h>
#include <wdt/util/FileCreator.h>
#include <wdt/util/IServerSocket.h>
#include <wdt/util/TransferLogManager.h>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
namespace facebook {
namespace wdt {
/**
* Receiver is the receiving side of the transfer. Receiver listens on ports
* accepts connections, receives the files and writes to the destination
* directory. Receiver has two modes of operation : You can spawn a receiver
* for one transfer or alternatively it can also be used in a long running
* mode where it accepts subsequent transfers and runs in an infinite loop.
*/
class Receiver : public WdtBase {
public:
/// Constructor using wdt transfer request (@see in WdtBase.h)
explicit Receiver(const WdtTransferRequest &transferRequest);
/**
* Constructor with start port, number of ports and directory to write to.
* If the start port is specified as zero, it auto configures the ports
*/
Receiver(int port, int numSockets, const std::string &destDir);
/// Setup before starting (@see WdtBase.h)
const WdtTransferRequest &init() override;
/**
* Joins on the threads spawned by start. This method
* is called by default when the wdt receiver is expected
* to run as forever running process. However this has to
* be explicitly called when the caller expects to conclude
* a transfer.
*/
std::unique_ptr<TransferReport> finish() override;
/**
* Call this method instead of transferAsync() when you don't
* want the wdt receiver to stop after one transfer.
*/
ErrorCode runForever();
/**
* Starts the threads, and returns. Caller should call finish() after
* calling this method to get the statistics of the transfer.
*/
ErrorCode transferAsync() override;
/// @param recoveryId unique-id used to verify transfer log
void setRecoveryId(const std::string &recoveryId);
/// Returns true if at least one thread has accepted connection
bool hasNewTransferStarted() const;
// Different accept modes for the Receiver
enum AcceptMode {
ACCEPT_WITH_RETRIES, // Receiver gives up after max_accept_retries
ACCEPT_FOREVER, // Receiver never gives up
STOP_ACCEPTING, // Receiver stops accepting
};
/// @param acceptMode acceptMode to use
void setAcceptMode(AcceptMode acceptMode);
/// Interface to make socket
class ISocketCreator {
public:
virtual std::unique_ptr<IServerSocket> makeServerSocket(
ThreadCtx &threadCtx, int port, int backlog,
const EncryptionParams &encryptionParams, int64_t ivChangeInterval,
Func &&tagVerificationSuccessCallback, bool tls) = 0;
virtual ~ISocketCreator() = default;
};
/**
* Sets socket creator
*
* @param socketCreator socket-creator to be used
*/
void setSocketCreator(ISocketCreator *socketCreator);
/**
* Destructor for the receiver. The destructor automatically cancels
* any incomplete transfers that are going on. 'Incomplete transfer' is a
* transfer where there is no receiver thread that has received
* confirmation from wdt sender that the transfer is 'DONE'. Destructor also
* internally calls finish() for every transfer if finish() wasn't called
*/
~Receiver() override;
protected:
friend class ReceiverThread;
/**
* Traverses root directory and returns discovered file information
*
* @param fileChunksInfo discovered file info
*/
void traverseDestinationDir(std::vector<FileChunksInfo> &fileChunksInfo);
/// Get the transferred file chunks info
const std::vector<FileChunksInfo> &getFileChunksInfo() const;
/// Get file creator, used by receiver threads
std::unique_ptr<FileCreator> &getFileCreator();
/// Socket creator used to optionally create different kind of server socket
ISocketCreator *socketCreator_{nullptr};
/// Get the ref to transfer log manager
TransferLogManager &getTransferLogManager();
/// Responsible for basic setup and starting threads
ErrorCode start();
/**
* Periodically calculates current transfer report and send it to progress
* reporter. This only works in the single transfer mode.
*/
void progressTracker();
/**
* Adds a checkpoint to the global checkpoint list
* @param checkpoint checkpoint to be added
*/
void addCheckpoint(Checkpoint checkpoint);
/**
* @param startIndex number of checkpoints already transferred by the
* calling thread
* @return list of new checkpoints
*/
std::vector<Checkpoint> getNewCheckpoints(int startIndex);
/// Does the steps needed before a new transfer is started
void startNewGlobalSession(const std::string &peerIp);
/// Has steps to do when the current transfer is ended
void endCurGlobalSession();
/// adds log header and also a directory invalidation entry if needed
void addTransferLogHeader(bool isBlockMode, bool isSenderResuming);
/// fix, compact (if enabled) and close transfer log
void fixAndCloseTransferLog(bool transferSuccess);
/**
* Get transfer report, meant to be called after threads have been finished
* This method is not thread safe
*/
std::unique_ptr<TransferReport> getTransferReport();
void logPerfStats() const override;
/// @return transfer config encoded as int
int64_t getTransferConfig() const;
AcceptMode getAcceptMode();
/// The thread that is responsible for calling running the progress tracker
std::thread progressTrackerThread_;
/// Flag based on which threads finish processing on receiving a done
bool isJoinable_{false};
/// Responsible for writing files on the disk
std::unique_ptr<FileCreator> fileCreator_{nullptr};
/**
* Unique-id used to verify transfer log. This value must be same for
* transfers across resumption
*/
std::string recoveryId_;
/**
* The instance of the receiver threads are stored in this vector.
* This will not be destroyed until this object is destroyed, hence
* it has to be made sure that these threads are joined at least before
* the destruction of this object.
*/
std::vector<std::unique_ptr<WdtThread>> receiverThreads_;
/// Transfer log manager
std::unique_ptr<TransferLogManager> transferLogManager_;
/// Global list of checkpoints
std::vector<Checkpoint> checkpoints_;
/// Start time of the session
std::atomic<std::chrono::time_point<Clock>> startTime_;
/// already transferred file chunks
std::vector<FileChunksInfo> fileChunksInfo_;
/// Marks when a new transfer has started
std::atomic<bool> hasNewTransferStarted_{false};
/// Backlog used by the sockets
int backlog_;
AcceptMode acceptMode_{ACCEPT_WITH_RETRIES};
};
} // namespace wdt
} // namespace facebook