-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
143 lines (129 loc) · 3.35 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package utp_go
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
)
var (
// ErrNotConnected is returned by UtpStream.Write when the stream's shutdown
// flag has already been set.
ErrNotConnected = errors.New("not connected")
)
// UtpStream is the handle for a single uTP connection. NewUtpStream builds it
// and launches a goroutine that runs the underlying connection's event loop;
// ReadToEOF, Write and Close are the operations exposed on it.
type UtpStream struct {
// streamCtx is derived from the context given to NewUtpStream; ReadToEOF and
// Write stop waiting when it is done.
streamCtx context.Context
// streamCancel cancels streamCtx; Close invokes it.
streamCancel context.CancelFunc
logger log.Logger
// cid is the connection id supplied at construction and returned by Cid.
cid *ConnectionId
// reads is a buffered channel (capacity 100) that is handed to newConnection
// and drained by ReadToEOF.
reads chan *readOrWriteResult
// writes is a buffered channel (capacity 100) onto which Write places queued
// writes. The consumer is not visible in this file section — presumably the
// connection event loop; verify against eventLoop.
writes chan *queuedWrite
// streamEvents is supplied by the caller of NewUtpStream and only stored here.
streamEvents chan *streamEvent
// shutdown is set to true by Close; Write refuses new data once it is set.
shutdown *atomic.Bool
// connHandle counts the goroutine started by NewUtpStream; Close waits on it.
connHandle *sync.WaitGroup
// conn is the connection created by newConnection whose eventLoop runs in start.
conn *connection
// closeOnce makes the teardown inside Close run at most once.
closeOnce sync.Once
}
// NewUtpStream builds a UtpStream for the given connection id and starts its
// event-loop goroutine before returning.
//
// A cancellable child of ctx is created and shared with the underlying
// connection; the read and write queues are buffered channels of capacity 100.
// config, syn, connected and socketEvents are forwarded unchanged to
// newConnection, while streamEvents is stored on the returned stream.
func NewUtpStream(
	ctx context.Context,
	logger log.Logger,
	cid *ConnectionId,
	config *ConnectionConfig,
	syn *packet,
	socketEvents chan *socketEvent,
	streamEvents chan *streamEvent,
	connected chan error,
) *UtpStream {
	if logger.Enabled(BASE_CONTEXT, log.LevelInfo) {
		logger.Info("new a utp stream", "dst.peer", cid.Peer, "dst.send", cid.Send, "dst.recv", cid.Recv)
	}

	childCtx, cancelFn := context.WithCancel(ctx)
	readCh := make(chan *readOrWriteResult, 100)

	stream := &UtpStream{
		streamCtx:    childCtx,
		streamCancel: cancelFn,
		logger:       logger,
		cid:          cid,
		reads:        readCh,
		writes:       make(chan *queuedWrite, 100),
		streamEvents: streamEvents,
		connHandle:   new(sync.WaitGroup),
		shutdown:     new(atomic.Bool),
		conn:         newConnection(childCtx, logger, cid, config, syn, connected, socketEvents, readCh),
	}

	// Register the event-loop goroutine before launching it so that a Close
	// racing with start-up still waits for it.
	stream.connHandle.Add(1)
	go stream.start()

	return stream
}
// Cid returns the connection id the stream was constructed with.
func (s *UtpStream) Cid() *ConnectionId {
return s.cid
}
// start runs the connection's event loop until it returns and logs any error
// it reports. It is launched as a goroutine by NewUtpStream and signals
// connHandle when it finishes, which is what Close waits for.
func (s *UtpStream) start() {
	defer s.connHandle.Done()

	if loopErr := s.conn.eventLoop(s); loopErr != nil {
		s.logger.Error("utp stream evenLoop has error and return", "err", loopErr)
	}
}
// ReadToEOF collects every chunk delivered on the stream's read queue until
// the stream ends, and stores the accumulated bytes in *buf.
//
// *buf is replaced (not appended to) after each received chunk; if no chunk
// arrives before the call returns, *buf is left untouched.
//
// The call returns when:
//   - a result with empty Data arrives: returns the byte count and that
//     result's Err (which may be nil);
//   - the read queue is closed: returns the byte count and a nil error;
//   - ctx or the stream's own context is done: returns the byte count read so
//     far together with the context's error.
//
// In every case the returned count equals the number of bytes already stored
// in *buf, so callers can tell how much data was received before an
// interruption.
func (s *UtpStream) ReadToEOF(ctx context.Context, buf *[]byte) (int, error) {
	n := 0
	data := make([]byte, 0)
	for {
		select {
		case <-ctx.Done():
			s.logger.Error("ctx has been canceled", "err", ctx.Err(), "n", n)
			return n, ctx.Err()
		case <-s.streamCtx.Done():
			s.logger.Error("streamCtx has been canceled", "err", s.streamCtx.Err(), "n", n)
			// Report n rather than 0: *buf already holds these bytes, and
			// this keeps the two cancellation paths consistent.
			return n, s.streamCtx.Err()
		case res, ok := <-s.reads:
			if !ok {
				return n, nil
			}
			if s.logger.Enabled(BASE_CONTEXT, log.LevelTrace) {
				s.logger.Trace("read a new buf", "len", res.Len)
			}
			// An empty payload marks the end of the read sequence; its Err
			// is handed to the caller as-is.
			if len(res.Data) == 0 {
				return n, res.Err
			}
			n += res.Len
			data = append(data, res.Data[:res.Len]...)
			// Publish after every chunk so partial data stays visible to the
			// caller if the read is interrupted later.
			*buf = data
		}
	}
}
// Write queues buf for transmission and waits for the outcome.
//
// It returns ErrNotConnected immediately when the stream has been shut down.
// Otherwise the write is placed on the stream's write queue and the call
// blocks until a result is reported, ctx is done, or the stream's own context
// is done. On a reported result the result's Len and Err are returned; a nil
// result yields (0, nil). On cancellation it returns 0 and the context error.
//
// Both the enqueue step and the wait for the result honour cancellation, so a
// full write queue cannot block the caller past ctx's deadline or past the
// stream being torn down.
func (s *UtpStream) Write(ctx context.Context, buf []byte) (int, error) {
	if s.shutdown.Load() {
		return 0, ErrNotConnected
	}

	// Buffered with capacity 1 so the reporter never blocks on a caller that
	// has already given up.
	resCh := make(chan *readOrWriteResult, 1)

	// The write queue is bounded; select on the send so that a full queue
	// does not hang the caller when ctx or the stream is cancelled.
	select {
	case s.writes <- &queuedWrite{buf, 0, resCh}:
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-s.streamCtx.Done():
		return 0, s.streamCtx.Err()
	}

	if s.logger.Enabled(BASE_CONTEXT, log.LevelTrace) {
		s.logger.Trace("created a new queued write to writes channel",
			"dst.peer", s.cid.Peer,
			"buf.len", len(buf),
			"len(s.writes)", len(s.writes),
			"ptr(s)", fmt.Sprintf("%p", s),
			"ptr(writes)", fmt.Sprintf("%p", s.writes))
	}

	select {
	case writeRes := <-resCh:
		if writeRes != nil {
			return writeRes.Len, writeRes.Err
		}
		// A nil result carries no information; report nothing written and
		// no error, as before.
		return 0, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-s.streamCtx.Done():
		return 0, s.streamCtx.Err()
	}
}
// Close shuts the stream down. The info log line is emitted on every call,
// but the teardown itself runs only once (guarded by closeOnce); later calls
// return after logging.
func (s *UtpStream) Close() {
s.logger.Info("call close utp stream", "dst.Peer", s.cid.Peer, "dst.send", s.cid.Send, "dst.recv", s.cid.Recv)
s.closeOnce.Do(func() {
// Set the flag first so that subsequent Write calls return ErrNotConnected.
s.shutdown.Store(true)
// Block until the goroutine started by NewUtpStream (running conn.eventLoop)
// has returned.
// NOTE(review): the stream context is cancelled only after this wait, so this
// relies on the event loop terminating by itself once shutdown is set —
// confirm against eventLoop, otherwise Close can block indefinitely.
s.connHandle.Wait()
// Finally release anything still waiting on streamCtx (ReadToEOF / Write).
s.streamCancel()
})
}