diff --git a/bulkerapp/app/app.go b/bulkerapp/app/app.go index 3f6f3f6..a076669 100644 --- a/bulkerapp/app/app.go +++ b/bulkerapp/app/app.go @@ -27,7 +27,6 @@ type Context struct { fastStore *FastStore server *http.Server metricsServer *MetricsServer - metricsRelay *MetricsRelay } func (a *Context) InitContext(settings *appbase.AppSettings) error { @@ -107,12 +106,6 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error { } metricsServer := NewMetricsServer(a.config) a.metricsServer = metricsServer - - metricsRelay, err := NewMetricsRelay(a.config) - if err != nil { - logging.Errorf("Error initializing metrics relay: %v", err) - } - a.metricsRelay = metricsRelay return nil } @@ -130,7 +123,6 @@ func (a *Context) Shutdown() error { } logging.Infof("Shutting down http server...") _ = a.metricsServer.Stop() - _ = a.metricsRelay.Stop() _ = a.server.Shutdown(context.Background()) _ = a.eventsLogService.Close() _ = a.fastStore.Close() diff --git a/bulkerapp/app/batch_consumer.go b/bulkerapp/app/batch_consumer.go index 9e500c6..a445815 100644 --- a/bulkerapp/app/batch_consumer.go +++ b/bulkerapp/app/batch_consumer.go @@ -36,12 +36,7 @@ func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodS } func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum, batchSize, retryBatchSize int) (counters BatchCounters, nextBatch bool, err error) { - bulkerStream, err := destination.bulker.CreateStream(bc.topicId, bc.tableName, bulker.Batch, destination.streamOptions.Options...) - if err != nil { - bc.errorMetric("failed to create bulker stream") - err = bc.NewError("Failed to create bulker stream: %v", err) - return - } + var bulkerStream bulker.BulkerStream ctx := context.WithValue(context.Background(), bulker.BatchNumberCtxKey, batchNum) //position of last message in batch in case of failed. Needed for processFailed @@ -113,17 +108,30 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum dec.UseNumber() err = dec.Decode(&obj) if err == nil { - bc.Debugf("%d. Consumed Message ID: %s Offset: %s (Retries: %s) for: %s", i, obj.Id(), message.TopicPartition.Offset.String(), GetKafkaHeader(message, retriesCountHeader), destination.config.BulkerType) - _, processedObjectSample, err = bulkerStream.Consume(ctx, obj) - if err != nil { - bc.errorMetric("bulker_stream_error") + if bulkerStream == nil { + destination.InitBulkerInstance() + bulkerStream, err = destination.bulker.CreateStream(bc.topicId, bc.tableName, bulker.Batch, destination.streamOptions.Options...) + if err != nil { + bc.errorMetric("failed to create bulker stream") + err = bc.NewError("Failed to create bulker stream: %v", err) + } + } + if err == nil { + bc.Debugf("%d. Consumed Message ID: %s Offset: %s (Retries: %s) for: %s", i, obj.Id(), message.TopicPartition.Offset.String(), GetKafkaHeader(message, retriesCountHeader), destination.config.BulkerType) + _, processedObjectSample, err = bulkerStream.Consume(ctx, obj) + if err != nil { + bc.errorMetric("bulker_stream_error") + } } } else { bc.errorMetric("parse_event_error") } if err != nil { failedPosition = &latestMessage.TopicPartition - state, _ := bulkerStream.Abort(ctx) + state := bulker.State{} + if bulkerStream != nil { + state, _ = bulkerStream.Abort(ctx) + } bc.postEventsLog(state, processedObjectSample, err) return counters, false, bc.NewError("Failed to process event to bulker stream: %v", err) } else { @@ -155,7 +163,7 @@ func (bc *BatchConsumerImpl) processBatchImpl(destination *Destination, batchNum err = bc.NewError("Failed to commit kafka consumer: %v", err) return } - } else { + } else if bulkerStream != nil { _, _ = bulkerStream.Abort(ctx) } return diff --git a/bulkerapp/app/metrics_relay.go b/bulkerapp/app/metrics_relay.go deleted file mode 100644 index bd7124b..0000000 --- a/bulkerapp/app/metrics_relay.go +++ /dev/null @@ -1,155 +0,0 @@ -package app - -import ( - "context" - "fmt" - "github.com/hjson/hjson-go/v4" - bulker "github.com/jitsucom/bulker/bulkerlib" - "github.com/jitsucom/bulker/bulkerlib/types" - "github.com/jitsucom/bulker/jitsubase/appbase" - "github.com/jitsucom/bulker/jitsubase/safego" - "github.com/jitsucom/bulker/jitsubase/timestamp" - "github.com/jitsucom/bulker/jitsubase/utils" - "github.com/prometheus/client_golang/prometheus" - prom "github.com/prometheus/client_model/go" - "strings" - "time" -) - -// MetricsRelay pushes prometheus metrics to configured bulker destination -type MetricsRelay struct { - appbase.Service - instanceId string - relayBulker bulker.Bulker - ticker *utils.Ticker - counters map[string]float64 -} - -func NewMetricsRelay(appConfig *Config) (*MetricsRelay, error) { - base := appbase.NewServiceBase("metrics_relay") - if appConfig.MetricsRelayDestination == "" || appConfig.MetricsRelayPeriodSec == 0 { - return nil, nil - } - cfg := &DestinationConfig{} - err := hjson.Unmarshal([]byte(appConfig.MetricsRelayDestination), &cfg) - if err != nil { - return nil, fmt.Errorf("error parsing metrics relay destination: %v", err) - } - bulkerInstance, err := bulker.CreateBulker(cfg.Config) - if bulkerInstance == nil { - return nil, fmt.Errorf("error creating metrics relay bulker: %v", err) - } - period := time.Duration(appConfig.MetricsRelayPeriodSec) * time.Second - m := &MetricsRelay{ - Service: base, - instanceId: appConfig.InstanceId, - relayBulker: bulkerInstance, - ticker: utils.NewTicker(period, period), - counters: map[string]float64{}, - } - m.start() - return m, nil - -} - -// start metrics relay -func (m *MetricsRelay) start() { - m.Infof("Starting metrics relay") - safego.RunWithRestart(func() { - for { - select { - case <-m.ticker.C: - err := m.Push() - if err != nil { - m.Errorf("Error pushing metrics: %v", err) - } - case <-m.ticker.Closed: - m.Infof("Metrics relay stopped.") - return - } - } - }) -} - -// Push metrics to destination -func (m *MetricsRelay) Push() (err error) { - stream, err := m.relayBulker.CreateStream("metrics_relays", "bulker_metrics", bulker.Batch, - bulker.WithTimestamp("timestamp")) - if err != nil { - return fmt.Errorf("error creating bulker stream: %v", err) - } - defer func() { - if err != nil { - _, _ = stream.Abort(context.Background()) - } - }() - ts := timestamp.Now().Truncate(m.ticker.Period()) - gatheredData, err := prometheus.DefaultGatherer.Gather() - if err != nil { - return fmt.Errorf("failed to gather metrics: %v", err) - } - - for _, metricFamily := range gatheredData { - if metricFamily.Type == nil || *metricFamily.Type != prom.MetricType_COUNTER { - continue - } - if !strings.HasPrefix(metricFamily.GetName(), "bulkerapp_") { - continue - } - for _, metric := range metricFamily.Metric { - if metric.Counter != nil { - metricEvent := types.Object{} - metricEvent["timestamp"] = ts - metricEvent["instance"] = m.instanceId - metricEvent["metric_type"] = "counter" - metricEvent["metric_name"] = metricFamily.GetName() - key := strings.Builder{} - key.WriteString(metricFamily.GetName()) - key.WriteString("{") - prevLabel := false - for _, label := range metric.Label { - if label.Name == nil || label.Value == nil || *label.Value == "" { - continue - } - if prevLabel { - key.WriteString(",") - } - metricEvent[*label.Name] = *label.Value - key.WriteString(*label.Name) - key.WriteString("=") - key.WriteString(*label.Value) - prevLabel = true - } - key.WriteString("}") - metricKey := key.String() - //metricEvent["metric_key"] = metricKey - prevValue := m.counters[metricKey] - delta := metric.Counter.GetValue() - prevValue - if delta > 0 { - metricEvent["value"] = delta - m.counters[metricKey] = metric.Counter.GetValue() - //m.Infof("Pushing metric: %+v", metricEvent) - _, _, err = stream.Consume(context.Background(), metricEvent) - if err != nil { - return fmt.Errorf("error pushing metric: %v", err) - } - } - } - - } - } - _, err = stream.Complete(context.Background()) - if err != nil { - return fmt.Errorf("error commiting batch: %v", err) - } - return -} - -// Stop metrics relay -func (m *MetricsRelay) Stop() error { - if m == nil { - return nil - } - m.ticker.Stop() - return m.relayBulker.Close() -} diff --git a/bulkerapp/app/repository.go b/bulkerapp/app/repository.go index 306c015..d8b03c4 100644 --- a/bulkerapp/app/repository.go +++ b/bulkerapp/app/repository.go @@ -68,7 +68,6 @@ func (r *Repository) init() error { } else if !newDst.equals(oldDestination) { r.Infof("Destination %s (%s) was updated. New Ver: %s", id, newDst.config.BulkerType, newDst.config.UpdatedAt) toRetire = append(toRetire, oldDestination) - newDst.InitBulkerInstance() repositoryChange.ChangedDestinations = append(repositoryChange.ChangedDestinations, newDst) } else { //copy unchanged initialized destinations from old repository @@ -79,7 +78,6 @@ func (r *Repository) init() error { _, ok := oldDestinations[id] if !ok { r.Infof("Destination %s (%s) was added. Ver: %s", id, dst.config.BulkerType, dst.config.UpdatedAt) - dst.InitBulkerInstance() repositoryChange.AddedDestinations = append(repositoryChange.AddedDestinations, dst) } } diff --git a/bulkerapp/app/router.go b/bulkerapp/app/router.go index 49e5568..7e63df1 100644 --- a/bulkerapp/app/router.go +++ b/bulkerapp/app/router.go @@ -182,6 +182,7 @@ func (r *Router) BulkHandler(c *gin.Context) { if len(pkeys) > 0 { streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithMergeRows()) } + destination.InitBulkerInstance() bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...) if err != nil { rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, "") diff --git a/bulkerapp/app/stream_consumer.go b/bulkerapp/app/stream_consumer.go index c4948c9..5f3528a 100644 --- a/bulkerapp/app/stream_consumer.go +++ b/bulkerapp/app/stream_consumer.go @@ -3,6 +3,7 @@ package app import ( "bytes" "context" + "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/jitsucom/bulker/bulkerapp/metrics" bulker "github.com/jitsucom/bulker/bulkerlib" @@ -84,17 +85,50 @@ func NewStreamConsumer(repository *Repository, destination *Destination, topicId eventsLogService: eventsLogService, closed: make(chan struct{}), } - bulkerStream, err := sc.destination.bulker.CreateStream(sc.topicId, sc.tableName, bulker.Stream, sc.destination.streamOptions.Options...) - if err != nil { - metrics.ConsumerErrors(sc.topicId, "stream", destination.Id(), tableName, "failed to create bulker stream").Inc() - return nil, base.NewError("Failed to create bulker stream: %v", err) - } - sc.stream.Store(&bulkerStream) + var bs bulker.BulkerStream + bs = &StreamWrapper{destination: destination, topicId: topicId, tableName: tableName} + sc.stream.Store(&bs) sc.start() - sc.destination.Lease() return sc, nil } +type StreamWrapper struct { + destination *Destination + stream bulker.BulkerStream + topicId string + tableName string +} + +func (sw *StreamWrapper) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error) { + if sw.stream == nil { + sw.destination.Lease() + sw.destination.InitBulkerInstance() + bulkerStream, err := sw.destination.bulker.CreateStream(sw.topicId, sw.tableName, bulker.Stream, sw.destination.streamOptions.Options...) + if err != nil { + metrics.ConsumerErrors(sw.topicId, "stream", sw.destination.Id(), sw.tableName, "failed to create bulker stream").Inc() + return bulker.State{}, nil, fmt.Errorf("Failed to create bulker stream: %v", err) + } + sw.stream = bulkerStream + } + return sw.stream.Consume(ctx, object) +} + +func (sw *StreamWrapper) Abort(ctx context.Context) (bulker.State, error) { + if sw.stream == nil { + return bulker.State{}, nil + } + sw.destination.Release() + return sw.stream.Abort(ctx) +} + +func (sw *StreamWrapper) Complete(ctx context.Context) (bulker.State, error) { + if sw.stream == nil { + return bulker.State{}, nil + } + sw.destination.Release() + return sw.stream.Complete(ctx) +} + func (sc *StreamConsumer) restartConsumer() { sc.Infof("Restarting consumer") go func(c *kafka.Consumer) { @@ -232,19 +266,13 @@ func (sc *StreamConsumer) Close() error { // UpdateDestination func (sc *StreamConsumer) UpdateDestination(destination *Destination) error { sc.Infof("[Updating stream consumer for topic. Ver: %s", sc.destination.config.UpdatedAt) - destination.Lease() //create new stream - bulkerStream, err := destination.bulker.CreateStream(sc.topicId, sc.tableName, bulker.Stream, destination.streamOptions.Options...) - if err != nil { - return sc.NewError("Failed to create bulker stream: %v", err) - } - oldBulkerStream := sc.stream.Swap(&bulkerStream) - state, err := (*oldBulkerStream).Complete(context.Background()) + var bs bulker.BulkerStream + bs = &StreamWrapper{destination: destination, topicId: sc.topicId, tableName: sc.tableName} + oldBulkerStream := sc.stream.Swap(&bs) + state, _ := (*oldBulkerStream).Complete(context.Background()) sc.Infof("Previous stream state: %+v", state) - oldDestination := sc.destination - oldDestination.Release() - sc.destination = destination return nil }