Skip to content

Commit

Permalink
add sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
konrin-thesoul committed Sep 28, 2021
1 parent 62dfb94 commit f2d8807
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 85 deletions.
8 changes: 4 additions & 4 deletions cmd/skasync/api/sync.http
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ PUT http://localhost:60001/sync/in/pod
Content-Type: application/json

{
"artifact": "thesoul/thesoul-dev",
"container": "php",
"path": "app/TheSoul/src/TheSoul"
"artifact": "nginx/nginx-dev",
"container": "nginx",
"path": "to/path"
}

###
Expand All @@ -13,5 +13,5 @@ PUT http://localhost:60001/sync/in/allPods
Content-Type: application/json

{
"path": "app/TheSoul/src/TheSoul/Article"
"path": "to/path"
}
108 changes: 102 additions & 6 deletions cmd/skasync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,47 @@ import (
"skasync/pkg/k8s"
"skasync/pkg/skaffold"
"skasync/pkg/sync"
"strings"

"github.com/kelseyhightower/envconfig"
)

const (
WatcherMode = "watcher"
SyncMode = "sync"
)

const (
InSyncDiraction uint = iota
OutSyncDiraction
)

type Config struct {
Context,
Namespace,
RootDir string
Mode string
Pods []k8s.PodConfig
Sync sync.Config
Skaffold skaffold.Config
API api.Config
SyncArgs SyncArgs
}

type SyncArgs struct {
SyncDiraction uint
SyncInArgs SyncInArgs
SyncOutArgs SyncOutArgs
}

type SyncInArgs struct {
Pods []string
IsAllPods bool
Paths []string
}

type SyncOutArgs struct{}

type envConfig struct {
Context,
Namespace string
Expand All @@ -47,7 +74,12 @@ func LoadConfig() (*Config, error) {
return nil, err
}

flagsCfg, err := readFlags(currentDirPath)
err = readMode(&cfg)
if err != nil {
return nil, err
}

flagsCfg, err := readFlags(cfg.Mode, currentDirPath)
if err != nil {
return nil, err
}
Expand All @@ -57,6 +89,13 @@ func LoadConfig() (*Config, error) {
return nil, err
}

if cfg.Mode == SyncMode {
err = readSyncArgs(&cfg)
if err != nil {
return nil, err
}
}

if len(cfg.Context) == 0 {
cfg.Context = flagsCfg.Context
}
Expand Down Expand Up @@ -93,20 +132,54 @@ func defaultConfig(rootDirPath string) Config {
}
}

func readFlags(rootDirPath string) (*flagsConfig, error) {
func readFlags(mode, rootDirPath string) (*flagsConfig, error) {
cfg := &flagsConfig{}

argBais := 2
for i, arg := range os.Args {
if arg[0] != '-' {
continue
}

argBais = i
}

configPath := filepath.Join(rootDirPath, "skasync.config.json")

flag.StringVar(&cfg.ConfigFilePath, "c", configPath, "Config file")
flag.StringVar(&cfg.Context, "context", "", "Using kubctl context")
flag.StringVar(&cfg.Namespace, "ns", "", "Using kubctl namespace")
flagSet := flag.NewFlagSet("config", flag.ContinueOnError)

flag.Parse()
flagSet.StringVar(&cfg.ConfigFilePath, "c", configPath, "Config file")
flagSet.StringVar(&cfg.Context, "context", "", "Using kubctl context")
flagSet.StringVar(&cfg.Namespace, "ns", "", "Using kubctl namespace")

flagSet.Parse(os.Args[argBais:])

if !filepath.IsAbs(cfg.ConfigFilePath) {
cfg.ConfigFilePath = filepath.Join(rootDirPath, cfg.ConfigFilePath)
}

return cfg, nil
}

func readMode(cfg *Config) error {
if len(os.Args) < 2 {
return errors.New("mode not found")
}

mode := os.Args[1]

switch mode {
case WatcherMode:
cfg.Mode = WatcherMode
case SyncMode:
cfg.Mode = SyncMode
default:
return errors.New("undefined mode: " + mode)
}

return nil
}

func readEnvs(cfg *Config) error {
envCfg := envConfig{}

Expand All @@ -126,6 +199,29 @@ func readEnvs(cfg *Config) error {
return nil
}

func readSyncArgs(cfg *Config) error {
if len(os.Args) < 4 {
return errors.New("args length error")
}

switch os.Args[2] {
case "in":
pods := os.Args[3]
if pods == "all" {
cfg.SyncArgs.SyncInArgs.IsAllPods = true
} else {
cfg.SyncArgs.SyncInArgs.Pods = strings.Split(pods, ",")
}

cfg.SyncArgs.SyncInArgs.Paths = strings.Split(os.Args[4], ",")
case "out":
default:
return fmt.Errorf("sync diraction %s is undefined", os.Args[2])
}

return nil
}

func readFile(cfg *Config, configFilePath string) error {
currentPath, err := os.Getwd()
if err != nil {
Expand Down
75 changes: 4 additions & 71 deletions cmd/skasync/skasync.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"skasync/cmd/skasync/api"
"skasync/pkg/cli"
"skasync/pkg/filemon"
"skasync/pkg/filesystem"
"skasync/pkg/k8s"
"skasync/pkg/skaffold"
"skasync/pkg/sync"
"syscall"

"github.com/labstack/echo/v4"
)

func main() {
Expand All @@ -23,64 +10,10 @@ func main() {
log.Fatal(err)
}

mainCtx := context.Background()

watcherCh := make(chan []string, 100)
skaffoldLayerCh := make(chan []string, 100)
errorsCh := make(chan error, 1)

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)

ccli := cli.NewCLI(cfg.Context, cfg.Namespace)
kubeCtl := cli.NewKubeCtl(ccli)
podsCtrl := k8s.NewPodsCtrl(cfg.RootDir, cfg.Pods, kubeCtl)
refFilesMapService := filesystem.NewFilesMapService(cfg.RootDir)
// refFilesMap := filesystem.NewRefFilesMap(refFilesMapService)
watcher := filemon.NewWatcher(cfg.RootDir, cfg.Sync.Debounce)
podSyncker := sync.NewPodSyncer(cfg.RootDir, ccli, podsCtrl, refFilesMapService)
skaffoldStatusProbe := skaffold.NewStatusProbe(cfg.Skaffold.Addr, podsCtrl)
skaffoldStatusLayer := sync.NewSkaffoldStatusLayer(skaffoldLayerCh, podsCtrl)

go func() {
errorsCh <- api.NewAPIListenerAndStart(cfg.API, func(e *echo.Echo) error {
api.NewSyncController(e.Group("/sync"), podSyncker, podsCtrl)

return nil
})
}()

if err := podsCtrl.Refresh(); err != nil {
log.Fatal(err)
if cfg.Mode == WatcherMode {
RunWatcher(cfg)
return
}

skaffoldStatusProbe.Subscribe(skaffoldStatusLayer.StatusHandler)

go func() {
errorsCh <- watcher.Watch(mainCtx, watcherCh)
}()

go func() {
errorsCh <- skaffoldStatusLayer.Do(mainCtx, watcherCh)
}()

go func() {
errorsCh <- podSyncker.Do(mainCtx, skaffoldLayerCh)
}()

go func() {
errorsCh <- skaffoldStatusProbe.Listen(mainCtx)
}()

println("Skasync is started")

// Wait critical error or term signal
select {
case err := <-errorsCh:
mainCtx.Done()
log.Fatal(err)
case <-sigChan:
mainCtx.Done()
println("Receive stop signal")
}
RunSync(cfg)
}
64 changes: 64 additions & 0 deletions cmd/skasync/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"log"
"skasync/pkg/cli"
"skasync/pkg/filesystem"
"skasync/pkg/k8s"
"skasync/pkg/sync"
"strings"
)

// skasync sync -> * to/path
// skasync sync <- podName to/path
func RunSync(cfg *Config) {
mainCtx := context.Background()

ccli := cli.NewCLI(cfg.Context, cfg.Namespace)
kubeCtl := cli.NewKubeCtl(ccli)
podsCtrl := k8s.NewPodsCtrl(cfg.RootDir, cfg.Pods, kubeCtl)
refFilesMapService := filesystem.NewFilesMapService(cfg.RootDir)
podSyncker := sync.NewPodSyncer(cfg.RootDir, ccli, podsCtrl, refFilesMapService)

if err := podsCtrl.Refresh(); err != nil {
log.Fatal(err)
}

if cfg.SyncArgs.SyncDiraction == InSyncDiraction {
inSyncDiraction(mainCtx, cfg.SyncArgs, podsCtrl, podSyncker)
return
}

outSyncDiraction(mainCtx)
}

func inSyncDiraction(ctx context.Context, cfg SyncArgs, podsCtrl *k8s.PodsCtrl, podSyncker *sync.PodSyncer) {
var pods []*k8s.Pod

if cfg.SyncInArgs.IsAllPods {
pods = podsCtrl.GetPods()
} else {
pods = make([]*k8s.Pod, 0, len(cfg.SyncInArgs.Pods))

for _, podArg := range cfg.SyncInArgs.Pods {
podSp := strings.Split(podArg, ":")
if len(podSp) != 2 {
log.Fatalf("not found container name in pod %s", podArg)
}

pod, err := podsCtrl.Find(podSp[0], podSp[1])
if err != nil {
log.Fatal(err)
}

pods = append(pods, pod)
}
}

podSyncker.SyncLocalPathsToPods(pods, cfg.SyncInArgs.Paths)
}

func outSyncDiraction(ctx context.Context) {
panic("not implement")
}
Loading

0 comments on commit f2d8807

Please sign in to comment.