-
Notifications
You must be signed in to change notification settings - Fork 5
/
homa_stream.h
139 lines (107 loc) · 4.64 KB
/
homa_stream.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#ifndef HOMA_STREAM_H
#define HOMA_STREAM_H
#include <sys/uio.h>
#include <vector>
#include "src/core/lib/transport/transport_impl.h"
#include "homa_incoming.h"
#include "stream_id.h"
#include "wire.h"
/**
 * This class provides common facilities used for Homa streams on both
 * the client and server side. Each stream object corresponds to a single
 * RPC, and it exists for the life of that RPC. It is used to manage the
 * movement of data between gRPC and Homa messages, as well as gRPC callbacks.
 *
 * All data members are public. Per the comment on @mutex, callers are
 * expected to hold that mutex whenever they touch any of them.
 *
 * Only hdr() is defined in this header; every other method is declared
 * here and implemented elsewhere, so the per-method notes below that go
 * beyond the signature are marked as assumptions to be confirmed.
 */
class HomaStream {
public:
// Must be held whenever accessing info in this structure.
grpc_core::Mutex mutex;
// File descriptor for the Homa socket to use for I/O.
int fd;
// Uniquely identifies this gRPC RPC, and also provides info about
// the peer (e.g. for sending responses).
StreamId streamId;
// Homa's identifier for the most recent request sent on this stream.
uint64_t sentHomaId;
// Homa's identifier for an unacknowledged Homa request received on this
// stream, or 0 if none. The next time we want to send a message, we'll
// send a reply to this RPC, rather than starting a new RPC.
uint64_t homaRequestId;
// Reference count (owned externally).
grpc_stream_refcount* refs;
// Small statically allocated buffer for outgoing messages; holds
// header plus initial and trailing metadata, if they fit. The capacity
// is fixed at 10000 bytes. hdr() reinterprets the start of this buffer
// as a Wire::Header.
uint8_t xmitBuffer[10000];
// If the metadata didn't completely fit in xmitBuffer, extra chunks
// are allocated dynamically; this vector keeps track of them all
// so they can be freed. The elements are raw pointers, so the vector
// itself does not release them.
// NOTE(review): presumably freed by resetXmit() and/or the destructor --
// confirm in the implementation file.
std::vector<uint8_t *> xmitOverflows;
// How many bytes to allocate for each element of @xmitOverflows.
// This is a variable so it can be changed for unit testing.
size_t overflowChunkSize;
// Contains all of the Slices (of message data) referred to by vecs;
// keeps them alive and stable until the Homa request is sent.
std::vector<grpc_core::Slice> slices;
// Describes all of the pieces of the current outgoing message
// (one struct iovec per contiguous piece).
std::vector<struct iovec> vecs;
// Additional bytes available immediately following the last element
// of vecs.
size_t lastVecAvail;
// Current length of output message, in bytes.
size_t xmitSize;
// Sequence number to use for the next outgoing message.
int nextXmitSequence;
// Incoming Homa messages that have not been fully processed.
// Entries are sorted in increasing order of sequence number.
std::vector<HomaIncoming::UniquePtr> incoming;
// All incoming Homa messages with sequence numbers less than this one
// have already been processed.
int nextIncomingSequence;
// Information saved from "receive" stream ops, so that we can
// fill in message data/metadata and invoke callbacks.
// NOTE(review): presumably populated by saveCallbacks() -- confirm.
// The first three fields relate to initial metadata, the next two to
// the message body, and the last two to trailing metadata.
grpc_metadata_batch* initMd;
grpc_closure* initMdClosure;
bool *initMdTrailMdAvail;
absl::optional<grpc_core::SliceBuffer>* messageBody;
grpc_closure* messageClosure;
grpc_metadata_batch* trailMd;
grpc_closure* trailMdClosure;
// True means we have passed trailing metadata to gRPC, so there is
// no more message data coming for this stream.
bool eof;
// True means this RPC has been cancelled, so we shouldn't send
// any more Homa messages.
bool cancelled;
// True means that trailing metadata has been sent for this stream.
bool trailMdSent;
// True means that this stream is for the server side of an RPC;
// false means client.
bool isServer;
// Error that has occurred on this stream, if any.
grpc_error_handle error;
// Maximum number of bytes to allow in a single Homa message (this
// is a variable so it can be modified for unit testing).
size_t maxMessageLength;
// Constructor (defined elsewhere).
// NOTE(review): the arguments presumably initialize the members of the
// same name, with refcount stored in @refs -- confirm in the
// implementation file.
HomaStream(bool isServer, StreamId streamId, int fd,
grpc_stream_refcount* refcount);
// Returns the start of xmitBuffer viewed as a Wire::Header. No copy is
// made: the returned pointer refers to storage inside this object.
Wire::Header *hdr()
{
return reinterpret_cast<Wire::Header*>(xmitBuffer);
}
// Destructor (defined elsewhere). It is declared virtual; no derived
// classes are visible in this header.
virtual ~HomaStream(void);
// The methods below are declared here and defined elsewhere. The
// descriptions are inferred from names and signatures only; treat each
// one as an assumption until checked against the implementation.
// Presumably adds information about the peer of this stream to md -- confirm.
void addPeerToMetadata(grpc_metadata_batch *md);
// Presumably notifies the peer that this RPC has been cancelled -- confirm.
void cancelPeer(void);
// Presumably transmits the outgoing message accumulated so far -- confirm.
void flush(void);
// Takes ownership of msg (it is passed as a UniquePtr by value); homaId is
// presumably Homa's identifier for the RPC that carried the message -- confirm.
void handleIncoming(HomaIncoming::UniquePtr msg, uint64_t homaId);
// Presumably records error on this stream and reports it to gRPC -- confirm.
void notifyError(grpc_error_handle error);
// Presumably resets the outgoing-message state for a new message -- confirm.
void resetXmit(void);
// Presumably saves the receive-side pointers and closures from op into the
// init/message/trail members above -- confirm.
void saveCallbacks(grpc_transport_stream_op_batch* op);
// Presumably sends a placeholder reply to the pending Homa request
// (see @homaRequestId) -- confirm.
void sendDummyResponse();
// Presumably appends one metadata key/value pair to the outgoing message;
// keyLength and valueLength presumably give the sizes in bytes of the data
// at key and value -- confirm.
void serializeMetadata(const void *key, uint32_t keyLength,
const void *value, uint32_t valueLength);
// Presumably serializes every entry of batch into the outgoing message -- confirm.
void serializeMetadataBatch(grpc_metadata_batch *batch);
// Presumably moves data from @incoming to gRPC and invokes the saved
// closures -- confirm.
void transferData();
// Presumably processes the "send" portions of op -- confirm.
void xmit(grpc_transport_stream_op_batch* op);
// Static helper (needs no HomaStream instance). Presumably returns the number
// of bytes needed to serialize batch -- confirm.
static size_t metadataLength(grpc_metadata_batch* batch);
};
#endif // HOMA_STREAM_H