Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STELLAR-3554 : add support for stream resume, frame bundling #110

Merged
merged 16 commits into from
Jun 4, 2020
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ require (
github.com/MakeNowJust/heredoc v0.0.0-20171113091838-e9091a26100e
github.com/cenkalti/backoff v2.1.1+incompatible
github.com/golang/protobuf v1.3.1
github.com/infostellarinc/go-stellarstation v0.8.0
github.com/infostellarinc/go-stellarstation v0.8.1-0.20200601115159-cd35751d0a51
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/spf13/cobra v0.0.5
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 // indirect
golang.org/x/sys v0.0.0-20200523222454-059865788121 // indirect
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/genproto v0.0.0-20190620144150-6af8c5fc6601 // indirect
google.golang.org/grpc v1.21.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/infostellarinc/go-stellarstation v0.8.0 h1:rpGrzmYIIj40hAIgii33gsh1shTH1SDATuTuLAKkasw=
github.com/infostellarinc/go-stellarstation v0.8.0/go.mod h1:Pmo6ejsQ9Squvsb6Kuv4pgDOHvJhacOU4TEo1XAlp3A=
github.com/infostellarinc/go-stellarstation v0.8.1-0.20200601115159-cd35751d0a51 h1:wT/mlQzGu144Uv7ayRH4/cqK2rY0xQieu/Ld1YNJAlY=
github.com/infostellarinc/go-stellarstation v0.8.1-0.20200601115159-cd35751d0a51/go.mod h1:++wmnJPPi8D/sdVeqST85CVqEbIquJ1DMOQJFrTqjTI=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -113,8 +113,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121 h1:rITEj+UZHYC927n8GT97eC3zrpzXdb/voyeOuVKS46o=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
71 changes: 48 additions & 23 deletions pkg/satellite/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,28 @@ func (ss *satelliteStream) Close() error {
return nil
}

// send telemetryMessageAckId to support enableFlowControl feature
func (ss *satelliteStream) ackReceivedTelemetry(telemetryMessageAckId string) {
if telemetryMessageAckId != "" {
req := stellarstation.SatelliteStreamRequest{
SatelliteId: ss.satelliteId,
Request: &stellarstation.SatelliteStreamRequest_TelemetryReceivedAck{
TelemetryReceivedAck: &stellarstation.ReceiveTelemetryAck{
MessageAckId: telemetryMessageAckId,
ReceivedTimestamp: timestampNow(),
},
},
}
log.Debug("sending ack index: %v", telemetryMessageAckId)
ss.stream.Send(&req)
}
}

func (ss *satelliteStream) recvLoop() {
// Initialize exponential back off settings.
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = MaxElapsedTime
telemetryMessageAckId := ""

pq := collection.NewPriorityQueue((*stellarstation.Telemetry)(nil), func(i, j interface{}) bool {
telemetry1 := i.(*stellarstation.Telemetry)
Expand Down Expand Up @@ -185,7 +203,7 @@ func (ss *satelliteStream) recvLoop() {
log.Println("reconnecting to the API stream.")

rcErr := backoff.RetryNotify(func() error {
err := ss.openStream()
err := ss.openStream(telemetryMessageAckId)
if err != nil {
return err
}
Expand Down Expand Up @@ -232,23 +250,28 @@ func (ss *satelliteStream) recvLoop() {
break
}

telemetry := telResponse.Telemetry
if telemetry == nil {
break
}
payload := telemetry.Data
log.Debug("received data: streamId: %v, planId: %s, framing type: %s, size: %d bytes\n", ss.streamId, planId, telemetry.Framing, len(payload))
if ss.showStats {
metrics.collectTelemetry(telemetry)
}
if ss.correctOrder {
go func() {
ss.mu.Lock()
defer ss.mu.Unlock()
pq.Push(telemetry)
}()
} else {
ss.recvChan <- payload
for _, telemetry := range telResponse.Telemetry {
if telemetry == nil {
break
}
payload := telemetry.Data
log.Debug("received data: streamId: %v, planId: %s, framing type: %s, size: %d bytes\n", ss.streamId, planId, telemetry.Framing, len(payload))
if ss.showStats {
metrics.collectTelemetry(telemetry)
}
if ss.correctOrder {
go func() {
ss.mu.Lock()
defer ss.mu.Unlock()
pq.Push(telemetry)
}()
} else {
ss.recvChan <- payload
}

// send ack & update telemetryMessageAckId in case we need to resume from disconnects
telemetryMessageAckId = telResponse.MessageAckId
ss.ackReceivedTelemetry(telResponse.MessageAckId)
}
case *stellarstation.SatelliteStreamResponse_StreamEvent:
if res.GetStreamEvent() == nil || res.GetStreamEvent().GetPlanMonitoringEvent() == nil {
Expand Down Expand Up @@ -277,7 +300,7 @@ func (ss *satelliteStream) recvLoop() {
}
}

func (ss *satelliteStream) openStream() error {
func (ss *satelliteStream) openStream(resumeStreamMessageAckId string) error {
conn, err := apiclient.Dial()
if err != nil {
return err
Expand All @@ -292,9 +315,11 @@ func (ss *satelliteStream) openStream() error {
}

req := stellarstation.SatelliteStreamRequest{
AcceptedFraming: ss.acceptedFraming,
SatelliteId: ss.satelliteId,
StreamId: ss.streamId,
AcceptedFraming: ss.acceptedFraming,
SatelliteId: ss.satelliteId,
StreamId: ss.streamId,
ResumeStreamMessageAckId: resumeStreamMessageAckId,
EnableFlowControl: true,
}

err = stream.Send(&req)
Expand Down Expand Up @@ -327,7 +352,7 @@ func (ss *satelliteStream) start() (func(), error) {
}
}

err := ss.openStream()
err := ss.openStream("")
if err != nil {
return nil, err
}
Expand Down