Skip to content

Commit

Permalink
NET-1226: Scalability Improvements (#2987)
Browse files Browse the repository at this point in the history
* add api to check if failover node existed

* remove 5 minute peerUpdate

* update peerUpdate to trigger pull

* update Action name to SignalPull

* revert the peerUpdate from SignalPull

* fix getfailover error issue

* rm acls creation for on-prem emqx

* remove use of acls

* add additional broker status field on status api

* NET-1165: Remove creation of acls on emqx (#2996)

* rm acls creation for on-prem emqx

* remove use of acls

* add additional broker status field on status api

* comment out mq reconnect logic

* configure mq conn params

* add metric_interval in ENV for publishing metrics

* add metric_interval in ENV for publishing metrics

* update PUBLISH_METRIC_INTERVAL env name

* revert the mq setttings back

* fix error nil issue

---------

Co-authored-by: abhishek9686 <abhi281342@gmail.com>
Co-authored-by: Abhishek K <32607604+abhishek9686@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 9, 2024
1 parent 5b9ccb7 commit 65faf73
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 231 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#first stage - builder
FROM gravitl/go-builder as builder
FROM gravitl/go-builder AS builder
ARG tags
WORKDIR /app
COPY . .
Expand Down
4 changes: 0 additions & 4 deletions auth/host_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ func SessionHandler(conn *websocket.Conn) {
logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
return
}
if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
return
}
}
logic.CheckHostPorts(&result.Host)
if err := logic.CreateHost(&result.Host); err != nil {
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type ServerConfig struct {
CacheEnabled string `yaml:"caching_enabled"`
EndpointDetection bool `json:"endpoint_detection"`
AllowedEmailDomains string `yaml:"allowed_email_domains"`
MetricInterval string `yaml:"metric_interval"`
}

// SQLConfig - Generic SQL Config
Expand Down
4 changes: 0 additions & 4 deletions controllers/enrollmentkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
return
}
if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
return
}
}
if err = logic.CreateHost(&newHost); err != nil {
logger.Log(
Expand Down
15 changes: 1 addition & 14 deletions controllers/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,23 +555,10 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
return
}
go func() {
// Create EMQX creds and ACLs if not found
// Create EMQX creds
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
slog.Error("failed to create host credentials for EMQX: ", err.Error())
} else {
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
}
for _, nodeID := range host.Nodes {
if node, err := logic.GetNodeByID(nodeID); err == nil {
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
}
} else {
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
}
}
}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions controllers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
type status struct {
DB bool `json:"db_connected"`
Broker bool `json:"broker_connected"`
IsBrokerConnOpen bool `json:"is_broker_conn_open"`
LicenseError string `json:"license_error"`
IsPro bool `json:"is_pro"`
TrialEndDate time.Time `json:"trial_end_date"`
Expand All @@ -141,6 +142,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
currentServerStatus := status{
DB: database.IsConnected(),
Broker: mq.IsConnected(),
IsBrokerConnOpen: mq.IsConnectionOpen(),
LicenseError: licenseErr,
IsPro: servercfg.IsPro,
TrialEndDate: trialEndDate,
Expand Down
2 changes: 2 additions & 0 deletions models/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
UpdateKeys HostMqAction = "UPDATE_KEYS"
// RequestPull - request a pull from a host
RequestPull HostMqAction = "REQ_PULL"
// SignalPull - request a pull from a host without restart
SignalPull HostMqAction = "SIGNAL_PULL"
// UpdateMetrics - updates metrics data
UpdateMetrics HostMqAction = "UPDATE_METRICS"
)
Expand Down
27 changes: 14 additions & 13 deletions models/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,20 @@ type NodeJoinResponse struct {

// ServerConfig - struct for dealing with the server information for a netclient
type ServerConfig struct {
CoreDNSAddr string `yaml:"corednsaddr"`
API string `yaml:"api"`
APIPort string `yaml:"apiport"`
DNSMode string `yaml:"dnsmode"`
Version string `yaml:"version"`
MQPort string `yaml:"mqport"`
MQUserName string `yaml:"mq_username"`
MQPassword string `yaml:"mq_password"`
BrokerType string `yaml:"broker_type"`
Server string `yaml:"server"`
Broker string `yaml:"broker"`
IsPro bool `yaml:"isee" json:"Is_EE"`
TrafficKey []byte `yaml:"traffickey"`
CoreDNSAddr string `yaml:"corednsaddr"`
API string `yaml:"api"`
APIPort string `yaml:"apiport"`
DNSMode string `yaml:"dnsmode"`
Version string `yaml:"version"`
MQPort string `yaml:"mqport"`
MQUserName string `yaml:"mq_username"`
MQPassword string `yaml:"mq_password"`
BrokerType string `yaml:"broker_type"`
Server string `yaml:"server"`
Broker string `yaml:"broker"`
IsPro bool `yaml:"isee" json:"Is_EE"`
TrafficKey []byte `yaml:"traffickey"`
MetricInterval string `yaml:"metric_interval"`
}

// User.NameInCharset - returns if name is in charset below or not
Expand Down
5 changes: 1 addition & 4 deletions mq/emqx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ type Emqx interface {
CreateEmqxUserforServer() error
CreateEmqxDefaultAuthenticator() error
CreateEmqxDefaultAuthorizer() error
CreateDefaultDenyRule() error
CreateHostACL(hostID, serverName string) error
AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error
GetUserACL(username string) (*aclObject, error)
CreateDefaultAllowRule() error
DeleteEmqxUser(username string) error
}

Expand Down
13 changes: 1 addition & 12 deletions mq/emqx_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,10 @@ func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ign

func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore

func (e *EmqxCloud) CreateDefaultDenyRule() error {
func (e *EmqxCloud) CreateDefaultAllowRule() error {
return nil
}

func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
return nil
}

func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
return nil

}

func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list

func (e *EmqxCloud) DeleteEmqxUser(username string) error {

client := &http.Client{}
Expand Down
156 changes: 3 additions & 153 deletions mq/emqx_on_prem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"net/http"
"strings"
"sync"

"github.com/gravitl/netmaker/servercfg"
)
Expand Down Expand Up @@ -246,45 +245,14 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
return nil
}

// GetUserACL - returns ACL rules by username
func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
token, err := getEmqxAuthToken()
if err != nil {
return nil, err
}
req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
if err != nil {
return nil, err
}
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
response, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
}
body := new(aclObject)
if err := json.Unmarshal(response, body); err != nil {
return nil, err
}
return body, nil
}

// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
// CreateDefaultAllowRule - creates a rule to deny access to all topics for all users by default
// to allow user access to topics use the `mq.CreateUserAccessRule` function
func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
func (e *EmqxOnPrem) CreateDefaultAllowRule() error {
token, err := getEmqxAuthToken()
if err != nil {
return err
}
payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "allow", Action: "all"}}})
if err != nil {
return err
}
Expand All @@ -308,121 +276,3 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
}
return nil
}

// CreateHostACL - create host ACL rules
func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
token, err := getEmqxAuthToken()
if err != nil {
return err
}
payload, err := json.Marshal(&aclObject{
Username: hostID,
Rules: []aclRule{
{
Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
Permission: "allow",
Action: "all",
},
{
Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
Permission: "allow",
Action: "all",
},
{
Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
Permission: "allow",
Action: "all",
},
},
})
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
msg, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
}
return nil
}

// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
// might occur when multiple nodes belonging to the same host are created at the same time
var nodeAclMux sync.Mutex

// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
nodeAclMux.Lock()
defer nodeAclMux.Unlock()
token, err := getEmqxAuthToken()
if err != nil {
return err
}
aclObject, err := emqx.GetUserACL(hostID)
if err != nil {
return err
}
aclObject.Rules = append(aclObject.Rules, []aclRule{
{
Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
Permission: "allow",
Action: "subscribe",
},
{
Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
Permission: "allow",
Action: "all",
},
{
Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
Permission: "allow",
Action: "all",
},
{
Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
Permission: "allow",
Action: "all",
},
{
Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
Permission: "allow",
Action: "all",
},
}...)
payload, err := json.Marshal(aclObject)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Add("content-type", "application/json")
req.Header.Add("authorization", "Bearer "+token)
resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
msg, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
}
return nil
}
6 changes: 0 additions & 6 deletions mq/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,6 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
slog.Error("failed to send new node to host", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return
} else {
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
return
}
}
nodes, err := logic.GetAllNodes()
if err != nil {
return
Expand Down
7 changes: 6 additions & 1 deletion mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func SetupMQTT(fatal bool) {
logger.Log(0, err.Error())
}
// create a default deny ACL to all topics for all users
if err := emqx.CreateDefaultDenyRule(); err != nil {
if err := emqx.CreateDefaultAllowRule(); err != nil {
log.Fatal(err)
}
} else {
Expand Down Expand Up @@ -142,6 +142,11 @@ func Keepalive(ctx context.Context) {

// IsConnected - function for determining if the mqclient is connected or not
func IsConnected() bool {
return mqclient != nil && mqclient.IsConnected()
}

// IsConnectionOpen - function for determining if the mqclient is connected or not
func IsConnectionOpen() bool {
return mqclient != nil && mqclient.IsConnectionOpen()
}

Expand Down
Loading

0 comments on commit 65faf73

Please sign in to comment.