Skip to content

Commit

Permalink
[#4053] Add api-streams component
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Apr 26, 2023
1 parent b3add04 commit 7d318a3
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 0 deletions.
49 changes: 49 additions & 0 deletions backend/components/streams/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
load("@com_github_airyhq_bazel_tools//lint:buildifier.bzl", "check_pkg")

# gazelle:prefix github.com/airyhq/airy/backend/components/streams
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("@io_bazel_rules_docker//go:image.bzl", "go_image")
load("//tools/build:container_release.bzl", "container_release")

go_library(
name = "streams_lib",
srcs = [
"auth.go",
"components_list.go",
"cors.go",
"main.go",
],
importpath = "github.com/airyhq/airy/backend/components/streams",
visibility = ["//visibility:private"],
deps = [
"@com_github_golang_jwt_jwt//:jwt",
"@com_github_gorilla_mux//:mux",
"@io_k8s_klog//:klog",
],
)

go_binary(
name = "streams",
out = "streams",
embed = [":streams_lib"],
visibility = ["//visibility:public"],
)

genrule(
name = "streams_bin_rule",
srcs = [":streams"],
outs = ["streams_bin"],
cmd = "cp $(SRCS) $@",
)

go_image(
name = "image",
embed = [":streams_lib"],
)

container_release(
registry = "ghcr.io/airyhq/api/components",
repository = "streams",
)

check_pkg(name = "buildifier")
108 changes: 108 additions & 0 deletions backend/components/streams/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"context"
"encoding/base64"
"log"
"net/http"
"regexp"
"strings"

"github.com/golang-jwt/jwt"
"k8s.io/klog"
)

type EnableAuthMiddleware struct {
pattern *regexp.Regexp
}

// MustNewAuthMiddleware Only paths that match the regexp pattern will be authenticated
func MustNewAuthMiddleware(pattern string) EnableAuthMiddleware {
r, err := regexp.Compile(pattern)
if err != nil {
log.Fatal(err)
}
return EnableAuthMiddleware{pattern: r}
}

func (a EnableAuthMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if !a.pattern.MatchString(r.URL.Path) {
next.ServeHTTP(w, r)
return
}

// Auth middlewares attach a flag to the context indicating that authentication was successful
if val, ok := ctx.Value("auth").(bool); ok && val {
next.ServeHTTP(w, r)
} else {
http.Error(w, "Forbidden", http.StatusForbidden)
}
})
}

type SystemTokenMiddleware struct {
systemToken string
}

func NewSystemTokenMiddleware(systemToken string) SystemTokenMiddleware {
return SystemTokenMiddleware{systemToken: systemToken}
}

func (s SystemTokenMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authPayload := r.Header.Get("Authorization")
authPayload = strings.TrimPrefix(authPayload, "Bearer ")

if authPayload == s.systemToken {
ctx := context.WithValue(r.Context(), "auth", true)
next.ServeHTTP(w, r.WithContext(ctx))
return
}
next.ServeHTTP(w, r)
})
}

type JwtMiddleware struct {
jwtSecret []byte
}

func NewJwtMiddleware(jwtSecret string) JwtMiddleware {
data, err := base64.StdEncoding.DecodeString(jwtSecret)
if err != nil {
klog.Fatal("failed to base64 decode jwt secret: ", err)
}

return JwtMiddleware{jwtSecret: data}
}

func (j JwtMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authPayload := r.Header.Get("Authorization")
authPayload = strings.TrimPrefix(authPayload, "Bearer ")
if authPayload == "" {
authPayload = getAuthCookie(r)
}

token, err := jwt.Parse(authPayload, func(token *jwt.Token) (interface{}, error) {
return j.jwtSecret, nil
})

if err != nil || !token.Valid {
next.ServeHTTP(w, r)
return
}

ctx := context.WithValue(r.Context(), "auth", true)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

func getAuthCookie(r *http.Request) string {
cookie, err := r.Cookie("airy_auth_token")
if err != nil {
return ""
}
return cookie.Value
}
49 changes: 49 additions & 0 deletions backend/components/streams/cors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"net/http"
"strings"
)

type CORS struct {
allowedOrigins map[string]struct{}
}

func NewCORSMiddleware(allowedOrigins string) CORS {
cors := CORS{allowedOrigins: make(map[string]struct{})}

for _, origin := range strings.Split(allowedOrigins, ",") {
cors.allowedOrigins[origin] = struct{}{}
}

return cors
}

func (c *CORS) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

origin := r.Header.Get("Origin")
_, allowed := c.allowedOrigins[origin]

if !allowed && r.Method == "OPTIONS" {
w.WriteHeader(http.StatusForbidden)
return
}

if allowed {
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Methods", "POST, GET")
w.Header().Set(
"Access-Control-Allow-Headers",
"Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, X-Requested-With, X-XSRF-Token",
)
}

if r.Method == "OPTIONS" {
return
}

next.ServeHTTP(w, r)
})
}
3 changes: 3 additions & 0 deletions backend/components/streams/helm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
load("//tools/build:helm.bzl", "helm_ruleset_core_version")

helm_ruleset_core_version()
5 changes: 5 additions & 0 deletions backend/components/streams/helm/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: v2
appVersion: "1.0"
description: A Helm chart for the streams backend
name: streams
version: 0.1
10 changes: 10 additions & 0 deletions backend/components/streams/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.name }}"
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.name }}"
annotations:
core.airy.co/enabled: "{{ .Values.enabled }}"
69 changes: 69 additions & 0 deletions backend/components/streams/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: "{{ .Values.name }}"
labels:
app: "{{ .Values.name }}"
type: sources
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.name }}"
spec:
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: "{{ .Values.name }}"
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: "{{ .Values.name }}"
spec:
containers:
- name: app
image: "{{ .Values.global.containerRegistry}}/{{ .Values.image }}:{{ default .Chart.Version .Values.global.appImageTag }}"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: "{{ .Values.name }}"
env:
- name: jwtSecret
valueFrom:
configMapKeyRef:
name: security
key: jwtSecret
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
httpHeaders:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
resources:
{{ toYaml .Values.resources | indent 10 }}
initContainers:
- name: wait
image: "{{ .Values.global.busyboxImage }}"
imagePullPolicy: IfNotPresent
command: ["/bin/sh", "/opt/provisioning/wait-for-service.sh"]
env:
- name: SERVICE_NAME
value: "{{ .Values.ksqldbHost }}"
- name: SERVICE_PORT
value: "{{ .Values.ksqldbPort }}"
volumeMounts:
- name: provisioning-scripts
mountPath: /opt/provisioning
volumes:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
16 changes: 16 additions & 0 deletions backend/components/streams/helm/templates/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: v1
kind: Service
metadata:
name: "{{ .Values.name }}"
labels:
core.airy.co/prometheus: spring
core.airy.co/component: "{{ .Values.name }}"
spec:
ports:
- name: web
port: 80
targetPort: 8080
protocol: TCP
type: NodePort
selector:
app: "{{ .Values.name }}"
11 changes: 11 additions & 0 deletions backend/components/streams/helm/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: api-streams
mandatory: false
enabled: true
image: api/components/streams
resources: {}
ksqldbHost: ksqldb-server
ksqldbPort: "80"
global:
containerRegistry: ghcr.io/airyhq
busyboxImage: ghcr.io/airyhq/infrastructure/busybox:latest
workerType:
43 changes: 43 additions & 0 deletions backend/components/streams/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"log"
"net/http"
"os"

"github.com/gorilla/mux"
"k8s.io/klog"
)

func main() {
r := mux.NewRouter()

if allowedOrigins := os.Getenv("allowedOrigins"); allowedOrigins != "" {
klog.Info("adding cors")
middleware := NewCORSMiddleware(allowedOrigins)
r.Use(middleware.Middleware)
}

// Load authentication middleware only if auth env is present
authEnabled := false
systemToken := os.Getenv("systemToken")
jwtSecret := os.Getenv("jwtSecret")
if systemToken != "" && jwtSecret != "" {
klog.Info("adding system token auth")
r.Use(NewSystemTokenMiddleware(systemToken).Middleware)

klog.Info("adding jwt auth")
r.Use(NewJwtMiddleware(jwtSecret).Middleware)
authEnabled = true
}

if authEnabled {
authMiddleware := MustNewAuthMiddleware("/components|/cluster")
r.Use(authMiddleware.Middleware)
}

streamsList := StreamsList{KSqlHost: "ksqldb-server", KSqlPort: "80"}
r.Handle("/streams.list", &streamsList)

log.Fatal(http.ListenAndServe(":8080", r))
}
25 changes: 25 additions & 0 deletions backend/components/streams/streams_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"encoding/json"
"net/http"

"k8s.io/klog"
)

type StreamsList struct {
KSqlHost string
KSqlPort string
}

func (s *StreamsList) ServeHTTP(w http.ResponseWriter, r *http.Request) {
response, err := json.Marshal(map[string]interface{}{"streams": "none"})
if err != nil {
klog.Error(err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(response)
}

0 comments on commit 7d318a3

Please sign in to comment.