Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor improvements #149

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions flow/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package flow
import (
"context"
"fmt"
"path/filepath"
"sync"

"github.com/silas/dag"
Expand Down Expand Up @@ -150,17 +149,17 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
}

func (w *microWorkflow) Abort(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}

func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}

func (w *microWorkflow) Resume(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}

Expand All @@ -181,8 +180,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
return "", err
}

stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)

options := NewExecuteOptions(opts...)

Expand Down Expand Up @@ -219,7 +218,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
Expand All @@ -246,65 +245,65 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
Expand Down
8 changes: 4 additions & 4 deletions profiler/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func (p *profiler) Start() error {
// create exit channel
p.exit = make(chan bool)

cpuFile := filepath.Join("/tmp", "cpu.pprof")
memFile := filepath.Join("/tmp", "mem.pprof")
cpuFile := filepath.Join(string(os.PathSeparator)+"tmp", "cpu.pprof")
memFile := filepath.Join(string(os.PathSeparator)+"tmp", "mem.pprof")

if len(p.opts.Name) > 0 {
cpuFile = filepath.Join("/tmp", p.opts.Name+".cpu.pprof")
memFile = filepath.Join("/tmp", p.opts.Name+".mem.pprof")
cpuFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".cpu.pprof")
memFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".mem.pprof")
}

f1, err := os.Create(cpuFile)
Expand Down
2 changes: 1 addition & 1 deletion register/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
continue
}

metadata := make(map[string]string)
metadata := make(map[string]string, len(n.Metadata))

// make copy of metadata
for k, v := range n.Metadata {
Expand Down
4 changes: 1 addition & 3 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package store

import (
"context"
"path/filepath"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -33,12 +32,11 @@ type memoryStore struct {
}

func (m *memoryStore) key(prefix, key string) string {
return filepath.Join(prefix, key)
return prefix + m.opts.Separator + key
}

func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key)

_, found := m.store.Get(key)
if !found {
return ErrNotFound
Expand Down
20 changes: 15 additions & 5 deletions store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Options struct {
Name string
// Namespace of the records
Namespace string
// Separator used as key parts separator
Separator string
// Addrs contains store address
Addrs []string
// Wrappers store wrapper that called before actual functions
Expand All @@ -41,11 +43,12 @@ type Options struct {
// NewOptions creates options struct
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Context: context.Background(),
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Logger: logger.DefaultLogger,
Context: context.Background(),
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Separator: DefaultSeparator,
}
for _, o := range opts {
o(&options)
Expand Down Expand Up @@ -98,6 +101,13 @@ func Name(n string) Option {
}
}

// Separator the value used as key parts separator
func Separator(s string) Option {
return func(o *Options) {
o.Separator = s
}
}

// Namespace sets namespace of the store
func Namespace(ns string) Option {
return func(o *Options) {
Expand Down
2 changes: 2 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
ErrInvalidKey = errors.New("invalid key")
// DefaultStore is the global default store
DefaultStore = NewStore()
// DefaultSeparator is the gloabal default key parts separator
DefaultSeparator = "/"
)

// Store is a data storage interface
Expand Down
84 changes: 84 additions & 0 deletions util/text/text.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package text


func DetectEncoding(text string) map[string]int {
charsets := map[string]int{
"UTF-8": 0,
"CP1251": 0,
"KOI8-R": 0,
"IBM866": 0,
"ISO-8859-5": 0,
"MAC": 0,
}

if len(text) == 0 {
return charsets
}

utflower := 7
utfupper := 5
lowercase := 3
uppercase := 1
last_simb := 0

for a := 0; a < len(text); a++ {
char := int(text[a])

// non-russian characters
if char < 128 || char > 256 {
continue
}

// UTF-8
if (last_simb == 208) && ((char > 143 && char < 176) || char == 129) {
charsets["UTF-8"] += (utfupper * 2)
}
if ((last_simb == 208) && ((char > 175 && char < 192) || char == 145)) || (last_simb == 209 && char > 127 && char < 144) {
charsets["UTF-8"] += (utflower * 2)
}

// CP1251
if (char > 223 && char < 256) || char == 184 {
charsets["CP1251"] += lowercase
}
if (char > 191 && char < 224) || char == 168 {
charsets["CP1251"] += uppercase
}

// KOI8-R
if (char > 191 && char < 224) || char == 163 {
charsets["KOI8-R"] += lowercase
}
if (char > 222 && char < 256) || char == 179 {
charsets["KOI8-R"] += uppercase
}

// IBM866
if (char > 159 && char < 176) || (char > 223 && char < 241) {
charsets["IBM866"] += lowercase
}
if (char > 127 && char < 160) || char == 241 {
charsets["IBM866"] += uppercase
}

// ISO-8859-5
if (char > 207 && char < 240) || char == 161 {
charsets["ISO-8859-5"] += lowercase
}
if (char > 175 && char < 208) || char == 241 {
charsets["ISO-8859-5"] += uppercase
}

// MAC
if char > 221 && char < 255 {
charsets["MAC"] += lowercase
}
if char > 127 && char < 160 {
charsets["MAC"] += uppercase
}

last_simb = char
}

return charsets
}