Skip to content

Commit

Permalink
Add a configuration mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
kgantsov committed Sep 15, 2024
1 parent 8394414 commit bcf1797
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 112 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ go build -o doq
Run the first node

```bash
./doq -dataDir data -id node-1 -httpAddr 8001 -raftAddr localhost:9001
./doq --storage.data_dir data --cluster.node_id node-1 --http.port 8001 --raft.address localhost:9001
```

Run other nodes

```bash
./doq -dataDir data -id node-2 -httpAddr 8002 -raftAddr localhost:9002 -join localhost:8001
./doq -dataDir data -id node-3 -httpAddr 8003 -raftAddr localhost:9003 -join localhost:8001
./doq --storage.data_dir data --cluster.node_id node-2 --http.port 8002 --raft.address localhost:9002 --cluster.join_addr localhost:8001
./doq --storage.data_dir data --cluster.node_id node-3 --http.port 8003 --raft.address localhost:9003 --cluster.join_addr localhost:8001
```

You can find swagger docs by opening http://localhost:8001/docs
Expand Down
227 changes: 149 additions & 78 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"flag"
"fmt"
"os"
"path/filepath"
Expand All @@ -10,110 +9,182 @@ import (
"github.com/dgraph-io/badger/v4"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/kgantsov/doq/pkg/cluster"
"github.com/kgantsov/doq/pkg/http"
"github.com/kgantsov/doq/pkg/raft"
)

const (
DefaultHTTPPort = "8000"
DefaultRaftPort = "localhost:9000"
)
type Config struct {
Http struct {
Port string `mapstructure:"port"`
}
Raft struct {
Address string `mapstructure:"address"`
}
Storage struct {
DataDir string `mapstructure:"data_dir"`
}
Cluster struct {
NodeID string `mapstructure:"node_id"`
ServiceName string `mapstructure:"service_name"`
JoinAddr string `mapstructure:"join_addr"`
}
}

var httpPort string
var raftPort string
var dataDir string
var nodeID string
var joinAddr string
var ServiceName string
var cfgFile string

var rootCmd = &cobra.Command{
Use: "doq",
Short: "DOQ is a distributed queue",
Run: func(cmd *cobra.Command, args []string) {
// Load the config
config, err := loadConfig()
if err != nil {
fmt.Printf("Error loading config: %v\n", err)
return
}

func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339})
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339})

zerolog.SetGlobalLevel(zerolog.DebugLevel)
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
zerolog.SetGlobalLevel(zerolog.DebugLevel)
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix

flag.StringVar(&httpPort, "httpAddr", DefaultHTTPPort, "Set the HTTP bind address")
flag.StringVar(&raftPort, "raftAddr", DefaultRaftPort, "Set Raft bind address")
flag.StringVar(&dataDir, "dataDir", DefaultRaftPort, "Set data directory")
flag.StringVar(&joinAddr, "join", "", "Set join address, if any")
flag.StringVar(&nodeID, "id", "", "Node ID. If not set, same as Raft bind address")
flag.StringVar(&ServiceName, "service-name", "", "Name of the service in Kubernetes")
if config.Storage.DataDir == "" {
log.Info().Msg("No storage directory specified")
}
if err := os.MkdirAll(config.Storage.DataDir, 0700); err != nil {
log.Fatal().Msgf("failed to create path '%s' for a storage: %s", config.Storage.DataDir, err.Error())
}

flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <raft-data-path> \n", os.Args[0])
flag.PrintDefaults()
}
hosts := []string{}

flag.Parse()
var cl *cluster.Cluster
var j *cluster.Joiner

if dataDir == "" {
log.Info().Msg("No storage directory specified")
}
if err := os.MkdirAll(dataDir, 0700); err != nil {
log.Fatal().Msgf("failed to create path '%s' for a storage: %s", dataDir, err.Error())
}
if config.Cluster.ServiceName != "" {
namespace := "default"
serviceDiscovery := cluster.NewServiceDiscoverySRV(namespace, config.Cluster.ServiceName)
cl = cluster.NewCluster(serviceDiscovery, namespace, config.Cluster.ServiceName, config.Http.Port)

hosts := []string{}
if err := cl.Init(); err != nil {
log.Warn().Msgf("Error initialising a cluster: %s", err)
os.Exit(1)
}

var cl *cluster.Cluster
var j *cluster.Joiner
config.Cluster.NodeID = cl.NodeID()
config.Raft.Address = cl.RaftAddr()
hosts = cl.Hosts()

if ServiceName != "" {
namespace := "default"
serviceDiscovery := cluster.NewServiceDiscoverySRV(namespace, ServiceName)
cl = cluster.NewCluster(serviceDiscovery, namespace, ServiceName, httpPort)
} else {
if config.Cluster.JoinAddr != "" {
hosts = append(hosts, config.Cluster.JoinAddr)
}
}

if err := cl.Init(); err != nil {
log.Warn().Msgf("Error initialising a cluster: %s", err)
os.Exit(1)
opts := badger.DefaultOptions(
filepath.Join(config.Storage.DataDir, config.Cluster.NodeID, "store"),
)
db, err := badger.Open(opts)
if err != nil {
log.Fatal().Msg(err.Error())
}
defer db.Close()

log.Info().Msgf(
"Starting node (%s) %s with HTTP on %s and Raft on %s %+v",
config.Cluster.ServiceName,
config.Cluster.NodeID,
config.Http.Port,
config.Raft.Address,
hosts,
)
node := raft.NewNode(
db,
filepath.Join(config.Storage.DataDir, config.Cluster.NodeID, "raft"),
config.Cluster.NodeID,
config.Http.Port,
config.Raft.Address,
hosts,
)

if config.Cluster.ServiceName != "" {
node.SetLeaderChangeFunc(cl.LeaderChanged)
}

nodeID = cl.NodeID()
raftPort = cl.RaftAddr()
hosts = cl.Hosts()
node.Initialize()

} else {
if joinAddr != "" {
hosts = append(hosts, joinAddr)
// If join was specified, make the join request.
j = cluster.NewJoiner(config.Cluster.NodeID, config.Raft.Address, hosts)

if err := j.Join(); err != nil {
log.Fatal().Msg(err.Error())
}
}

opts := badger.DefaultOptions(filepath.Join(dataDir, nodeID, "store"))
db, err := badger.Open(opts)
if err != nil {
log.Fatal().Msg(err.Error())
}
defer db.Close()

log.Info().Msgf(
"Starting node (%s) %s with HTTP on %s and Raft on %s %+v",
ServiceName,
nodeID,
httpPort,
raftPort,
hosts,
)
node := raft.NewNode(
db, filepath.Join(dataDir, nodeID, "raft"), nodeID, httpPort, raftPort, hosts,
)

if ServiceName != "" {
node.SetLeaderChangeFunc(cl.LeaderChanged)
h := http.NewHttpService(config.Http.Port, node)
if err := h.Start(); err != nil {
log.Error().Msgf("failed to start HTTP service: %s", err.Error())
}
},
}

func loadConfig() (*Config, error) {
var config Config

// Unmarshal the config into the struct
if err := viper.Unmarshal(&config); err != nil {
return nil, fmt.Errorf("unable to decode into struct, %v", err)
}

node.Initialize()
return &config, nil
}

func init() {
cobra.OnInitialize(initConfig)

// Command-line flags
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is config.yaml)")
rootCmd.Flags().String("http.port", "8000", "Port to run the HTTP server on")
rootCmd.Flags().String("raft.address", "localhost:9000", "Raft bind address")
rootCmd.Flags().String("storage.data_dir", "data", "Data directory")
rootCmd.Flags().String("cluster.node_id", "node-1", "Node ID. If not set, same as Raft bind address")
rootCmd.Flags().String("cluster.service_name", "", "Name of the service in Kubernetes")
rootCmd.Flags().String("cluster.join_addr", "", "Set join address, if any")

// Bind CLI flags to Viper settings
viper.BindPFlag("http.port", rootCmd.Flags().Lookup("http.port"))
viper.BindPFlag("raft.address", rootCmd.Flags().Lookup("raft.address"))
viper.BindPFlag("storage.data_dir", rootCmd.Flags().Lookup("storage.data_dir"))
viper.BindPFlag("cluster.node_id", rootCmd.Flags().Lookup("cluster.node_id"))
viper.BindPFlag("cluster.service_name", rootCmd.Flags().Lookup("cluster.service_name"))
viper.BindPFlag("cluster.join_addr", rootCmd.Flags().Lookup("cluster.join_addr"))
}

func initConfig() {
log.Info().Msgf("Initializing config %s", cfgFile)

if cfgFile != "" {
// Use config file from the flag
viper.SetConfigFile(cfgFile)
} else {
// Default config file
viper.AddConfigPath(".")
viper.SetConfigName("config")
}

// If join was specified, make the join request.
j = cluster.NewJoiner(nodeID, raftPort, hosts)
// Enable environment variable support
viper.AutomaticEnv()

if err := j.Join(); err != nil {
log.Fatal().Msg(err.Error())
// Read the config file if found
if err := viper.ReadInConfig(); err == nil {
log.Warn().Msgf("Using config file: %s", viper.ConfigFileUsed())
}
}

h := http.NewHttpService(httpPort, node)
if err := h.Start(); err != nil {
log.Error().Msgf("failed to start HTTP service: %s", err.Error())
func main() {
if err := rootCmd.Execute(); err != nil {
log.Warn().Err(err)
}
}
28 changes: 14 additions & 14 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ services:
"go",
"run",
"main.go",
"-dataDir",
"--storage.data_dir",
"/usr/local/doq/data",
"-id",
"--cluster.node_id",
"node-1",
"-httpAddr",
"--http.port",
"8000",
"-raftAddr",
"--raft.address",
"node-1:9000",
]
node-2:
Expand All @@ -39,15 +39,15 @@ services:
"go",
"run",
"main.go",
"-dataDir",
"--storage.data_dir",
"/usr/local/doq/data",
"-id",
"--cluster.node_id",
"node-2",
"-httpAddr",
"--http.port",
"8000",
"-raftAddr",
"--raft.address",
"node-2:9000",
"-join",
"--cluster.join_addr",
"node-1:8000"
]
node-3:
Expand All @@ -64,15 +64,15 @@ services:
"go",
"run",
"main.go",
"-dataDir",
"--storage.data_dir",
"/usr/local/doq/data",
"-id",
"--cluster.node_id",
"node-3",
"-httpAddr",
"--http.port",
"8000",
"-raftAddr",
"--raft.address",
"node-3:9000",
"-join",
"--cluster.join_addr",
"node-1:8000"
]
python:
Expand Down
Loading

0 comments on commit bcf1797

Please sign in to comment.