Skip to content

Commit

Permalink
Merge pull request #2 from lmnzx/p2p
Browse files Browse the repository at this point in the history
p2p basic
  • Loading branch information
lmnzx committed Apr 19, 2024
2 parents 0bef13b + 2e36133 commit b401fe9
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 21 deletions.
15 changes: 15 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
package main

import (
"fmt"
"log"

"github.com/lmnzx/lemonfs/p2p"
)

func OnPeer(p p2p.Peer) error {
fmt.Println("We Cool")
p.Close()
return nil
}

func main() {
tcpOpts := p2p.TCPTransportOps{
ListenAddr: ":3000",
Decoder: p2p.DefaultDecoder{},
HandshakeFunc: p2p.NOPHandshakeFunc,
OnPeer: OnPeer,
}
tr := p2p.NewTCPTransport(tcpOpts)

go func() {
for {
msg := <-tr.Consume()
fmt.Printf("%+v\n", msg)
}
}()

if err := tr.ListenAndAccept(); err != nil {
log.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions p2p/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
)

type Decoder interface {
Decode(io.Reader, *Message) error
Decode(io.Reader, *RPC) error
}

type GOBDecoder struct{}

func (dec GOBDecoder) Decode(r io.Reader, msg *Message) error {
func (dec GOBDecoder) Decode(r io.Reader, msg *RPC) error {
return gob.NewDecoder(r).Decode(msg)
}

type DefaultDecoder struct{}

func (dec DefaultDecoder) Decode(r io.Reader, msg *Message) error {
func (dec DefaultDecoder) Decode(r io.Reader, msg *RPC) error {
buf := make([]byte, 1024)
n, err := r.Read(buf)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "net"

// Represents any arbitrary data that is being
// sent over the transport between the nodes
type Message struct {
type RPC struct {
From net.Addr
Payload []byte
}
48 changes: 35 additions & 13 deletions p2p/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ package p2p
import (
"fmt"
"net"
"sync"
)

// Configuration for a TCP transport
type TCPTransportOps struct {
ListenAddr string
HandshakeFunc HandshakeFunc
Decoder Decoder
OnPeer func(Peer) error
}

type TCPTransport struct {
TCPTransportOps
listener net.Listener

mu sync.RWMutex
peers map[net.Addr]Peer
rpcch chan RPC
}

// Represents the remote node over a TCP established connection
Expand All @@ -40,6 +38,7 @@ func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer {
func NewTCPTransport(opts TCPTransportOps) *TCPTransport {
return &TCPTransport{
TCPTransportOps: opts,
rpcch: make(chan RPC),
}
}

Expand All @@ -56,6 +55,17 @@ func (t *TCPTransport) ListenAndAccept() error {
return nil
}

// Implements the Transport interface
// retrun read-only channel for reading the incoming
// messages received from other peers in the network
func (t *TCPTransport) Consume() <-chan RPC {
return t.rpcch
}

func (p *TCPPeer) Close() error {
return p.conn.Close()
}

func (t *TCPTransport) startAcceptLoop() {
for {
conn, err := t.listener.Accept()
Expand All @@ -69,23 +79,35 @@ func (t *TCPTransport) startAcceptLoop() {
}

func (t *TCPTransport) handleConn(conn net.Conn) error {
peer := NewTCPPeer(conn, true)
var err error

if err := t.HandshakeFunc(peer); err != nil {
defer func() {
fmt.Printf("Dropping peer connection: %s\n", err)
conn.Close()
fmt.Printf("TCP handshake error: %s\n", err)
}()

peer := NewTCPPeer(conn, true)

if err = t.HandshakeFunc(peer); err != nil {
return err
}

if t.OnPeer != nil {
if err := t.OnPeer(peer); err != nil {
return err
}
}

// Read loop
msg := &Message{}
rpc := RPC{}
for {
if err := t.Decoder.Decode(conn, msg); err != nil {
err := t.Decoder.Decode(conn, &rpc)
if err != nil {
fmt.Printf("TCP error: %s\n", err)
continue
return nil
}

msg.From = conn.RemoteAddr()

fmt.Printf("Message: %+v\n", msg)
rpc.From = conn.RemoteAddr()
t.rpcch <- rpc
}
}
11 changes: 8 additions & 3 deletions p2p/tcp_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
)

func TestTCPTransport(t *testing.T) {
listenAddr := ":4000"
tr := NewTCPTransport(listenAddr)
assert.Equal(t, tr.listenAddress, listenAddr)
opt := TCPTransportOps{
ListenAddr: ":3000",
HandshakeFunc: NOPHandshakeFunc,
Decoder: DefaultDecoder{},
}

tr := NewTCPTransport(opt)
assert.Equal(t, tr.ListenAddr, opt.ListenAddr)

// Server
assert.Nil(t, tr.ListenAndAccept())
Expand Down
5 changes: 4 additions & 1 deletion p2p/transport.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package p2p

// Interface that represents the remote node
type Peer interface{}
type Peer interface {
Close() error
}

// Handles communication between nodes in the network
// Can be TCP, UDP, websockets, ...
type Transport interface {
ListenAndAccept() error
Consume() <-chan RPC
}

0 comments on commit b401fe9

Please sign in to comment.