Skip to content

Commit

Permalink
feat: Add list of ledgers and accounts/transactions total to telemetr…
Browse files Browse the repository at this point in the history
…y events.
  • Loading branch information
gfyrag committed Aug 23, 2022
1 parent f57e9d6 commit 2724c92
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 45 deletions.
62 changes: 31 additions & 31 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,37 +71,6 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {
mapping[parts[0]] = parts[1]
}

if v.GetBool(segmentEnabledFlag) {
applicationId := viper.GetString(segmentApplicationId)
var appIdProviderModule fx.Option
if applicationId == "" {
appIdProviderModule = fx.Provide(analytics.FromStorageAppIdProvider)
} else {
appIdProviderModule = fx.Provide(func() analytics.AppIdProvider {
return analytics.AppIdProviderFn(func(ctx context.Context) (string, error) {
return applicationId, nil
})
})
}
writeKey := viper.GetString(segmentWriteKey)
interval := viper.GetDuration(segmentHeartbeatInterval)
if writeKey == "" {
sharedlogging.GetLogger(context.Background()).Infof("Segment enabled but no write key provided")
} else if interval == 0 {
sharedlogging.GetLogger(context.Background()).Error("Segment heartbeat interval is 0")
} else {
_, err := semver.NewVersion(Version)
if err != nil {
sharedlogging.GetLogger(context.Background()).Infof("Segment enabled but version '%s' is not semver, skip", Version)
} else {
options = append(options,
appIdProviderModule,
analytics.NewHeartbeatModule(Version, writeKey, interval),
)
}
}
}

options = append(options, sharedpublish.Module(), bus.LedgerMonitorModule())
options = append(options, sharedpublish.TopicMapperPublisherModule(mapping))

Expand Down Expand Up @@ -239,6 +208,37 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {
}(),
}))

if v.GetBool(segmentEnabledFlag) {
applicationId := viper.GetString(segmentApplicationId)
var appIdProviderModule fx.Option
if applicationId == "" {
appIdProviderModule = fx.Provide(analytics.FromStorageAppIdProvider)
} else {
appIdProviderModule = fx.Provide(func() analytics.AppIdProvider {
return analytics.AppIdProviderFn(func(ctx context.Context) (string, error) {
return applicationId, nil
})
})
}
writeKey := viper.GetString(segmentWriteKey)
interval := viper.GetDuration(segmentHeartbeatInterval)
if writeKey == "" {
sharedlogging.GetLogger(context.Background()).Infof("Segment enabled but no write key provided")
} else if interval == 0 {
sharedlogging.GetLogger(context.Background()).Error("Segment heartbeat interval is 0")
} else {
_, err := semver.NewVersion(Version)
if err != nil {
sharedlogging.GetLogger(context.Background()).Infof("Segment enabled but version '%s' is not semver, skip", Version)
} else {
options = append(options,
appIdProviderModule,
analytics.NewHeartbeatModule(Version, writeKey, interval),
)
}
}
}

options = append(options, fx.Provide(
fx.Annotate(func() []ledger.LedgerOption {
ledgerOptions := []ledger.LedgerOption{}
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func NewRootCommand() *cobra.Command {
root.PersistentFlags().Bool(segmentEnabledFlag, true, "Is segment enabled")
root.PersistentFlags().String(segmentApplicationId, "", "Segment application id")
root.PersistentFlags().String(segmentWriteKey, DefaultSegmentWriteKey, "Segment write key")
root.PersistentFlags().Duration(segmentHeartbeatInterval, 24*time.Hour, "Segment heartbeat interval")
root.PersistentFlags().Duration(segmentHeartbeatInterval, 4*time.Hour, "Segment heartbeat interval")
root.PersistentFlags().String(commitPolicyFlag, "", "Transaction commit policy (default or allow-past-timestamps)")

internal.InitHTTPBasicFlags(root)
Expand Down
74 changes: 62 additions & 12 deletions pkg/analytics/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package analytics

import (
"context"
"crypto/sha256"
"encoding/base64"
"time"

"github.com/numary/go-libs/sharedlogging"
Expand All @@ -12,9 +14,12 @@ import (
)

const (
ApplicationStartedEvent = "Application started"
ApplicationStats = "Application stats"

VersionProperty = "version"
VersionProperty = "version"
AccountsProperty = "accounts"
TransactionsProperty = "transactions"
LedgersProperty = "ledgers"
)

type AppIdProvider interface {
Expand All @@ -29,8 +34,9 @@ func (fn AppIdProviderFn) AppID(ctx context.Context) (string, error) {
func FromStorageAppIdProvider(driver storage.Driver) AppIdProvider {
var appId string
return AppIdProviderFn(func(ctx context.Context) (string, error) {
var err error
if appId == "" {
appId, err := driver.GetConfiguration(ctx, "appId")
appId, err = driver.GetConfiguration(ctx, "appId")
if err != nil && err != storage.ErrConfigurationNotFound {
return "", err
}
Expand All @@ -51,12 +57,13 @@ type heartbeat struct {
client analytics.Client
stopChan chan chan struct{}
appIdProvider AppIdProvider
driver storage.Driver
}

func (m *heartbeat) Run(ctx context.Context) error {

enqueue := func() {
err := m.enqueue()
err := m.enqueue(ctx)
if err != nil {
sharedlogging.GetLogger(ctx).WithFields(map[string]interface{}{
"error": err,
Expand Down Expand Up @@ -89,26 +96,69 @@ func (m *heartbeat) Stop(ctx context.Context) error {
}
}

func (m *heartbeat) enqueue() error {
func (m *heartbeat) enqueue(ctx context.Context) error {

appId, err := m.appIdProvider.AppID(context.Background())
appId, err := m.appIdProvider.AppID(ctx)
if err != nil {
return err
}

properties := analytics.NewProperties().
Set(VersionProperty, m.version)

ledgers, err := m.driver.List(ctx)
if err != nil {
return err
}

ledgersProperty := map[string]any{}

for _, ledger := range ledgers {
properties := map[string]any{}
if err := func() error {
store, _, err := m.driver.GetStore(ctx, ledger, false)
if err != nil {
return err
}
transactions, err := store.CountTransactions(ctx, storage.TransactionsQuery{})
if err != nil {
return err
}
accounts, err := store.CountAccounts(ctx, storage.AccountsQuery{})
if err != nil {
return err
}
properties[TransactionsProperty] = transactions
properties[AccountsProperty] = accounts

return nil
}(); err != nil {
return err
}

digest := sha256.New()
digest.Write([]byte(ledger))
ledgerHash := base64.RawURLEncoding.EncodeToString(digest.Sum(nil))

ledgersProperty[ledgerHash] = properties
}
if len(ledgersProperty) > 0 {
properties.Set(LedgersProperty, ledgersProperty)
}

return m.client.Enqueue(&analytics.Track{
AnonymousId: appId,
Event: ApplicationStartedEvent,
Properties: analytics.NewProperties().
Set(VersionProperty, m.version),
Event: ApplicationStats,
Properties: properties,
})
}

func newHeartbeat(appIdProvider AppIdProvider, client analytics.Client, version string, interval time.Duration) *heartbeat {
func newHeartbeat(appIdProvider AppIdProvider, driver storage.Driver, client analytics.Client, version string, interval time.Duration) *heartbeat {
return &heartbeat{
version: version,
interval: interval,
client: client,
driver: driver,
appIdProvider: appIdProvider,
stopChan: make(chan chan struct{}, 1),
}
Expand All @@ -120,8 +170,8 @@ func NewHeartbeatModule(version, writeKey string, interval time.Duration) fx.Opt
fx.Provide(func(cfg analytics.Config) (analytics.Client, error) {
return analytics.NewWithConfig(writeKey, cfg)
}),
fx.Provide(func(client analytics.Client, provider AppIdProvider) *heartbeat {
return newHeartbeat(provider, client, version, interval)
fx.Provide(func(client analytics.Client, provider AppIdProvider, driver storage.Driver) *heartbeat {
return newHeartbeat(provider, driver, client, version, interval)
}),
fx.Invoke(func(m *heartbeat, lc fx.Lifecycle) {
lc.Append(fx.Hook{
Expand Down
4 changes: 3 additions & 1 deletion pkg/analytics/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/numary/ledger/pkg/ledgertesting"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"gopkg.in/segmentio/analytics-go.v3"
Expand Down Expand Up @@ -81,6 +82,7 @@ var (
return "foo", nil
})
}),
ledgertesting.ProvideStorageDriver(),
)
)

Expand Down Expand Up @@ -134,7 +136,7 @@ func TestSegment(t *testing.T) {
require.Len(t, batch.Batch, 1)

track := batch.Batch[0]
require.Equal(t, ApplicationStartedEvent, track.Event)
require.Equal(t, ApplicationStats, track.Event)
require.Equal(t, version, track.Properties[VersionProperty])
require.Equal(t, applicationId, track.AnonymousId)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (d *Driver) GetConfiguration(ctx context.Context, key string) (string, erro
}
var value string
if err := row.Scan(&value); err != nil {
if err == sql.ErrNoRows {
return "", storage.ErrConfigurationNotFound
}
return "", err
}

Expand Down

0 comments on commit 2724c92

Please sign in to comment.