Skip to content

Commit

Permalink
RPC over UDP support, --port udp://192.168.1.23:8910/
Browse files Browse the repository at this point in the history
Device library is https://github.com/mongoose-os-libs/rpc-udp

CL: mos: RPC over UDP support, --port udp://192.168.1.23:8910/
  • Loading branch information
rojer committed Jul 17, 2019
1 parent ff7a5cb commit 18877dc
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 21 deletions.
1 change: 1 addition & 0 deletions common/mgrpc/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Options struct {
GCP GCPCodecOptions
MQTT MQTTCodecOptions
Serial SerialCodecOptions
UDP UDPCodecOptions
Watson WatsonCodecOptions
}

Expand Down
93 changes: 93 additions & 0 deletions common/mgrpc/codec/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// Copyright (c) 2014-2019 Cesanta Software Limited
// All rights reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package codec

import (
"context"
"encoding/json"
"net"

"github.com/cesanta/errors"

"github.com/mongoose-os/mos/common/mgrpc/frame"
)

const (
UDPURLScheme = "udp"
)

type UDPCodecOptions struct {
}

type udpCodec struct {
conn net.Conn
}

func UDP(addr string) Codec {
conn, err := net.Dial("udp", addr)
if err != nil {
return nil
}
return &udpCodec{conn: conn}
}

func (c *udpCodec) Recv(ctx context.Context) (*frame.Frame, error) {
buf := make([]byte, 10000)
readLen, err := c.conn.Read(buf)
if err != nil {
c.Close()
return nil, errors.Trace(err)
}
var f frame.Frame
if err = json.Unmarshal(buf[:readLen], &f); err != nil {
return nil, errors.Trace(err)
}
return &f, nil
}

func (c *udpCodec) Send(ctx context.Context, f *frame.Frame) error {
b, err := json.Marshal(f)
if err != nil {
return errors.Trace(err)
}
_, err = c.conn.Write(b)
return errors.Trace(err)
}

func (c *udpCodec) Close() {
c.conn.Close()
}

func (c *udpCodec) CloseNotify() <-chan struct{} {
return nil
}

func (c *udpCodec) MaxNumFrames() int {
return -1
}

func (c *udpCodec) Info() ConnectionInfo {
return ConnectionInfo{
IsConnected: true,
TLS: false,
RemoteAddr: c.conn.RemoteAddr().String(),
}
}

func (c *udpCodec) SetOptions(opts *Options) error {
return errors.NotImplementedf("SetOptions")
}
4 changes: 3 additions & 1 deletion common/mgrpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"io/ioutil"
"net/url"

"github.com/mongoose-os/mos/common/mgrpc/codec"
"github.com/cesanta/errors"
"github.com/mongoose-os/mos/common/mgrpc/codec"
)

type connectOptions struct {
Expand Down Expand Up @@ -121,6 +121,8 @@ func connectTo(connectURL string) ConnectOption {
t, a = tGCP, url.String()
case url.Scheme == codec.WatsonURLScheme:
t, a = tWatson, url.String()
case url.Scheme == codec.UDPURLScheme:
t, a = tUDP, url.Host
default:
return badConnectOption(errors.Errorf("invalid ConnectTo protocol %q", url.Scheme))
}
Expand Down
6 changes: 4 additions & 2 deletions common/mgrpc/mgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (

"golang.org/x/net/websocket"

"github.com/mongoose-os/mos/common/mgrpc/codec"
"github.com/mongoose-os/mos/common/mgrpc/frame"
"github.com/cesanta/errors"
"github.com/golang/glog"
"github.com/mongoose-os/mos/common/mgrpc/codec"
"github.com/mongoose-os/mos/common/mgrpc/frame"
)

const (
Expand Down Expand Up @@ -261,6 +261,8 @@ func (r *mgRPCImpl) connect(ctx context.Context, opts ...ConnectOption) error {
c, err := r.tcpConnect(tcpAddress, r.opts)
return c, errors.Trace(err)
})
case tUDP:
r.codec = codec.UDP(r.opts.connectAddress)
case tSerial:
if r.opts.enableReconnect {
r.codec = codec.NewReconnectWrapperCodec(
Expand Down
1 change: 1 addition & 0 deletions common/mgrpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ const (
tAzureDM
tGCP
tWatson
tUDP
)
20 changes: 2 additions & 18 deletions common/mgrpc/transport_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 18877dc

Please sign in to comment.