Skip to content

Commit

Permalink
Upgrade: indexer-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Nov 8, 2023
1 parent 09511de commit e7f2baa
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 140 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ starknet_id:
loot_survivor:
cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/loot_survivor.yml

blocks:
cd cmd/rpc_tester && go run . -c ../../cmd/rpc_tester/blocks.yml

build-proto:
protoc \
-I=${GOPATH}/src \
Expand Down
9 changes: 9 additions & 0 deletions cmd/rpc_tester/blocks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 0.0.1

log_level: ${LOG_LEVEL:-info}

grpc:
server_address: ${GRPC_BIND:-127.0.0.1:7779}
subscriptions:
blocks:
head: true
2 changes: 1 addition & 1 deletion cmd/rpc_tester/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Printer -
type Printer struct {
*printer.Printer
printer.Printer

eventCounters map[string]*atomic.Uint64

Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.20

require (
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582
github.com/dipdup-io/workerpool v0.0.3
github.com/dipdup-io/workerpool v0.0.4
github.com/dipdup-net/go-lib v0.3.3
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65
github.com/dipdup-net/indexer-sdk v0.0.4
github.com/go-testfixtures/testfixtures/v3 v3.9.0
github.com/goccy/go-json v0.10.2
github.com/karlseguin/ccache/v2 v2.0.8
Expand All @@ -18,7 +18,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/uptrace/bun v1.1.14
go.uber.org/mock v0.2.0
google.golang.org/grpc v1.57.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.31.0
)

Expand All @@ -36,7 +36,7 @@ require (
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.5+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down Expand Up @@ -98,7 +98,7 @@ require (
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582 h1:Mo5MDCO9Bqdj62Jrn1at/f44KOOzWCPF+ohu5jqlb4o=
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582/go.mod h1:Pbi1et2OC3ZWhzv/76nDg5C0/v4Mrj7YWkZPXcZFys0=
github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI=
github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s=
github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-net/go-lib v0.3.3 h1:vTUI+sT4L+x+eiMf712Cg8EtlqUCMiN6M3vcNaPlCw8=
github.com/dipdup-net/go-lib v0.3.3/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4=
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65 h1:jZ53iH1UhoBTtIj4pRUPz826EFIwr7KYUR7EkwokwPM=
github.com/dipdup-net/indexer-sdk v0.0.0-20230819120445-392cbc4cfb65/go.mod h1:sZEbnuguFw8kxgD3iNLLHDq1DUDrXI6KcghNMcmNCS4=
github.com/dipdup-net/indexer-sdk v0.0.4 h1:mhTW3f4U6oc05UjxSiffOV+HIi4vQkDgOq1MbJXia8U=
github.com/dipdup-net/indexer-sdk v0.0.4/go.mod h1:n1oBIm5MPY1WxLS9tQfTWr+Ytrwv6ThCZF7TASsJslg=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY=
github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM=
github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
Expand Down Expand Up @@ -295,8 +295,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea h1:vLCWI/yYrdEHyN2JzIzPO3aaQJHQdp89IZBA/+azVC4=
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down Expand Up @@ -396,8 +396,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg=
google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
26 changes: 24 additions & 2 deletions pkg/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ func (client *Client) Input(name string) (*modules.Input, error) {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}

// MustInput -
func (client *Client) MustInput(name string) *modules.Input {
input, err := client.Input(name)
if err != nil {
panic(err)
}
return input
}

// Output -
func (client *Client) Output(name string) (*modules.Output, error) {
if name != OutputMessages {
Expand All @@ -88,9 +97,22 @@ func (client *Client) Output(name string) (*modules.Output, error) {
return client.output, nil
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
// MustOutput -
func (client *Client) MustOutput(name string) *modules.Output {
output, err := client.Output(name)
if err != nil {
panic(err)
}
return output
}

// AttachTo -
func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error {
output, err := outputModule.Output(outputName)
if err != nil {
return err
}
input, err := client.Input(inputName)
if err != nil {
return err
}
Expand Down
68 changes: 17 additions & 51 deletions pkg/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grpc

import (
"context"
"sync"
"time"

"github.com/dipdup-io/starknet-indexer/internal/storage"
Expand All @@ -12,8 +11,6 @@ import (
"github.com/dipdup-io/starknet-indexer/pkg/indexer"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand All @@ -24,16 +21,13 @@ const (

// Server -
type Server struct {
*grpcSDK.Server
GRPC *grpcSDK.Server
modules.BaseModule
pb.UnimplementedIndexerServiceServer

db postgres.Storage

input *modules.Input
subscriptions *grpcSDK.Subscriptions[*subscriptions.Message, *pb.Subscription]
log zerolog.Logger

wg *sync.WaitGroup
}

// NewServer -
Expand All @@ -46,47 +40,46 @@ func NewServer(
return nil, err
}

return &Server{
Server: server,
s := &Server{
GRPC: server,
db: db,
input: modules.NewInput(InputBlocks),
BaseModule: modules.New("layer1_grpc_server"),
subscriptions: grpcSDK.NewSubscriptions[*subscriptions.Message, *pb.Subscription](),
log: log.With().Str("module", "grpc_server").Logger(),
}
s.CreateInput(InputBlocks)

wg: new(sync.WaitGroup),
}, nil
return s, nil
}

// Start -
func (module *Server) Start(ctx context.Context) {
pb.RegisterIndexerServiceServer(module.Server.Server(), module)
pb.RegisterIndexerServiceServer(module.GRPC.Server(), module)

module.Server.Start(ctx)
module.GRPC.Start(ctx)

module.wg.Add(1)
go module.listen(ctx)
module.G.GoCtx(ctx, module.listen)
}

func (module *Server) listen(ctx context.Context) {
defer module.wg.Done()

ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()

input := module.MustInput(InputBlocks)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:

case msg, ok := <-module.input.Listen():
case msg, ok := <-input.Listen():
if !ok {
return
}
if message, ok := msg.(*indexer.IndexerMessage); ok {
module.blockHandler(ctx, message)
} else {
module.log.Warn().Msgf("unknown message type: %T", msg)
module.Log.Warn().Msgf("unknown message type: %T", msg)
}
}
}
Expand Down Expand Up @@ -280,34 +273,7 @@ func (module *Server) notifyAboutAddress(address *storage.Address) {

// Close -
func (module *Server) Close() error {
module.wg.Wait()

if err := module.input.Close(); err != nil {
return err
}

return module.Server.Close()
}

// Input -
func (module *Server) Input(name string) (*modules.Input, error) {
if name != InputBlocks {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}
return module.input, nil
}

// Output -
func (module *Server) Output(name string) (*modules.Output, error) {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
}

// AttachTo -
func (module *Server) AttachTo(name string, input *modules.Input) error {
return errors.Wrap(modules.ErrUnknownOutput, name)
}
module.G.Wait()

// Name -
func (module *Server) Name() string {
return "layer1_grpc_server"
return module.GRPC.Close()
}
Loading

0 comments on commit e7f2baa

Please sign in to comment.