Skip to content

Commit

Permalink
Improve proxy logic and core. (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
microup authored Aug 11, 2023
1 parent 6a0d56c commit cdf629b
Show file tree
Hide file tree
Showing 22 changed files with 219 additions and 130 deletions.
67 changes: 43 additions & 24 deletions internal/app/vbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"errors"
"fmt"
"log"
"os"
Expand All @@ -10,70 +11,88 @@ import (
"syscall"

"vbalancer/internal/config"
"vbalancer/internal/core"
"vbalancer/internal/proxy"
"vbalancer/internal/types"
"vbalancer/internal/vlog"
)

var ErrRecoveredPanic = errors.New("recovered from panic")

// Run this is the function of an application that starts a proxy server.
//
//nolint:funlen,cyclop
func Run() {
runtime.GOMAXPROCS(runtime.NumCPU())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()

cfg := config.New()
logger := vlog.New(cfg.Log)

err := cfg.Init()
if err != nil {
log.Panicf("failed to initialize configuration: %s", err.Error())
}

logger, err := vlog.New(cfg.Logger)
err = logger.Init()
if err != nil {
log.Panicf("failed to create logger: %s", err)
}

defer func(logger vlog.ILog) {
err = logger.Close()
if err != nil {
log.Fatalf("failed close logger: %v", err)
}
}(logger)

defer func() {
if err := recover(); err != nil {
msgErr := fmt.Errorf("%w: %v", types.ErrRecoveredPanic, err)
msgErr := fmt.Errorf("%w: %v", ErrRecoveredPanic, err)

logger.Add(vlog.Fatal, types.ErrGotPanic, msgErr)
log.Printf("%v", msgErr)
}
}()

defer func(logger vlog.ILog) {
err = logger.Close()
if err != nil {
log.Fatalf("failed close logger: %v", err)
}
}(logger)

proxyBalancer := cfg.Proxy
proxy, err := core.GetObjectFromMap(cfg.Proxy, proxy.New())
if err != nil {
logger.Add(vlog.Fatal, types.ErrCantGetProxyObject, "can't get proxy object")
}

err = proxyBalancer.Init(ctx, logger)
err = proxy.Init(ctx, logger)
if err != nil {
logger.Add(vlog.Fatal, types.ErrCantInitProxy, fmt.Errorf("%w: %v", types.ErrInitProxy, err))
logger.Add(vlog.Fatal, types.ErrCantInitProxy, fmt.Errorf("%w", err))
}

stopSignal := make(chan os.Signal, 1)
signal.Notify(stopSignal, os.Interrupt, syscall.SIGTERM)

listenProxyChan := make(chan error)
chanListenProxy := make(chan error)

go func() {
logger.Add(vlog.Info, types.ResultOK, fmt.Sprintf("start server addr on %s", cfg.Proxy.Port))
listenProxyChan <- proxyBalancer.ListenAndServe(ctx, cfg.Proxy.Port)
logger.Add(vlog.Info, types.ResultOK, fmt.Sprintf("start server addr on %s", proxy.Port))
chanListenProxy <- proxy.ListenAndServe(ctx)

stopSignal <- syscall.SIGTERM
}()

listenErr := <-listenProxyChan
if listenErr != nil {
logger.Add(vlog.Fatal, types.ErrProxy, fmt.Errorf("the proxy was return err: %w", err))
select {
case <-ctx.Done():
logger.Add(vlog.Info, types.ResultOK, "get ctx.Done()...")

case listenErr := <-chanListenProxy:
{
if listenErr != nil {
logger.Add(vlog.Fatal, types.ErrProxy, fmt.Errorf("the proxy was return err: %w", err))
} else {
log.Printf("the proxy was close")
}
}
case <-stopSignal:
{
logger.Add(vlog.Info, types.ResultOK, "get syscall.SIGTERM...")
}
}

<-stopSignal

logger.Add(vlog.Info, types.ResultOK, "received shutdown signal, exiting gracefully...")
}
29 changes: 10 additions & 19 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,46 @@ import (
"os"
"path/filepath"

"vbalancer/internal/proxy"
"vbalancer/internal/types"
"vbalancer/internal/vlog"

"gopkg.in/yaml.v2"
)

const DefaultConfigFile = "config.yaml"

const DefaultFileLogSizeBytes = 100000
const DeafultShowRecordsAPI = 50
const DefaultDirLogs = "/logs"
var ErrCantFindProxySection = errors.New("can't find proxy section in config")

// Config is the configuration of the proxy server.
type Config struct {
// Logger is the configuration for the logger.
Logger *vlog.Config `yaml:"logger" json:"logger"`
Log *Log `yaml:"logger" json:"logger"`
// Proxy is the configuration for the proxy server.
Proxy *proxy.Proxy `yaml:"proxy" json:"proxy"`
Proxy any `yaml:"proxy" json:"proxy"`
}

// New creates a new configuration for the vbalancer application.
func New() *Config {
return &Config{
Logger: &vlog.Config{
DirLog: DefaultDirLogs,
FileSize: DefaultFileLogSizeBytes,
APIShowRecords: DeafultShowRecordsAPI,
Log: &Log{
DirLog: types.DefaultDirLogs,
FileSizeMB: types.DefaultFileLogSizeMB,
APIShowRecords: types.DeafultShowRecordsAPI,
},
Proxy: nil,
}
}

// Init initializes the configuration by loading values from a YAML file.
func (c *Config) Init() error {
configFile := os.Getenv("ConfigFile")
if configFile == "" {
configFile = DefaultConfigFile
configFile = types.DefaultNameConfigFile
}

if err := c.Load(configFile); err != nil {
return err
}

if c.Proxy == nil {
return fmt.Errorf("%w", types.ErrCantGetProxySection)
}

if resultCode := c.Proxy.UpdatePort(); resultCode != types.ResultOK {
return fmt.Errorf("%w: %s", types.ErrCantGetProxyPort, resultCode.ToStr())
return fmt.Errorf("%w", ErrCantFindProxySection)
}

return nil
Expand Down
11 changes: 11 additions & 0 deletions internal/config/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package config

// Config defines the structure for storing configuration values read from a YAML file.
type Log struct {
// directory where log files are stored
DirLog string `yaml:"dirLog"`
// maximum size of a log file, in megabytes
FileSizeMB uint64 `yaml:"fileSizeInMb"`
// number of records to show when
APIShowRecords uint64 `yaml:"apiShowRecords"`
}
File renamed without changes.
File renamed without changes.
12 changes: 0 additions & 12 deletions internal/core/consts.go

This file was deleted.

9 changes: 5 additions & 4 deletions internal/core/converts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"math"
"strconv"
"vbalancer/internal/types"
)

const (
Expand All @@ -20,20 +21,20 @@ func HumanFileSize(size float64) string {
suffixes[3] = "GB"
suffixes[4] = "TB"

base := math.Log(size) / math.Log(LengthByte)
getSize := Round(math.Pow(LengthByte, base-math.Floor(base)), RoundOne, place)
base := math.Log(size) / math.Log(types.LengthByte)
getSize := Round(math.Pow(types.LengthByte, base-math.Floor(base)), types.RoundOne, place)

var suffix string
if base > 0 {
suffix = suffixes[int(math.Floor(base))]
}

return strconv.FormatFloat(getSize, 'f', prec, BitSize) + " " + suffix
return strconv.FormatFloat(getSize, 'f', prec, types.BitSize) + " " + suffix
}

// Round rounds a float to a given number of decimal places.
func Round(val float64, roundOn float64, places int) float64 {
pow := math.Pow(PowX, float64(places))
pow := math.Pow(types.PowX, float64(places))

return math.Round(val*pow) / pow
}
File renamed without changes.
File renamed without changes.
35 changes: 35 additions & 0 deletions internal/core/objects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package core

import (
"errors"
"fmt"

"gopkg.in/yaml.v2"
)

var ErrIncorectType = errors.New("incorrect types")

//nolint
func GetObjectFromMap[T any](objectMap any, unmarshalObject T) (T, error) {
var objectBytes []byte

var err error

switch p := objectMap.(type) {
case map[interface{}]interface{}:
objectBytes, err = yaml.Marshal(p)
default:
return unmarshalObject, fmt.Errorf("%w for : %T", ErrIncorectType, p)
}

if err != nil {
return unmarshalObject, fmt.Errorf("failed to marshal objectMap: %w", err)
}

err = yaml.Unmarshal(objectBytes, &unmarshalObject)
if err != nil {
return unmarshalObject, fmt.Errorf("failed to unmarshal output object: %w", err)
}

return unmarshalObject, nil
}
File renamed without changes.
File renamed without changes.
40 changes: 31 additions & 9 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package proxy

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -17,11 +18,12 @@ import (
"vbalancer/internal/vlog"
)

// DefaultPort is the default port for the proxy server.
const DefaultPort = 8080

const MaxCountCopyData = 2

var ErrCantGetProxyPort = errors.New("can't get proxy port")
var ErrMaxCountAttempts = errors.New("exceeded maximum number of attempts")
var ErrConfigPeersIsNil = errors.New("empty list peer in config file")

// Proxy defines the structure for the proxy server.
type Proxy struct {
//
Expand All @@ -44,6 +46,22 @@ type Proxy struct {
Rules *rules.Rules `yaml:"rules" json:"rules"`
}

func New() *Proxy {
return &Proxy{
Logger: nil,
Port: types.DefaultProxyPort,
ClientDeadLineTime: types.DeafultClientDeadLineTime,
PeerHostTimeOut: types.DeafultPeerHostTimeOut,
PeerHostDeadLine: types.DeafultPeerHostDeadLine,
MaxCountConnection: types.DeafultMaxCountConnection,
CountMaxDialAttemptsToPeer: types.DeafultCountMaxDialAttemptsToPeer,
//nolint:exhaustivestruct,exhaustruct
Peers: &peers.Peers{},
//nolint:exhaustivestruct,exhaustruct
Rules: &rules.Rules{},
}
}

// Init initializes the proxy server.
func (p *Proxy) Init(ctx context.Context, logger vlog.ILog) error {
p.Logger = logger
Expand All @@ -54,7 +72,7 @@ func (p *Proxy) Init(ctx context.Context, logger vlog.ILog) error {
return fmt.Errorf("%w", err)
}
} else {
return types.ErrConfigPeersIsNil
return ErrConfigPeersIsNil
}

if p.Rules != nil {
Expand All @@ -64,12 +82,16 @@ func (p *Proxy) Init(ctx context.Context, logger vlog.ILog) error {
}
}

if resultCode := p.UpdatePort(); resultCode != types.ResultOK {
return fmt.Errorf("%w: %s", ErrCantGetProxyPort, resultCode.ToStr())
}

return nil
}

// ListenAndServe starts the proxy server.
func (p *Proxy) ListenAndServe(ctx context.Context, proxyPort string) error {
proxySrv, err := net.Listen("tcp", proxyPort)
func (p *Proxy) ListenAndServe(ctx context.Context) error {
proxySrv, err := net.Listen("tcp", p.Port)
if err != nil {
return fmt.Errorf("%w", err)
}
Expand Down Expand Up @@ -122,7 +144,7 @@ func (p *Proxy) UpdatePort() types.ResultCode {
if p.Port == "" || p.Port == ":" {
proxyPort = os.Getenv("ProxyPort")
if proxyPort == ":" || proxyPort == "" {
proxyPort = fmt.Sprintf("%d", DefaultPort)
proxyPort = types.DefaultProxyPort
}
} else {
proxyPort = p.Port
Expand Down Expand Up @@ -177,7 +199,7 @@ func (p *Proxy) handleIncomingConnection(conn net.Conn, semaphore chan struct{})
if err != nil {
p.Logger.Add(vlog.Debug, types.ErrProxy, vlog.RemoteAddr(clientAddr), fmt.Errorf("failed in reverseData() %w", err))

responseLogger := response.New(p.Logger)
responseLogger := response.New()

err = responseLogger.SentResponseToClient(conn, err)
if err != nil {
Expand All @@ -195,7 +217,7 @@ func (p *Proxy) handleIncomingConnection(conn net.Conn, semaphore chan struct{})
// it returns an error if the maximum number of attempts is reached or if it fails to get the next peer.
func (p *Proxy) reverseData(client net.Conn, curentDialAttemptsToPeer uint, maxDialAttemptsToPeer uint) error {
if curentDialAttemptsToPeer >= maxDialAttemptsToPeer {
return types.ErrMaxCountAttempts
return ErrMaxCountAttempts
}

pPeer, resultCode := p.Peers.GetNextPeer()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestGetProxyPort(t *testing.T) {
name: "empty env var, got DefaultPort",
envVar: "",
expected: types.ResultOK,
checkValue: fmt.Sprintf(":%d", proxy.DefaultPort),
checkValue: fmt.Sprintf(":%s", types.DefaultProxyPort),
},
{
name: "valid proxy port from env var",
Expand Down
Loading

0 comments on commit cdf629b

Please sign in to comment.