Skip to content

Commit

Permalink
Fix: docker-compose and client module
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Nov 8, 2023
1 parent e7f2baa commit 7f39f9f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 96 deletions.
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
INDEXER_BRIDGED_TOKENS_FILE=mainnet.json # full list of files you can find in repo ./build/bridged_tokens/
INDEXER_CLASS_INTERFACES_DIR=./interfaces/ # REQUIRED
HASURA_HOST=hasura
HASURA_POSTGRES_HOST=db
LOG_LEVEL=info
CACHE_ENABLED=false
POSTGRES_PORT=5432
POSTGRES_HOST=db
POSTGRES_DB=starknet
POSTGRES_PASSWORD=<TYPE_SOMETHING_STRONG> # REQUIRED
12 changes: 6 additions & 6 deletions build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ---------------------------------------------------------------------
# The first stage container, for building the application
# ---------------------------------------------------------------------
FROM golang:1.20-alpine as builder
FROM golang:1.21.2-alpine as builder

ENV CGO_ENABLED=0
ENV GO111MODULE=on
Expand All @@ -10,25 +10,25 @@ ENV GOOS=linux
RUN apk --no-cache add ca-certificates
RUN apk add --update git musl-dev gcc build-base

RUN mkdir -p $GOPATH/src/github.com/dipdup-io/straknet-indexer/
RUN mkdir -p $GOPATH/src/github.com/dipdup-io/starknet-indexer/

COPY ./go.* $GOPATH/src/github.com/dipdup-io/straknet-indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer
COPY ./go.* $GOPATH/src/github.com/dipdup-io/starknet-indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer
RUN go mod download

COPY cmd/indexer cmd/indexer
COPY internal internal
COPY pkg pkg

WORKDIR $GOPATH/src/github.com/dipdup-io/straknet-indexer/cmd/indexer/
WORKDIR $GOPATH/src/github.com/dipdup-io/starknet-indexer/cmd/indexer/
RUN go build -a -o /go/bin/indexer .

# ---------------------------------------------------------------------
# The second stage container, for running the application
# ---------------------------------------------------------------------
FROM scratch

WORKDIR /app/straknet-indexer/
WORKDIR /app/starknet-indexer/

COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/indexer /go/bin/indexer
Expand Down
14 changes: 7 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.6"

services:
indexer:
image: ghcr.io/dipdup-io/starknet-indexer:master
image: ghcr.io/dipdup-io/starknet-indexer:${TAG:-master}
build:
dockerfile: build/Dockerfile
context: .
Expand All @@ -16,7 +16,7 @@ services:
depends_on:
- db
- hasura
logging: &straknet-dipdup-logging
logging: &starknet-indexer-logging
options:
max-size: 10m
max-file: "5"
Expand All @@ -27,21 +27,21 @@ services:
image: postgres:15
restart: always
volumes:
- db:/var/lib/postgres/data
- db:/var/lib/postgresql/data
- /etc/postgresql/postgresql.conf:/etc/postgresql/postgresql.conf
ports:
- 127.0.0.1:5432:5432
- 127.0.0.1:${POSTGRES_PORT:-5432}:5432
environment:
- POSTGRES_HOST=${POSTGRES_HOST:-db}
- POSTGRES_USER=${POSTGRES_USER:-dipdup}
- POSTGRES_DB=${POSTGRES_DB:-starknet}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-changeme}
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dipdup -d starknet"]
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-dipdup} -d ${POSTGRES_DB:-starknet}"]
interval: 10s
timeout: 5s
retries: 5
logging: *straknet-dipdup-logging
logging: *starknet-indexer-logging
command:
- "postgres"
- "-c"
Expand All @@ -59,7 +59,7 @@ services:
- HASURA_GRAPHQL_ENABLED_LOG_TYPES=startup, http-log, webhook-log, websocket-log, query-log
- HASURA_GRAPHQL_ADMIN_SECRET=${ADMIN_SECRET:-changeme}
- HASURA_GRAPHQL_UNAUTHORIZED_ROLE=user
logging: *straknet-dipdup-logging
logging: *starknet-indexer-logging

volumes:
db:
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dipdup-io/starknet-indexer

go 1.20
go 1.21

require (
github.com/dipdup-io/starknet-go-api v0.0.0-20230912113406-c699cdbd6582
Expand Down
102 changes: 22 additions & 80 deletions pkg/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package grpc

import (
"context"
"sync"
"time"

"github.com/dipdup-io/starknet-indexer/pkg/grpc/pb"
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
grpcSDK "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
generalPB "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/pb"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

// outputs names
const (
OutputMessages = "messages"
ModuleName = "layer1_grpc_client"
)

// Stream -
Expand All @@ -37,96 +35,43 @@ func NewStream(stream *grpcSDK.Stream[pb.Subscription], request *pb.SubscribeReq

// Client -
type Client struct {
grpc *grpcSDK.Client

output *modules.Output
modules.BaseModule
grpc *grpcSDK.Client
streams map[uint64]*Stream

service pb.IndexerServiceClient
reconnect chan uint64

wg *sync.WaitGroup
}

// NewClient -
func NewClient(cfg ClientConfig) *Client {
return &Client{
grpc: grpcSDK.NewClient(cfg.ServerAddress),
output: modules.NewOutput(OutputMessages),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
wg: new(sync.WaitGroup),
client := &Client{
BaseModule: modules.New(ModuleName),
grpc: grpcSDK.NewClient(cfg.ServerAddress),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
}
client.CreateOutput(OutputMessages)
return client
}

// NewClientWithServerAddress -
func NewClientWithServerAddress(address string) *Client {
return &Client{
grpc: grpcSDK.NewClient(address),
output: modules.NewOutput(OutputMessages),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
wg: new(sync.WaitGroup),
}
}

// Name -
func (client *Client) Name() string {
return "layer1_grpc_client"
}

// Input -
func (client *Client) Input(name string) (*modules.Input, error) {
return nil, errors.Wrap(modules.ErrUnknownInput, name)
}

// MustInput -
func (client *Client) MustInput(name string) *modules.Input {
input, err := client.Input(name)
if err != nil {
panic(err)
}
return input
}

// Output -
func (client *Client) Output(name string) (*modules.Output, error) {
if name != OutputMessages {
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
client := &Client{
BaseModule: modules.New(ModuleName),
grpc: grpcSDK.NewClient(address),
streams: make(map[uint64]*Stream),
reconnect: make(chan uint64, 16),
}
return client.output, nil
}

// MustOutput -
func (client *Client) MustOutput(name string) *modules.Output {
output, err := client.Output(name)
if err != nil {
panic(err)
}
return output
}

// AttachTo -
func (client *Client) AttachTo(outputModule modules.Module, outputName, inputName string) error {
output, err := outputModule.Output(outputName)
if err != nil {
return err
}
input, err := client.Input(inputName)
if err != nil {
return err
}
output.Attach(input)
return nil
client.CreateOutput(OutputMessages)
return client
}

// Start -
func (client *Client) Start(ctx context.Context) {
client.grpc.Start(ctx)
client.service = pb.NewIndexerServiceClient(client.grpc.Connection())

client.wg.Add(1)
go client.reconnectThread(ctx)
client.G.GoCtx(ctx, client.reconnectThread)
}

// Connect -
Expand All @@ -136,7 +81,7 @@ func (client *Client) Connect(ctx context.Context, opts ...grpcSDK.ConnectOption

// Close - closes client
func (client *Client) Close() error {
client.wg.Wait()
client.G.Wait()

for id, stream := range client.streams {
unsubscribeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -166,8 +111,6 @@ func (client *Client) Reconnect() <-chan uint64 {
}

func (client *Client) reconnectThread(ctx context.Context) {
defer client.wg.Done()

for {
select {
case <-ctx.Done():
Expand All @@ -193,8 +136,9 @@ func (client *Client) subscribe(ctx context.Context, req *pb.SubscribeRequest) (
}
grpcStream := grpc.NewStream[pb.Subscription](stream)

client.wg.Add(1)
go client.handleMessage(ctx, grpcStream)
client.G.GoCtx(ctx, func(ctx context.Context) {
client.handleMessage(ctx, grpcStream)
})

id, err := grpcStream.Subscribe(ctx)
return id, grpcStream, err
Expand All @@ -221,8 +165,6 @@ func (client *Client) sendToOutput(name string, data any) error {
}

func (client *Client) handleMessage(ctx context.Context, stream *grpcSDK.Stream[pb.Subscription]) {
defer client.wg.Done()

for {
select {
case <-stream.Context().Done():
Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const (

// Server -
type Server struct {
GRPC *grpcSDK.Server
modules.BaseModule
pb.UnimplementedIndexerServiceServer

db postgres.Storage
GRPC *grpcSDK.Server
db postgres.Storage

subscriptions *grpcSDK.Subscriptions[*subscriptions.Message, *pb.Subscription]
}
Expand Down

0 comments on commit 7f39f9f

Please sign in to comment.