Skip to content

Commit

Permalink
use bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
en0ma committed Dec 14, 2022
1 parent 09184d8 commit fb1d915
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions shuttle/rpc/engines/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package websocket

import (
"context"
"encoding/json"
"fmt"
"net/url"
"sync"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/libp2p/go-libp2p/core/peer"
"golang.org/x/net/websocket"
gwebsocket "golang.org/x/net/websocket"

"github.com/application-research/estuary/config"
"go.uber.org/zap"
Expand All @@ -39,7 +41,7 @@ type Connection struct {
}

type IEstuaryRpcEngine interface {
Connect(ws *websocket.Conn, handle string, done chan struct{}) (func() error, func(), error)
Connect(ws *gwebsocket.Conn, handle string, done chan struct{}) (func() error, func(), error)
GetShuttleConnections() []*Connection
GetShuttleConnection(handle string) (*Connection, bool)
}
Expand Down Expand Up @@ -67,18 +69,28 @@ func NewEstuaryRpcEngine(ctx context.Context, db *gorm.DB, cfg *config.Estuary,
return wbsMgr
}

func (m *manager) Connect(ws *websocket.Conn, handle string, done chan struct{}) (func() error, func(), error) {
func (m *manager) Connect(ws *gwebsocket.Conn, handle string, done chan struct{}) (func() error, func(), error) {
var helloBytes []byte
if err := gwebsocket.Message.Receive(ws, &helloBytes); err != nil {
return nil, nil, err
}

var hello rpcevent.Hello
if err := websocket.JSON.Receive(ws, &hello); err != nil {
if err := json.Unmarshal(helloBytes, &hello); err != nil {
return nil, nil, err
}

b, err := json.Marshal(&rpcevent.Hi{QueueEngEnabled: m.cfg.RpcEngine.Queue.Enabled})
if err != nil {
return nil, nil, err
}

// tell shuttle if api support queue engine
if err := websocket.JSON.Send(ws, &rpcevent.Hi{QueueEngEnabled: m.cfg.RpcEngine.Queue.Enabled}); err != nil {
// tell shuttle if api supports queue engine
if err := gwebsocket.Message.Send(ws, b); err != nil {
return nil, nil, err
}

_, err := url.Parse(hello.Host)
_, err = url.Parse(hello.Host)
if err != nil {
m.log.Errorf("shuttle had invalid hostname %q: %s", hello.Host, err)
hello.Host = ""
Expand Down Expand Up @@ -128,25 +140,34 @@ func (m *manager) Connect(ws *websocket.Conn, handle string, done chan struct{})
select {
case msg := <-sc.cmds:
go func() {
// Write
err := websocket.JSON.Send(ws, msg)
msgBytes, err := json.Marshal(msg)
if err != nil {
m.log.Errorf("failed to serialize message: %s", err)
return
}

if err = websocket.Message.Send(ws, msgBytes); err != nil {
m.log.Errorf("failed to write command to shuttle: %s", err)
return
}
}()

case <-done:
return
}
}
}

readWebsocket := func() error {
var msgBytes []byte
if err := websocket.Message.Receive(ws, &msgBytes); err != nil {
return err
}

var msg *rpcevent.Message
if err := websocket.JSON.Receive(ws, &msg); err != nil {
if err := json.Unmarshal(msgBytes, &msg); err != nil {
return err
}

msg.Handle = handle
go func() {
m.rpcWebsocket <- msg
Expand Down

0 comments on commit fb1d915

Please sign in to comment.