Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Performance problems #419

Merged
merged 8 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/numary/ledger/pkg/api/middlewares"
"github.com/numary/ledger/pkg/api/routes"
"github.com/numary/ledger/pkg/bus"
"github.com/numary/ledger/pkg/contextlogger"
"github.com/numary/ledger/pkg/ledger"
"github.com/numary/ledger/pkg/redis"
"github.com/numary/ledger/pkg/storage/sqlstorage"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/uptrace/opentelemetry-go-extra/otellogrus"
"github.com/xdg-go/scram"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
Expand All @@ -43,12 +45,26 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {
options = append(options, fx.NopLogger)
}

debug := viper.GetBool(debugFlag)

l := logrus.New()
if v.GetBool(debugFlag) {
if debug {
l.Level = logrus.DebugLevel
}
loggerFactory := logging.StaticLoggerFactory(logginglogrus.New(l))
logging.SetFactory(loggerFactory)
if viper.GetBool(otlptraces.OtelTracesFlag) {
l.AddHook(otellogrus.NewHook(otellogrus.WithLevels(
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
)))
}
logging.SetFactory(contextlogger.NewFactory(
logging.StaticLoggerFactory(logginglogrus.New(l)),
))
if debug {
sqlstorage.InstrumentalizeSQLDrivers()
}

topics := v.GetStringSlice(publisherTopicMappingFlag)
mapping := make(map[string]string)
Expand Down Expand Up @@ -169,7 +185,7 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {

// Handle resolver
options = append(options, ledger.ResolveModule(
v.GetInt64(numscriptCacheCapacity)))
v.GetInt64(cacheCapacityBytes), v.GetInt64(cacheMaxNumKeys)))

// Api middlewares
options = append(options, routes.ProvidePerLedgerMiddleware(func(tp trace.TracerProvider) []gin.HandlerFunc {
Expand Down
3 changes: 2 additions & 1 deletion cmd/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ func TestContainers(t *testing.T) {
// Default options
v.Set(storageDriverFlag, sqlstorage.SQLite.String())
v.Set(storageDirFlag, "/tmp")
v.Set(numscriptCacheCapacity, 100)
v.Set(cacheCapacityBytes, 100000000)
v.Set(cacheMaxNumKeys, 100)
//v.Set(storageSQLiteDBNameFlag, uuid.New())
tc.init(v)
app := NewContainer(v, options...)
Expand Down
7 changes: 5 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const (

commitPolicyFlag = "commit-policy"

numscriptCacheCapacity = "numscript-cache-capacity"
cacheCapacityBytes = "cache-capacity-bytes"
cacheMaxNumKeys = "cache-max-num-keys"
)

var (
Expand Down Expand Up @@ -140,7 +141,9 @@ func NewRootCommand() *cobra.Command {
root.PersistentFlags().Bool(authBearerUseScopesFlag, false, "Use scopes as defined by rfc https://datatracker.ietf.org/doc/html/rfc8693")
root.PersistentFlags().String(commitPolicyFlag, "", "Transaction commit policy (default or allow-past-timestamps)")

root.PersistentFlags().Int(numscriptCacheCapacity, 100, "Capacity of the cache storing Numscript in RAM")
// 100 000 000 bytes is 100 MB
root.PersistentFlags().Int(cacheCapacityBytes, 100000000, "Capacity in bytes of the cache storing Numscript in RAM")
root.PersistentFlags().Int(cacheMaxNumKeys, 100, "Maximum number of Numscript to be stored in the cache in RAM")

otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitHTTPBasicFlags(root)
Expand Down
19 changes: 0 additions & 19 deletions cmd/server_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,16 @@ import (
"net/http"

"github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/logging/logginglogrus"
"github.com/formancehq/go-libs/otlp/otlptraces"
"github.com/numary/ledger/pkg/api"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/uptrace/opentelemetry-go-extra/otellogrus"
"go.uber.org/fx"
)

func NewServerStart() *cobra.Command {
return &cobra.Command{
Use: "start",
RunE: func(cmd *cobra.Command, args []string) error {
l := logrus.New()
if viper.GetBool(debugFlag) {
l.Level = logrus.DebugLevel
}
if viper.GetBool(otlptraces.OtelTracesFlag) {
l.AddHook(otellogrus.NewHook(otellogrus.WithLevels(
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
)))
}
loggerFactory := logging.StaticLoggerFactory(logginglogrus.New(l))
logging.SetFactory(loggerFactory)

app := NewContainer(
viper.GetViper(),
fx.Invoke(func(lc fx.Lifecycle, h *api.API) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/numary/ledger
go 1.18

require (
github.com/DmitriyVTitov/size v1.5.0
github.com/Masterminds/semver/v3 v3.2.0
github.com/Shopify/sarama v1.37.2
github.com/ThreeDotsLabs/watermill v1.1.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg6
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DmitriyVTitov/size v1.5.0 h1:/PzqxYrOyOUX1BXj6J9OuVRVGe+66VL4D9FlUaW515g=
github.com/DmitriyVTitov/size v1.5.0/go.mod h1:le6rNI4CoLQV1b9gzp1+3d7hMAD/uu2QcJ+aYbNgiU0=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g=
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
Expand Down Expand Up @@ -216,6 +218,8 @@ github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0L
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
Expand Down
62 changes: 0 additions & 62 deletions pkg/api/controllers/context_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion pkg/api/internal/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ func RunTest(t *testing.T, options ...fx.Option) {

options = append([]fx.Option{
api.Module(api.Config{StorageDriver: "sqlite", Version: "latest", UseScopes: true}),
ledger.ResolveModule(100),
// 100 000 000 bytes is 100 MB
ledger.ResolveModule(100000000, 100),
ledgertesting.ProvideLedgerStorageDriver(),
fx.Invoke(func(driver storage.Driver[ledger.Store], lc fx.Lifecycle) {
lc.Append(fx.Hook{
Expand Down
30 changes: 7 additions & 23 deletions pkg/api/middlewares/ledger_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package middlewares

import (
"context"
"fmt"
"net/http"

"github.com/formancehq/go-libs/logging"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/numary/ledger/pkg"
"github.com/numary/ledger/pkg/api/apierrors"
"github.com/numary/ledger/pkg/contextlogger"
"github.com/numary/ledger/pkg/ledger"
Expand All @@ -28,36 +25,23 @@ func (m *LedgerMiddleware) LedgerMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
name := c.Param("ledger")
if name == "" {
c.AbortWithStatus(http.StatusNotFound)
return
}

ctx, span := opentelemetry.Start(c.Request.Context(), "Ledger access")
span := opentelemetry.WrapGinContext(c, "Ledger access")
defer span.End()

contextKeyID := uuid.NewString()
id := span.SpanContext().SpanID()
if id == [8]byte{} {
logging.GetLogger(ctx).Debugf(
"ledger middleware SpanID is empty, new id generated %s", contextKeyID)
} else {
contextKeyID = fmt.Sprint(id)
}
ctx = context.WithValue(ctx, pkg.KeyContextID, contextKeyID)
c.Header(string(pkg.KeyContextID), contextKeyID)

loggerFactory := logging.StaticLoggerFactory(
contextlogger.New(ctx, logging.GetLogger(ctx)))
logging.SetFactory(loggerFactory)
contextlogger.WrapGinRequest(c)

l, err := m.resolver.GetLedger(ctx, name)
l, err := m.resolver.GetLedger(c.Request.Context(), name)
if err != nil {
apierrors.ResponseError(c, err)
return
}
defer l.Close(ctx)
c.Set("ledger", l)
defer l.Close(context.Background())

c.Request = c.Request.WithContext(ctx)
c.Set("ledger", l)
c.Next()
}
}
44 changes: 25 additions & 19 deletions pkg/api/middlewares/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,39 @@ func newBufferedWriter(rw gin.ResponseWriter) *bufferedResponseWriter {
func Transaction(locker Locker) func(c *gin.Context) {
return func(c *gin.Context) {

ctx, span := opentelemetry.Start(c.Request.Context(), "Ledger locking")
ctx, span := opentelemetry.Start(c.Request.Context(), "Wait ledger lock")
defer span.End()

c.Request = c.Request.WithContext(ctx)

unlock, err := locker.Lock(c.Request.Context(), c.Param("ledger"))
if err != nil {
panic(err)
}
defer unlock(context.Background()) // Use a background context instead of the request one as it could have been cancelled

bufferedWriter := newBufferedWriter(c.Writer)
c.Request = c.Request.WithContext(storage.TransactionalContext(c.Request.Context()))
c.Writer = bufferedWriter
defer func() {
_ = storage.RollbackTransaction(c.Request.Context())
}()

c.Next()

if c.Writer.Status() >= 200 && c.Writer.Status() < 300 &&
storage.IsTransactionRegistered(c.Request.Context()) {
if err := storage.CommitTransaction(c.Request.Context()); err != nil {
apierrors.ResponseError(c, err)
return
func() {
unlock, err := locker.Lock(c.Request.Context(), c.Param("ledger"))
if err != nil {
panic(err)
}
}
defer unlock(context.Background()) // Use a background context instead of the request one as it could have been cancelled

ctx, span = opentelemetry.Start(c.Request.Context(), "Ledger locked")
defer span.End()
c.Request = c.Request.WithContext(ctx)
c.Request = c.Request.WithContext(storage.TransactionalContext(c.Request.Context()))
defer func() {
_ = storage.RollbackTransaction(c.Request.Context())
}()

c.Next()

if c.Writer.Status() >= 200 && c.Writer.Status() < 300 &&
storage.IsTransactionRegistered(c.Request.Context()) {
if err := storage.CommitTransaction(c.Request.Context()); err != nil {
apierrors.ResponseError(c, err)
return
}
}
}()

if err := bufferedWriter.WriteResponse(); err != nil {
_ = c.Error(err)
Expand Down
5 changes: 0 additions & 5 deletions pkg/context.go

This file was deleted.

Loading