Skip to content

Commit

Permalink
restructure server functions and add some comments (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
Inveracity authored Jul 22, 2023
1 parent 75d172b commit c1832bd
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 176 deletions.
178 changes: 2 additions & 176 deletions relay/internal/server/server.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package server

import (
"context"
"fmt"
"log"
"time"

"github.com/nats-io/nats.go"

"github.com/inveracity/svelte-grpc-stream/internal/cache"
pb "github.com/inveracity/svelte-grpc-stream/internal/proto/chat/v1"
"github.com/inveracity/svelte-grpc-stream/internal/queue"

"google.golang.org/protobuf/encoding/protojson"
)

var ServerID = "myserver" // TODO: ServerID should be a configurable value

type Server struct {
pb.UnimplementedChatServiceServer
cache *cache.Cache
Expand All @@ -29,170 +22,3 @@ func NewServer(natsURL string, cache *cache.Cache) *Server {
natsURL: natsURL,
}
}

func (s *Server) Connect(in *pb.ConnectRequest, srv pb.ChatService_ConnectServer) error {
// Create a unique streamid for this connection
s.streamid = RandStringRunes(10)

log.Printf("GRPC %s: user %s connected to server %s", s.streamid, in.UserId, in.ServerId)

// Create a NATS queue subscriber for this s.streamid
s.queue = queue.NewQueue(s.natsURL, s.streamid)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go s.queue.Subscribe(ctx, in.ServerId)

// send a "connected" message to the client to tell the client it successfully connected
srv.Send(systemMessage("connected", "server"))
// getPastMessages
if err := s.getPastMessages(srv, in); err != nil {
log.Printf("error getting past messages: %v", err)
return err
}

go s.ping(ctx, srv, in, cancel)
// Receive messages from the NATS loop and forward them to the client
for {
select {
case <-ctx.Done():
log.Printf("GRPC %s: %s disconnected from %s. Global context cancelled.", s.streamid, in.UserId, in.ServerId)
return nil

default:
if err := srv.Context().Err(); err != nil {
log.Printf("GRPC %s: Server found the context to be done in the default case, cancelling global context", s.streamid)
cancel()
return nil
}

for message := range *s.queue.Messages {
if err := relay(message, srv, cancel, s.streamid); err != nil {
s.queue.ErrCh <- err
}
}
}
}
}

// Send: receives a message from the client and publishes it to the NATS server
func (s *Server) Send(ctx context.Context, in *pb.ChatMessage) (*pb.SendResponse, error) {

q := queue.NewQueue(s.natsURL, "")
// Override timstamp
in.Ts = fmt.Sprint(time.Now().UnixNano())

msg := nats.NewMsg("myserver")

payload, err := ProtoToJSON(in)
if err != nil {
return nil, err
}

msg.Data = payload

if in.ChannelId != "system" { // only cache non-system messages
if err := s.cache.Set("myserver", string(payload)); err != nil {
log.Printf("error writing message to cache: %v", err)
return nil, err
}
}

if err := q.Publish("myserver", payload); err != nil {
log.Printf("error publishing message to queue: %v", err)
return nil, err
}
q.Close()
return &pb.SendResponse{Ok: true, Error: ""}, nil
}

func systemMessage(msg, userid string) *pb.ChatMessage {
return &pb.ChatMessage{
ChannelId: "system", // system information channel - the UI implements behavior based on events received on this channel
UserId: userid, // 'server' is not an actual user
Text: msg,
Ts: "0",
}
}

// Send messages from NATS to the gRPC client
func relay(message nats.Msg, srv pb.ChatService_ConnectServer, cancel context.CancelFunc, streamid string) error {
// Convert JSON message to Notification object
chatMsg, err := JSONToProto(message.Data)
if err != nil {
log.Printf("unmarshal error %v", err)
return err
}

// Override the timestamp with the current time
chatMsg.Ts = fmt.Sprint(time.Now().UnixNano())
//log.Printf("N->G %s: %s %s %s: %s", streamid, chatMsg.ChannelId, chatMsg.UserId, chatMsg.Ts, chatMsg.Text)

if err := srv.Send(chatMsg); err != nil {
// If the client has disconnected, cancel the global context
cancel()
return err
}

return nil
}

func (s *Server) getPastMessages(srv pb.ChatService_ConnectServer, in *pb.ConnectRequest) error {
pastMessages, err := s.cache.GetFrom(in.ServerId, in.LastTs, "+inf")
if err != nil {
return err
}

for _, message := range pastMessages {
var chatmsg pb.ChatMessage
j := protojson.UnmarshalOptions{}

// Convert JSON message to Notification object
if err := j.Unmarshal([]byte(message), &chatmsg); err != nil {
log.Printf("unmarshal error %v", err)
return err
}

// Send the message to the client
if err := srv.Send(&chatmsg); err != nil {
log.Printf("an error occurred sending history to client: %v", err)
return err
}
}

return nil
}

// Ping will send a ping message to the client every second and cancel the global context if the client disconnects
func (s *Server) ping(ctx context.Context, srv pb.ChatService_ConnectServer, in *pb.ConnectRequest, cancel context.CancelFunc) {
for {
select {
case <-ctx.Done():
err := s.broadcast(in.UserId, "disconnected")
if err != nil {
log.Printf("PING %s: error broadcasting disconnect message: %v", s.streamid, err)
}
return

default:
time.Sleep(1 * time.Second)
s.broadcast(in.UserId, "connected")
err := srv.Send(&pb.ChatMessage{
ChannelId: "system", // system information channel
UserId: "server",
Text: "ping",
Ts: "0",
})

if err != nil {
cancel()
}
}
}
}

// Broadcast sends a message to all connected clients
func (s *Server) broadcast(user, msg string) error {
return s.queue.Publish("myserver", []byte(`{"channelId":"system","userId":"`+user+`","text":"`+msg+`","ts":"0"}`))
}
156 changes: 156 additions & 0 deletions relay/internal/server/server_connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package server

import (
"context"
"log"
"time"

pb "github.com/inveracity/svelte-grpc-stream/internal/proto/chat/v1"
"github.com/inveracity/svelte-grpc-stream/internal/queue"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/encoding/protojson"
)

// Connect: receives a connection request from the client and creates a NATS queue subscriber
func (s *Server) Connect(in *pb.ConnectRequest, srv pb.ChatService_ConnectServer) error {
// Create a unique streamid for this connection, this is used to trace logs for each connection
s.streamid = RandStringRunes(10)

log.Printf("GRPC %s: user %s connected to server %s", s.streamid, in.UserId, in.ServerId)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.queue = queue.NewQueue(s.natsURL, s.streamid)
go s.queue.Subscribe(ctx, in.ServerId)

Check failure on line 25 in relay/internal/server/server_connect.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `s.queue.Subscribe` is not checked (errcheck)

// Send a "connected" message to the client to tell the client it successfully connected.
srv.Send(systemMessage("connected", "server"))

Check failure on line 28 in relay/internal/server/server_connect.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `srv.Send` is not checked (errcheck)

// Once the client is connected, send all past messages from the cache.
if err := s.getHistory(srv, in); err != nil {
log.Printf("error getting past messages: %v", err)
return err
}

// Every client is continuously pinged to track whether they are still connected.
// It also makes other users aware of who is connected to show in the UI.
go s.ping(ctx, srv, in, cancel)

// This is the main loop that receives messages from the NATS queue and sends them to the client.
for {
select {
case <-ctx.Done():
log.Printf("GRPC %s: %s disconnected from %s. Global context cancelled.", s.streamid, in.UserId, in.ServerId)
return nil

default:
if err := srv.Context().Err(); err != nil {
log.Printf("GRPC %s: Server found the context to be done in the default case, cancelling global context", s.streamid)
cancel()
return nil
}

for message := range *s.queue.Messages {
if err := relay(message, srv, cancel, s.streamid); err != nil {
s.queue.ErrCh <- err
}
}
}
}
}

// systemMessage is a convenience function to create a system message
func systemMessage(msg, userid string) *pb.ChatMessage {
return &pb.ChatMessage{
ChannelId: "system", // system information channel - the UI implements behavior based on events received on this channel
UserId: userid, // 'server' is not an actual user
Text: msg,
Ts: "0",
}
}

// GetHistory finds all the messages stored in Redis for a given ServerID
func (s *Server) getHistory(srv pb.ChatService_ConnectServer, in *pb.ConnectRequest) error {
pastMessages, err := s.cache.GetFrom(in.ServerId, in.LastTs, "+inf")
if err != nil {
return err
}

for _, message := range pastMessages {
var chatmsg pb.ChatMessage
j := protojson.UnmarshalOptions{}

// Convert JSON message to Notification object
if err := j.Unmarshal([]byte(message), &chatmsg); err != nil {
log.Printf("unmarshal error %v", err)
return err
}

// Send the message to the client
if err := srv.Send(&chatmsg); err != nil {
log.Printf("an error occurred sending history to client: %v", err)
return err
}
}

return nil
}

// Send messages from NATS to the gRPC client
func relay(message nats.Msg, srv pb.ChatService_ConnectServer, cancel context.CancelFunc, streamid string) error {
// Convert JSON message to Notification object
chatMsg, err := JSONToProto(message.Data)
if err != nil {
log.Printf("unmarshal error %v", err)
return err
}

if err := srv.Send(chatMsg); err != nil {
// If the client has disconnected, cancel the global context. This closed the NATS queue and stops the main loop.
// This should hopefully never happen as the ping function should cancel the global context if the client disconnects.
cancel()
return err
}

return nil
}

// Ping will send a ping message to the client every second and cancel the global context if the client disconnects
func (s *Server) ping(ctx context.Context, srv pb.ChatService_ConnectServer, in *pb.ConnectRequest, cancel context.CancelFunc) {
for {
select {
case <-ctx.Done():
err := s.broadcast(in.UserId, "disconnected")
if err != nil {
log.Printf("PING %s: error broadcasting disconnect message: %v", s.streamid, err)
}
return

default:
time.Sleep(1 * time.Second)
s.broadcast(in.UserId, "connected")

Check failure on line 132 in relay/internal/server/server_connect.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `s.broadcast` is not checked (errcheck)
err := srv.Send(&pb.ChatMessage{
ChannelId: "system", // system information channel
UserId: "server",
Text: "ping",
Ts: "0",
})

if err != nil {
cancel()
}
}
}
}

// Broadcast sends a message to all connected clients
func (s *Server) broadcast(user, msg string) error {
/*
The channelid "system" is used to send system messages to the client that are not shown in the chat UI.
The UI implements behavior based on events received on this channel.
The "userId" field is used as metadata about which client the broadcast is coming from.
The timestamp is an ignored field.
*/
return s.queue.Publish(ServerID, []byte(`{"channelId":"system","userId":"`+user+`","text":"`+msg+`","ts":"0"}`))
}
44 changes: 44 additions & 0 deletions relay/internal/server/server_send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package server

import (
"context"
"fmt"
"log"
"time"

pb "github.com/inveracity/svelte-grpc-stream/internal/proto/chat/v1"
"github.com/inveracity/svelte-grpc-stream/internal/queue"
"github.com/nats-io/nats.go"
)

// Send: receives messages from clients and publishes them to a NATS queue.
func (s *Server) Send(ctx context.Context, in *pb.ChatMessage) (*pb.SendResponse, error) {

q := queue.NewQueue(s.natsURL, "")

// Set message timestamp to the current time, this is done because the client may have a different time than the server.
in.Ts = fmt.Sprint(time.Now().UnixNano())

msg := nats.NewMsg(ServerID)

payload, err := ProtoToJSON(in)
if err != nil {
return nil, err
}

msg.Data = payload

if in.ChannelId != "system" { // only cache non-system messages
if err := s.cache.Set(ServerID, string(payload)); err != nil {
log.Printf("error writing message to cache: %v", err)
return nil, err
}
}

if err := q.Publish(ServerID, payload); err != nil {
log.Printf("error publishing message to queue: %v", err)
return nil, err
}
q.Close()
return &pb.SendResponse{Ok: true, Error: ""}, nil
}

0 comments on commit c1832bd

Please sign in to comment.