Skip to content

Commit

Permalink
go websocket implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
charles-edouard.breteche committed Sep 28, 2020
1 parent a394549 commit 2737c60
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 25 deletions.
25 changes: 24 additions & 1 deletion cmd/dashboard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package main

import (
"context"
"flag"
"os"

"github.com/eddycharly/kloops/apis/config/v1alpha1"
_ "github.com/eddycharly/kloops/pkg/chatbot/pluginimports"
"github.com/eddycharly/kloops/pkg/dashboard/server"
"github.com/eddycharly/kloops/pkg/utils"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand Down Expand Up @@ -64,9 +66,30 @@ func main() {
os.Exit(1)
}

// Define all broadcasters/channels
// Keep broadcaster channels open indefinitely
var resourcesChannel = make(chan utils.SocketData)

var resourcesBroadcaster = utils.NewBroadcaster(resourcesChannel)

cache := mgr.GetCache()
informer, err := cache.GetInformer(context.TODO(), &v1alpha1.RepoConfig{})
if err != nil {
setupLog.Error(err, "unable to start controller")
os.Exit(1)
}

utils.NewController(
resourcesChannel,
informer,
"created",
"updated",
"deleted",
)

stopCh := ctrl.SetupSignalHandler()

server := server.NewServer(namespace, mgr.GetConfig(), mgr.GetClient(), ctrl.Log)
server := server.NewServer(namespace, mgr.GetConfig(), mgr.GetClient(), resourcesBroadcaster, ctrl.Log)

go func() {
if err := server.Start("", 8090); err != nil {
Expand Down
31 changes: 22 additions & 9 deletions pkg/dashboard/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/eddycharly/kloops/pkg/dashboard/server/handlers"
"github.com/eddycharly/kloops/pkg/utils"
"github.com/go-logr/logr"
"github.com/gorilla/mux"
"k8s.io/client-go/rest"
Expand All @@ -18,18 +19,20 @@ type Server interface {
}

type server struct {
namespace string
config *rest.Config
client client.Client
logger logr.Logger
namespace string
config *rest.Config
client client.Client
broadcaster *utils.Broadcaster
logger logr.Logger
}

func NewServer(namespace string, config *rest.Config, client client.Client, logger logr.Logger) Server {
func NewServer(namespace string, config *rest.Config, client client.Client, broadcaster *utils.Broadcaster, logger logr.Logger) Server {
return &server{
namespace: namespace,
config: config,
client: client,
logger: logger.WithName("Server"),
namespace: namespace,
config: config,
client: client,
broadcaster: broadcaster,
logger: logger.WithName("Server"),
}
}

Expand All @@ -51,6 +54,16 @@ func (s *server) Start(addr string, port int) error {
r.HandleFunc("/api/repos/{name}", repoConfig.Get).Methods("GET")
r.HandleFunc("/api/repos", repoConfig.Create).Methods("POST")
r.HandleFunc("/api/hooks/{name}", repoConfig.Hook).Methods("POST")
// Websocket
r.HandleFunc("/resources", func(w http.ResponseWriter, r *http.Request) {
connection, err := utils.UpgradeToWebsocket(w, r)
if err != nil {
fmt.Println(err)
// log.Error().Err(err).Msg("Could not upgrade to websocket connection")
return
}
utils.WriteOnlyWebsocket(connection, s.broadcaster)
})
// Static content
r.PathPrefix("/").Handler(http.StripPrefix("/", http.FileServer(http.Dir("./dashboard/build"))))
return http.ListenAndServe(fmt.Sprintf("%s:%d", addr, port), r)
Expand Down
37 changes: 37 additions & 0 deletions pkg/utils/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package utils

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
)

func NewController(ch chan<- SocketData, informer cache.Informer, onCreated, onUpdated, onDeleted string) {
informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
data := SocketData{
MessageType: onCreated,
Payload: obj,
}
ch <- data
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMeta, newMeta := oldObj.(metav1.Object), newObj.(metav1.Object)
// If resourceVersion differs between old and new, an actual update event was observed
if oldMeta.GetResourceVersion() != newMeta.GetResourceVersion() {
data := SocketData{
MessageType: onUpdated,
Payload: newObj,
}
ch <- data
}
},
DeleteFunc: func(obj interface{}) {
data := SocketData{
MessageType: onDeleted,
Payload: obj,
}
ch <- data
},
})
}
32 changes: 17 additions & 15 deletions pkg/utils/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@ package utils

import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/gorilla/websocket"
ctrl "sigs.k8s.io/controller-runtime"
)

// // UpgradeToWebsocket attempts to upgrade connection from HTTP(S) to WS(S)
// func UpgradeToWebsocket(request *restful.Request, response *restful.Response) (*websocket.Conn, error) {
// var writer http.ResponseWriter = response
// log.Debug().Msg("Upgrading connection to websocket...")
// // Handles writing error to response
// upgrader := websocket.Upgrader{
// ReadBufferSize: 1024,
// WriteBufferSize: 4096,
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }
// connection, err := upgrader.Upgrade(writer, request.Request, nil)
// return connection, err
// }
// UpgradeToWebsocket attempts to upgrade connection from HTTP(S) to WS(S)
func UpgradeToWebsocket(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
// log.Debug().Msg("Upgrading connection to websocket...")
fmt.Println("Upgrading connection to websocket...")
// Handles writing error to response
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
connection, err := upgrader.Upgrade(w, r, nil)
return connection, err
}

// WriteOnlyWebsocket discards text messages from the peer connection
func WriteOnlyWebsocket(connection *websocket.Conn, b *Broadcaster) {
Expand Down

0 comments on commit 2737c60

Please sign in to comment.