diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index c77658f..447e11b 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -17,12 +17,14 @@ limitations under the License. package main import ( + "context" "flag" "os" "github.com/eddycharly/kloops/apis/config/v1alpha1" _ "github.com/eddycharly/kloops/pkg/chatbot/pluginimports" "github.com/eddycharly/kloops/pkg/dashboard/server" + "github.com/eddycharly/kloops/pkg/utils" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -64,9 +66,30 @@ func main() { os.Exit(1) } + // Define all broadcasters/channels + // Keep broadcaster channels open indefinitely + var resourcesChannel = make(chan utils.SocketData) + + var resourcesBroadcaster = utils.NewBroadcaster(resourcesChannel) + + cache := mgr.GetCache() + informer, err := cache.GetInformer(context.TODO(), &v1alpha1.RepoConfig{}) + if err != nil { + setupLog.Error(err, "unable to start controller") + os.Exit(1) + } + + utils.NewController( + resourcesChannel, + informer, + "created", + "updated", + "deleted", + ) + stopCh := ctrl.SetupSignalHandler() - server := server.NewServer(namespace, mgr.GetConfig(), mgr.GetClient(), ctrl.Log) + server := server.NewServer(namespace, mgr.GetConfig(), mgr.GetClient(), resourcesBroadcaster, ctrl.Log) go func() { if err := server.Start("", 8090); err != nil { diff --git a/pkg/dashboard/server/server.go b/pkg/dashboard/server/server.go index 55c8ae1..1d71b8e 100644 --- a/pkg/dashboard/server/server.go +++ b/pkg/dashboard/server/server.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/eddycharly/kloops/pkg/dashboard/server/handlers" + "github.com/eddycharly/kloops/pkg/utils" "github.com/go-logr/logr" "github.com/gorilla/mux" "k8s.io/client-go/rest" @@ -18,18 +19,20 @@ type Server interface { } type server struct { - namespace string - config *rest.Config - client client.Client - logger logr.Logger + namespace string + config *rest.Config + client client.Client + broadcaster *utils.Broadcaster + logger logr.Logger } -func NewServer(namespace string, config *rest.Config, client client.Client, logger logr.Logger) Server { +func NewServer(namespace string, config *rest.Config, client client.Client, broadcaster *utils.Broadcaster, logger logr.Logger) Server { return &server{ - namespace: namespace, - config: config, - client: client, - logger: logger.WithName("Server"), + namespace: namespace, + config: config, + client: client, + broadcaster: broadcaster, + logger: logger.WithName("Server"), } } @@ -51,6 +54,16 @@ func (s *server) Start(addr string, port int) error { r.HandleFunc("/api/repos/{name}", repoConfig.Get).Methods("GET") r.HandleFunc("/api/repos", repoConfig.Create).Methods("POST") r.HandleFunc("/api/hooks/{name}", repoConfig.Hook).Methods("POST") + // Websocket + r.HandleFunc("/resources", func(w http.ResponseWriter, r *http.Request) { + connection, err := utils.UpgradeToWebsocket(w, r) + if err != nil { + fmt.Println(err) + // log.Error().Err(err).Msg("Could not upgrade to websocket connection") + return + } + utils.WriteOnlyWebsocket(connection, s.broadcaster) + }) // Static content r.PathPrefix("/").Handler(http.StripPrefix("/", http.FileServer(http.Dir("./dashboard/build")))) return http.ListenAndServe(fmt.Sprintf("%s:%d", addr, port), r) diff --git a/pkg/utils/controller.go b/pkg/utils/controller.go new file mode 100644 index 0000000..98472d8 --- /dev/null +++ b/pkg/utils/controller.go @@ -0,0 +1,37 @@ +package utils + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" +) + +func NewController(ch chan<- SocketData, informer cache.Informer, onCreated, onUpdated, onDeleted string) { + informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + data := SocketData{ + MessageType: onCreated, + Payload: obj, + } + ch <- data + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldMeta, newMeta := oldObj.(metav1.Object), newObj.(metav1.Object) + // If resourceVersion differs between old and new, an actual update event was observed + if oldMeta.GetResourceVersion() != newMeta.GetResourceVersion() { + data := SocketData{ + MessageType: onUpdated, + Payload: newObj, + } + ch <- data + } + }, + DeleteFunc: func(obj interface{}) { + data := SocketData{ + MessageType: onDeleted, + Payload: obj, + } + ch <- data + }, + }) +} diff --git a/pkg/utils/websocket.go b/pkg/utils/websocket.go index 32bcb9f..1cd3b5f 100644 --- a/pkg/utils/websocket.go +++ b/pkg/utils/websocket.go @@ -2,27 +2,29 @@ package utils import ( "encoding/json" + "fmt" + "net/http" "time" "github.com/gorilla/websocket" ctrl "sigs.k8s.io/controller-runtime" ) -// // UpgradeToWebsocket attempts to upgrade connection from HTTP(S) to WS(S) -// func UpgradeToWebsocket(request *restful.Request, response *restful.Response) (*websocket.Conn, error) { -// var writer http.ResponseWriter = response -// log.Debug().Msg("Upgrading connection to websocket...") -// // Handles writing error to response -// upgrader := websocket.Upgrader{ -// ReadBufferSize: 1024, -// WriteBufferSize: 4096, -// CheckOrigin: func(r *http.Request) bool { -// return true -// }, -// } -// connection, err := upgrader.Upgrade(writer, request.Request, nil) -// return connection, err -// } +// UpgradeToWebsocket attempts to upgrade connection from HTTP(S) to WS(S) +func UpgradeToWebsocket(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { + // log.Debug().Msg("Upgrading connection to websocket...") + fmt.Println("Upgrading connection to websocket...") + // Handles writing error to response + upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 4096, + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + connection, err := upgrader.Upgrade(w, r, nil) + return connection, err +} // WriteOnlyWebsocket discards text messages from the peer connection func WriteOnlyWebsocket(connection *websocket.Conn, b *Broadcaster) {