-
Notifications
You must be signed in to change notification settings - Fork 173
/
api.go
220 lines (189 loc) · 6.94 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Package transport provides long-lived http/tcp connections for
// intra-cluster communications (see README for details and usage example).
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package transport
import (
"io"
"math"
"runtime"
"time"
"unsafe"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/hk"
"github.com/NVIDIA/aistore/memsys"
)
//////////////////////////////////
// opcodes, constants, and types //
//////////////////////////////////
// Opcodes at the very top of the uint16 range, starting at math.MaxUint16-16,
// are reserved for _internal_ use; they apply to both `Obj.Hdr.Opcode` and
// `Msg.Opcode` values.
const (
	opcFin      = math.MaxUint16 - 16 + iota // end-of-stream marker (see Stream.Fin)
	opcIdleTick                              // presumably an internal idle-tick signal - TODO confirm
)

// ReservedOpcode reports whether opc belongs to the internally reserved range,
// i.e. is greater than or equal to the first reserved opcode (opcFin).
func ReservedOpcode(opc int) bool { return opcFin <= opc }
// Object and PDU sizing (see also cmn/config for the max and default
// transport header sizes).
const (
	// SizeUnknown denotes an object whose size is unknown (not set).
	SizeUnknown = -1

	dfltSizePDU = memsys.DefaultBufSize  // default PDU size
	maxSizePDU  = memsys.MaxPageSlabSize // Extra.SizePDU must stay below this value

	// sizeofh is the in-memory size of Obj; Send uses it when asserting that
	// the opaque part of a header fits into the maximum header buffer.
	sizeofh = int(unsafe.Sizeof(Obj{}))
)
// Extra provides advanced (optional) stream control. Passing nil to
// NewObjStream selects the defaults.
type Extra struct {
	Callback     ObjSentCB     // per-stream completion callback; typical usage: to free SGLs, close files, etc.
	Config       *cmn.Config   // when nil, NewObjStream fills it in (to optimize-out GCO.Get())
	Compression  string        // see CompressAlways, etc. enum
	SenderID     string        // optional sender identifier, e.g., xaction ID
	IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
	SizePDU      int32         // NOTE: 0 (zero) means no PDUs; must be below maxSizePDU; unknown size _requires_ PDUs
	MaxHdrSize   int32         // overrides config.Transport.MaxHeaderSize
	ChanBurst    int           // overrides config.Transport.Burst
}

// RxStats holds receive-side session stats indexed by session ID (see recv.go
// for "uid"). Optional; currently used by tests only.
type RxStats map[uint64]*Stats

// ObjHdr is the header carried by every Obj.
type ObjHdr struct {
	Bck      cmn.Bck
	ObjName  string
	SID      string       // sender node ID
	Opaque   []byte       // custom control (optional)
	ObjAttrs cmn.ObjAttrs // attributes/metadata of the object being transmitted
	Opcode   int          // see the reserved opcode range (ReservedOpcode)
}

// Obj is a single object to transmit.
type Obj struct {
	Reader   io.ReadCloser // reads the object, and gets closed when done
	CmplArg  any           // optional context passed through to the ObjSentCB callback
	Callback ObjSentCB     // called when the last byte is sent _or_ when the stream terminates (see term.reason)
	prc      *atomic.Int64 // private; when non-nil, ref-counts so that ObjSentCB is called only once
	Hdr      ObjHdr
}

// ObjSentCB is the object-sent (send-completion) callback. It can optionally
// be defined:
//   - per stream, via Extra.Callback passed to NewObjStream;
//   - per object being sent, via Obj.Callback (for instance, to support
//     call-per-batch semantics).
//
// The object callback "overrides" the per-stream one: when the object callback
// is non-nil, the stream callback is skipped.
// NOTE: if defined, the callback executes asynchronously with respect to the sender.
type ObjSentCB func(*ObjHdr, io.ReadCloser, any, error)

// Msg carries a sender ID, an opaque body, and an opcode (see the reserved
// opcode range, ReservedOpcode).
type Msg struct {
	SID    string
	Body   []byte
	Opcode int
}

// StreamCollector is the stream collector.
type StreamCollector struct{}

// RecvObj is the receive-side (Rx) callback for objects; see Handle.
type RecvObj func(hdr *ObjHdr, objReader io.Reader, err error) error

// RecvMsg is the receive-side (Rx) callback for messages.
type RecvMsg func(msg Msg, err error) error
///////////////////
// object stream //
///////////////////
// NewObjStream creates an object stream to dstURL (destination dstID) and
// starts its two goroutines: the send loop (SQ) and the completion loop (SCQ).
//
// A nil extra selects the defaults. A non-nil extra without Config gets the
// current global config; NOTE: in that case the caller's Extra is modified in place.
func NewObjStream(client Client, dstURL, dstID string, extra *Extra) (s *Stream) {
	switch {
	case extra == nil:
		extra = &Extra{Config: cmn.GCO.Get()}
	case extra.Config == nil:
		extra.Config = cmn.GCO.Get()
	}

	s = &Stream{streamBase: *newBase(client, dstURL, dstID, extra)}
	s.streamBase.streamer = s
	s.callback = extra.Callback
	if extra.Compressed() {
		s.initCompression(extra)
	}
	debug.Assert(s.usePDU() == extra.UsePDU())

	// Both queues share one capacity: the number of objects the caller
	// can post without blocking.
	qsize := burst(extra)
	s.workCh = make(chan *Obj, qsize) // Send Queue (SQ)
	s.cmplCh = make(chan cmpl, qsize) // Send Completion Queue (SCQ)

	s.wg.Add(2)
	go s.sendLoop(dryrun()) // drains SQ
	go s.cmplLoop()         // drains SCQ

	// hand the new stream over to the collector (gc) - "collect" mode
	gc.ctrlCh <- ctrl{&s.streamBase, true /* collect */}
	return s
}
// Send asynchronously sends an object (transport.Obj) defined by its header and reader.
//
// The sending pipeline is a pair of queues: SQ, the send queue (workCh), and
// SCQ, the send completion queue (cmplCh). Together SQ and SCQ form a FIFO.
//
//   - Header-only objects are supported: when there is no data to send, the
//     reader is not required and obj.Reader may be nil.
//   - The object reader is *always* closed, regardless of whether Send succeeds
//     or fails. On success, if a send-completion (ObjSentCB) callback is
//     provided (non-nil), the closing is done by doCmpl().
//   - Optional reference counting is also done by (and in) doCmpl(), so that
//     ObjSentCB gets called if and only when the refcount (if non-nil) reaches zero.
//   - Every transmission of every object always ends with a doCmpl() completion
//     (with its refcounting and reader-closing), including network errors that
//     abruptly terminate the underlying stream(s).
//
// When, after enqueueing, SQ is more than ~3/4 full, Send yields the processor
// (a poor man's throttle) and counts the occasions the queue was completely full.
func (s *Stream) Send(obj *Obj) (err error) {
	debug.Assertf(len(obj.Hdr.Opaque) < len(s.maxhdr)-sizeofh, "(%d, %d)", len(obj.Hdr.Opaque), len(s.maxhdr))

	err = s.startSend(obj)
	if err != nil {
		s.doCmpl(obj, err) // shortcut: complete right away with the error
		return err
	}

	s.workCh <- obj

	qlen, qcap := len(s.workCh), cap(s.workCh)
	highWater := qcap - (qcap >> 2)
	if qlen <= highWater {
		return nil
	}
	runtime.Gosched() // poor man's throttle
	if qlen == qcap {
		s.chanFull.Inc()
	}
	return nil
}
// Fin posts the reserved opcFin header-only object and then blocks until the
// stream's goroutines (accounted for in s.wg) have finished.
func (s *Stream) Fin() {
	finObj := &Obj{Hdr: ObjHdr{Opcode: opcFin}}
	// Send already routes a failure through doCmpl, so nothing more to do here.
	_ = s.Send(finObj)
	s.wg.Wait()
}
//////////////////////
// receive-side API //
//////////////////////
// Handle registers rxObj as the receive callback for the transport named trname
// and returns the registration error, if any.
//
// When withStats[0] is true, an hdlExtra handler is registered instead, and its
// cleanup is registered with the housekeeper (hk) under the name
// ObjURLPath(trname)+hk.NameSuffix.
func Handle(trname string, rxObj RecvObj, withStats ...bool) error {
	if len(withStats) == 0 || !withStats[0] {
		return oput(trname, &hdl{trname: trname, rxObj: rxObj})
	}

	hkName := ObjURLPath(trname)
	hdlx := &hdlExtra{hdl: hdl{trname: trname, rxObj: rxObj}, hkName: hkName}
	hk.Reg(hkName+hk.NameSuffix, hdlx.cleanup, sessionIsOld)
	return oput(trname, hdlx)
}
// Unhandle is the counterpart of Handle: it calls odel(trname) and returns its error.
func Unhandle(trname string) error {
	return odel(trname)
}
////////////////////
// stats and misc //
////////////////////
// ObjURLPath returns the object-stream URL path for trname, built by joining
// apc.Version, apc.ObjStream and (when non-empty) trname.
func ObjURLPath(trname string) string {
	return _urlPath(apc.ObjStream, trname)
}
// _urlPath joins apc.Version and endp, appending trname only when it is non-empty.
func _urlPath(endp, trname string) string {
	if trname != "" {
		return cos.JoinWords(apc.Version, endp, trname)
	}
	return cos.JoinWords(apc.Version, endp)
}
// GetRxStats returns the receive-side stats of all registered handlers, keyed
// by transport name; handlers whose getStats returns nil are omitted. Each
// handler map is scanned while holding its corresponding mutex in hmtxs.
func GetRxStats() (netstats map[string]RxStats) {
	netstats = make(map[string]RxStats)
	for idx, handlers := range hmaps {
		hmtxs[idx].Lock()
		for trname, h := range handlers {
			stats := h.getStats()
			if stats == nil {
				continue
			}
			netstats[trname] = stats
		}
		hmtxs[idx].Unlock()
	}
	return netstats
}