bulker: lazily connect to destinations dwh
bulker: get rid of metrics relay
absorbb committed Oct 17, 2023
1 parent a311906 commit 2bd1aff
Showing 6 changed files with 66 additions and 194 deletions.
8 changes: 0 additions & 8 deletions bulkerapp/app/app.go
@@ -27,7 +27,6 @@ type Context struct {
 	fastStore     *FastStore
 	server        *http.Server
 	metricsServer *MetricsServer
-	metricsRelay  *MetricsRelay
 }
 
 func (a *Context) InitContext(settings *appbase.AppSettings) error {
@@ -107,12 +106,6 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
 	}
 	metricsServer := NewMetricsServer(a.config)
 	a.metricsServer = metricsServer
-
-	metricsRelay, err := NewMetricsRelay(a.config)
-	if err != nil {
-		logging.Errorf("Error initializing metrics relay: %v", err)
-	}
-	a.metricsRelay = metricsRelay
 	return nil
 }

@@ -130,7 +123,6 @@ func (a *Context) Shutdown() error {
 	}
 	logging.Infof("Shutting down http server...")
 	_ = a.metricsServer.Stop()
-	_ = a.metricsRelay.Stop()
 	_ = a.server.Shutdown(context.Background())
 	_ = a.eventsLogService.Close()
 	_ = a.fastStore.Close()
32 changes: 20 additions & 12 deletions bulkerapp/app/batch_consumer.go
@@ -36,12 +36,7 @@ func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodS
 }
 
 func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum, batchSize, retryBatchSize int) (counters BatchCounters, nextBatch bool, err error) {
-	bulkerStream, err := destination.bulker.CreateStream(bc.topicId, bc.tableName, bulker.Batch, destination.streamOptions.Options...)
-	if err != nil {
-		bc.errorMetric("failed to create bulker stream")
-		err = bc.NewError("Failed to create bulker stream: %v", err)
-		return
-	}
+	var bulkerStream bulker.BulkerStream
 	ctx := context.WithValue(context.Background(), bulker.BatchNumberCtxKey, batchNum)
 
 	//position of last message in batch in case of failed. Needed for processFailed
@@ -113,17 +108,30 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum
 			dec.UseNumber()
 			err = dec.Decode(&obj)
 			if err == nil {
-				bc.Debugf("%d. Consumed Message ID: %s Offset: %s (Retries: %s) for: %s", i, obj.Id(), message.TopicPartition.Offset.String(), GetKafkaHeader(message, retriesCountHeader), destination.config.BulkerType)
-				_, processedObjectSample, err = bulkerStream.Consume(ctx, obj)
-				if err != nil {
-					bc.errorMetric("bulker_stream_error")
-				}
+				if bulkerStream == nil {
+					destination.InitBulkerInstance()
+					bulkerStream, err = destination.bulker.CreateStream(bc.topicId, bc.tableName, bulker.Batch, destination.streamOptions.Options...)
+					if err != nil {
+						bc.errorMetric("failed to create bulker stream")
+						err = bc.NewError("Failed to create bulker stream: %v", err)
+					}
+				}
+				if err == nil {
+					bc.Debugf("%d. Consumed Message ID: %s Offset: %s (Retries: %s) for: %s", i, obj.Id(), message.TopicPartition.Offset.String(), GetKafkaHeader(message, retriesCountHeader), destination.config.BulkerType)
+					_, processedObjectSample, err = bulkerStream.Consume(ctx, obj)
+					if err != nil {
+						bc.errorMetric("bulker_stream_error")
+					}
+				}
 			} else {
 				bc.errorMetric("parse_event_error")
 			}
 			if err != nil {
 				failedPosition = &latestMessage.TopicPartition
-				state, _ := bulkerStream.Abort(ctx)
+				state := bulker.State{}
+				if bulkerStream != nil {
+					state, _ = bulkerStream.Abort(ctx)
+				}
 				bc.postEventsLog(state, processedObjectSample, err)
 				return counters, false, bc.NewError("Failed to process event to bulker stream: %v", err)
 			} else {
@@ -155,7 +163,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum
 			err = bc.NewError("Failed to commit kafka consumer: %v", err)
 			return
 		}
-	} else {
+	} else if bulkerStream != nil {
 		_, _ = bulkerStream.Abort(ctx)
 	}
 	return
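The two hunks above are the heart of the lazy-connect change for batch consumers: the bulker stream, and the warehouse connection behind it, is now created only once the first message of a batch actually arrives, so a destination with no traffic never dials its DWH, and both abort paths guard against a stream that was never opened. Below is a minimal sketch of the same pattern; conn, dial, and lazyWriter are illustrative names, not part of the bulker codebase.

package main

import (
	"errors"
	"fmt"
)

// conn stands in for an expensive warehouse connection (hypothetical,
// not bulker's API).
type conn struct{ dsn string }

// dial opens the connection; in the diff this corresponds to
// InitBulkerInstance followed by CreateStream.
func dial(dsn string) (*conn, error) {
	if dsn == "" {
		return nil, errors.New("empty dsn")
	}
	fmt.Println("dialing", dsn) // runs at most once, on first use
	return &conn{dsn: dsn}, nil
}

// lazyWriter defers dialing until the first Write, the way
// processBatchImpl now leaves bulkerStream nil until the first message.
type lazyWriter struct {
	dsn string
	c   *conn
}

func (w *lazyWriter) Write(row string) error {
	if w.c == nil { // connect on first use only
		c, err := dial(w.dsn)
		if err != nil {
			return fmt.Errorf("failed to connect: %w", err)
		}
		w.c = c
	}
	fmt.Println("writing", row, "via", w.c.dsn)
	return nil
}

func main() {
	w := &lazyWriter{dsn: "postgres://dwh"}
	// If the batch turns out to be empty, no connection is ever opened.
	for _, row := range []string{"row-1", "row-2"} {
		if err := w.Write(row); err != nil {
			fmt.Println("abort batch:", err)
			return
		}
	}
}

As in the diff, a failed connect surfaces as a per-message error, so the batch takes the normal abort path instead of failing the whole consumer before any work is attempted.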
155 changes: 0 additions & 155 deletions bulkerapp/app/metrics_relay.go

This file was deleted.

2 changes: 0 additions & 2 deletions bulkerapp/app/repository.go
@@ -68,7 +68,6 @@ func (r *Repository) init() error {
 		} else if !newDst.equals(oldDestination) {
 			r.Infof("Destination %s (%s) was updated. New Ver: %s", id, newDst.config.BulkerType, newDst.config.UpdatedAt)
 			toRetire = append(toRetire, oldDestination)
-			newDst.InitBulkerInstance()
 			repositoryChange.ChangedDestinations = append(repositoryChange.ChangedDestinations, newDst)
 		} else {
 			//copy unchanged initialized destinations from old repository
@@ -79,7 +78,6 @@
 		_, ok := oldDestinations[id]
 		if !ok {
 			r.Infof("Destination %s (%s) was added. Ver: %s", id, dst.config.BulkerType, dst.config.UpdatedAt)
-			dst.InitBulkerInstance()
 			repositoryChange.AddedDestinations = append(repositoryChange.AddedDestinations, dst)
 		}
 	}
1 change: 1 addition & 0 deletions bulkerapp/app/router.go
@@ -182,6 +182,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
 	if len(pkeys) > 0 {
 		streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithMergeRows())
 	}
+	destination.InitBulkerInstance()
 	bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
 	if err != nil {
 		rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, "")
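Since destinations no longer initialize their bulker instance when the repository loads (see the repository.go hunks above), the synchronous bulk endpoint now has to trigger initialization itself before creating a stream. Presumably InitBulkerInstance is safe to call repeatedly, since the consumers also call it on first use.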
62 changes: 45 additions & 17 deletions bulkerapp/app/stream_consumer.go
@@ -3,6 +3,7 @@ package app
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/jitsucom/bulker/bulkerapp/metrics"
 	bulker "github.com/jitsucom/bulker/bulkerlib"
@@ -84,17 +85,50 @@ func NewStreamConsumer(repository *Repository, destination *Destination, topicId
 		eventsLogService: eventsLogService,
 		closed:           make(chan struct{}),
 	}
-	bulkerStream, err := sc.destination.bulker.CreateStream(sc.topicId, sc.tableName, bulker.Stream, sc.destination.streamOptions.Options...)
-	if err != nil {
-		metrics.ConsumerErrors(sc.topicId, "stream", destination.Id(), tableName, "failed to create bulker stream").Inc()
-		return nil, base.NewError("Failed to create bulker stream: %v", err)
-	}
-	sc.stream.Store(&bulkerStream)
+	var bs bulker.BulkerStream
+	bs = &StreamWrapper{destination: destination, topicId: topicId, tableName: tableName}
+	sc.stream.Store(&bs)
 	sc.start()
+	sc.destination.Lease()
 	return sc, nil
 }
 
+type StreamWrapper struct {
+	destination *Destination
+	stream      bulker.BulkerStream
+	topicId     string
+	tableName   string
+}
+
+func (sw *StreamWrapper) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error) {
+	if sw.stream == nil {
+		sw.destination.Lease()
+		sw.destination.InitBulkerInstance()
+		bulkerStream, err := sw.destination.bulker.CreateStream(sw.topicId, sw.tableName, bulker.Stream, sw.destination.streamOptions.Options...)
+		if err != nil {
+			metrics.ConsumerErrors(sw.topicId, "stream", sw.destination.Id(), sw.tableName, "failed to create bulker stream").Inc()
+			return bulker.State{}, nil, fmt.Errorf("Failed to create bulker stream: %v", err)
+		}
+		sw.stream = bulkerStream
+	}
+	return sw.stream.Consume(ctx, object)
+}
+
+func (sw *StreamWrapper) Abort(ctx context.Context) (bulker.State, error) {
+	if sw.stream == nil {
+		return bulker.State{}, nil
+	}
+	sw.destination.Release()
+	return sw.stream.Abort(ctx)
+}
+
+func (sw *StreamWrapper) Complete(ctx context.Context) (bulker.State, error) {
+	if sw.stream == nil {
+		return bulker.State{}, nil
+	}
+	sw.destination.Release()
+	return sw.stream.Complete(ctx)
+}
+
 func (sc *StreamConsumer) restartConsumer() {
 	sc.Infof("Restarting consumer")
 	go func(c *kafka.Consumer) {
@@ -232,19 +266,13 @@ func (sc *StreamConsumer) Close() error {
 // UpdateDestination
 func (sc *StreamConsumer) UpdateDestination(destination *Destination) error {
 	sc.Infof("[Updating stream consumer for topic. Ver: %s", sc.destination.config.UpdatedAt)
-	destination.Lease()
 
-	//create new stream
-	bulkerStream, err := destination.bulker.CreateStream(sc.topicId, sc.tableName, bulker.Stream, destination.streamOptions.Options...)
-	if err != nil {
-		return sc.NewError("Failed to create bulker stream: %v", err)
-	}
-	oldBulkerStream := sc.stream.Swap(&bulkerStream)
-	state, err := (*oldBulkerStream).Complete(context.Background())
+	var bs bulker.BulkerStream
+	bs = &StreamWrapper{destination: destination, topicId: sc.topicId, tableName: sc.tableName}
+	oldBulkerStream := sc.stream.Swap(&bs)
+	state, _ := (*oldBulkerStream).Complete(context.Background())
 	sc.Infof("Previous stream state: %+v", state)
-	oldDestination := sc.destination
-	oldDestination.Release()
 
 	sc.destination = destination
 	return nil
 }
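StreamWrapper makes the same idea work for streaming consumers: it satisfies the bulker.BulkerStream interface but defers CreateStream (and the destination lease) to the first Consume, while Complete and Abort on a never-used wrapper return an empty state without touching the destination. Below is a self-contained sketch of that proxy shape, assuming a simplified two-method stream interface; stream, realStream, and lazyStream are illustrative names, not bulker's API.

package main

import (
	"context"
	"fmt"
)

// stream is a simplified stand-in for bulker.BulkerStream.
type stream interface {
	Consume(ctx context.Context, row string) error
	Complete(ctx context.Context) error
}

// realStream plays the role of the stream returned by CreateStream.
type realStream struct{ id string }

func (s realStream) Consume(_ context.Context, row string) error {
	fmt.Println(s.id, "consume", row)
	return nil
}

func (s realStream) Complete(context.Context) error {
	fmt.Println(s.id, "complete")
	return nil
}

// lazyStream opens the underlying stream on the first Consume and treats
// Complete on a never-used stream as a no-op, like StreamWrapper.
type lazyStream struct {
	open func() (stream, error) // the expensive step, deferred
	s    stream
}

func (l *lazyStream) Consume(ctx context.Context, row string) error {
	if l.s == nil {
		s, err := l.open()
		if err != nil {
			return fmt.Errorf("failed to create stream: %w", err)
		}
		l.s = s
	}
	return l.s.Consume(ctx, row)
}

func (l *lazyStream) Complete(ctx context.Context) error {
	if l.s == nil {
		return nil // nothing was opened, nothing to flush or release
	}
	return l.s.Complete(ctx)
}

func main() {
	ls := &lazyStream{open: func() (stream, error) {
		fmt.Println("connecting to warehouse") // happens at most once
		return realStream{id: "s1"}, nil
	}}
	ctx := context.Background()
	_ = ls.Consume(ctx, "event-1")
	_ = ls.Consume(ctx, "event-2")
	_ = ls.Complete(ctx)
}

This is also what makes the new UpdateDestination cheap: swapping in a fresh wrapper costs nothing until the next event arrives.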
