From 5c032224056d28d9e95301d3157724596a9b6b8d Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 5 Sep 2023 17:29:12 +0800 Subject: [PATCH 1/4] tests (ticdc): fix unstable integration: http_api (#9681) close pingcap/tiflow#9682 --- .../http_api/util/test_case.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/tests/integration_tests/http_api/util/test_case.py b/tests/integration_tests/http_api/util/test_case.py index 605aa2e9704..756758ea885 100644 --- a/tests/integration_tests/http_api/util/test_case.py +++ b/tests/integration_tests/http_api/util/test_case.py @@ -1,6 +1,7 @@ import sys import os import requests as rq +from requests.exceptions import RequestException import time import json @@ -24,6 +25,26 @@ SINK_URI="mysql://normal:%s@127.0.0.1:3306/" % ENPASSWORD physicalShiftBits = 18 + +def requests_get_with_retry(url, max_retries=RETRY_TIME, delay_seconds=1): + """ + requests get with retry + + :param url: request url + :param max_retries: max retry times + :param delay_seconds: retry delay seconds + :return: when success, return response, else return None + """ + for retry in range(max_retries): + try: + response = rq.get(url) + if response.status_code == 200 or response.status_code == 202: + return response + except RequestException as e: + print(f"request fails {retry + 1}/{max_retries} time retry...") + time.sleep(delay_seconds) + return None + # we should write some SQLs in the run.sh after call create_changefeed def create_changefeed(sink_uri): url = BASE_URL1+"/changefeeds" @@ -245,7 +266,7 @@ def resign_owner(): def list_capture(): url = BASE_URL0 + "/captures" - resp = rq.get(url) + resp = requests_get_with_retry(url) assert resp.status_code == rq.codes.ok print("pass test: list captures") @@ -253,7 +274,7 @@ def list_capture(): def list_processor(): url = BASE_URL0 + "/processors" - resp = rq.get(url) + resp = requests_get_with_retry(url) assert resp.status_code == rq.codes.ok print("pass test: list processors") @@ -262,17 +283,16 @@ def list_processor(): def get_processor(): # list processor to get changefeed_id and capture_id base_url = BASE_URL0 + "/processors" - resp = rq.get(base_url) + resp = requests_get_with_retry(base_url) assert resp.status_code == rq.codes.ok data = resp.json()[0] time.sleep(2) url = base_url + "/" + data["changefeed_id"] + "/" + data["capture_id"] - resp = rq.get(url) + resp = requests_get_with_retry(url) # print error message for debug if (resp.status_code != rq.codes.ok): print("request url", url) print("response status code:", resp.status_code) - print("response body:", resp.text()) assert resp.status_code == rq.codes.ok # test capture_id error and cdc server no panic @@ -297,7 +317,7 @@ def check_health(): def get_status(): url = BASE_URL0 + "/status" - resp = rq.get(url) + resp = requests_get_with_retry(url) assert resp.status_code == rq.codes.ok assert resp.json()["is_owner"] From 63ffe68fc6b7e7a35c47edce41d0404b5e234c65 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 5 Sep 2023 18:02:13 +0800 Subject: [PATCH 2/4] metrics (ticdc): fix error metrics expression (#9648) close pingcap/tiflow#9649 --- metrics/grafana/TiCDC-Monitor-Summary.json | 22 +++++++++++++------- metrics/grafana/ticdc.json | 24 ++++++++++++++-------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/metrics/grafana/TiCDC-Monitor-Summary.json b/metrics/grafana/TiCDC-Monitor-Summary.json index 4740f483fa5..f046cc34218 100644 --- a/metrics/grafana/TiCDC-Monitor-Summary.json +++ b/metrics/grafana/TiCDC-Monitor-Summary.json @@ -709,7 +709,7 @@ }, { "datasource": "${DS_TEST-CLUSTER}", - "description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", + "description": "The status of each changefeed.\n\n0: Normal\n\n1 and 6: Warning\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", "fieldConfig": { "defaults": { "color": { @@ -748,7 +748,7 @@ { "from": "", "id": 2, - "text": "Error", + "text": "Warning", "to": "", "type": 1, "value": "1" @@ -780,22 +780,30 @@ { "from": "", "id": 6, + "text": "Warning", + "to": "", + "type": 1, + "value": "6" + }, + { + "from": "", + "id": 7, "text": "Unknown", "to": "", "type": 1, "value": "-1" }, { - "from": "5", - "id": 7, + "from": "7", + "id": 8, "text": "Other", "to": "10000", "type": 1, - "value": "5" + "value": "7" }, { - "from": "6", - "id": 8, + "from": "7", + "id": 9, "text": "-", "to": "1000", "type": 2 diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 1cfe3758715..4649e06e286 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -1073,7 +1073,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(increase(tikv_cdc_scan_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$tikv_instance\"}[1m])) by (type, instance)", + "expr": "sum(increase(tikv_cdc_scan_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$tikv_instance\"}[1m])) by (type, instance)", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -3900,7 +3900,7 @@ }, { "datasource": "${DS_TEST-CLUSTER}", - "description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", + "description": "The status of each changefeed.\n\n0: Normal\n\n1 and 6: Warning\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", "fieldConfig": { "defaults": { "color": { @@ -3939,7 +3939,7 @@ { "from": "", "id": 2, - "text": "Error", + "text": "Warning", "to": "", "type": 1, "value": "1" @@ -3971,22 +3971,30 @@ { "from": "", "id": 6, + "text": "Warning", + "to": "", + "type": 1, + "value": "6" + }, + { + "from": "", + "id": 7, "text": "Unknown", "to": "", "type": 1, "value": "-1" }, { - "from": "5", - "id": 7, + "from": "7", + "id": 8, "text": "Other", "to": "10000", "type": 1, - "value": "5" + "value": "7" }, { - "from": "6", - "id": 8, + "from": "7", + "id": 9, "text": "-", "to": "1000", "type": 2 From 2342b4efd859c318f79ee2e26137ddac86c9b3cf Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:35:13 +0800 Subject: [PATCH 3/4] config(ticdc): refine the kafka config adjust and validation (#9679) close pingcap/tiflow#9678 --- pkg/compression/compress.go | 4 ++-- pkg/config/large_message.go | 21 +++++++++------- pkg/config/large_message_test.go | 24 +++++++++---------- pkg/config/replica_config_test.go | 20 ++++++++++++++++ pkg/config/sink.go | 40 +++++++++++++++++++++++-------- pkg/sink/codec/common/config.go | 4 ++-- 6 files changed, 79 insertions(+), 34 deletions(-) diff --git a/pkg/compression/compress.go b/pkg/compression/compress.go index 72510a5e058..2704e683868 100644 --- a/pkg/compression/compress.go +++ b/pkg/compression/compress.go @@ -61,7 +61,7 @@ func Encode(cc string, data []byte) ([]byte, error) { default: } - return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %d", cc) + return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %s", cc) } // Decode the given data by the given compression codec. @@ -81,5 +81,5 @@ func Decode(cc string, data []byte) ([]byte, error) { default: } - return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %d", cc) + return nil, cerror.ErrCompressionFailed.GenWithStack("Unsupported compression %s", cc) } diff --git a/pkg/config/large_message.go b/pkg/config/large_message.go index d56e7691ea9..83d6d255853 100644 --- a/pkg/config/large_message.go +++ b/pkg/config/large_message.go @@ -42,16 +42,21 @@ func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig { } } -// Validate the Config. -func (c *LargeMessageHandleConfig) Validate(protocol Protocol, enableTiDBExtension bool) error { - // compression can be enabled independently - if c.LargeMessageHandleCompression != "" { - if !compression.Supported(c.LargeMessageHandleCompression) { - return cerror.ErrInvalidReplicaConfig.GenWithStack( - "large message handle compression is not supported, got %s", c.LargeMessageHandleCompression) - } +// AdjustAndValidate the Config. +func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error { + if c.LargeMessageHandleOption == "" { + c.LargeMessageHandleOption = LargeMessageHandleOptionNone + } + + if c.LargeMessageHandleCompression == "" { + c.LargeMessageHandleCompression = compression.None } + // compression can be enabled independently + if !compression.Supported(c.LargeMessageHandleCompression) { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle compression is not supported, got %s", c.LargeMessageHandleCompression) + } if c.LargeMessageHandleOption == LargeMessageHandleOptionNone { return nil } diff --git a/pkg/config/large_message_test.go b/pkg/config/large_message_test.go index e06d5717632..3bc1dd6b212 100644 --- a/pkg/config/large_message_test.go +++ b/pkg/config/large_message_test.go @@ -29,19 +29,19 @@ func TestLargeMessageHandle4Compression(t *testing.T) { // unsupported compression, return error largeMessageHandle.LargeMessageHandleCompression = "zstd" - err := largeMessageHandle.Validate(ProtocolCanalJSON, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) largeMessageHandle.LargeMessageHandleCompression = compression.LZ4 - err = largeMessageHandle.Validate(ProtocolCanalJSON, false) + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.NoError(t, err) largeMessageHandle.LargeMessageHandleCompression = compression.Snappy - err = largeMessageHandle.Validate(ProtocolCanalJSON, false) + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.NoError(t, err) largeMessageHandle.LargeMessageHandleCompression = compression.None - err = largeMessageHandle.Validate(ProtocolCanalJSON, false) + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.NoError(t, err) } @@ -50,11 +50,11 @@ func TestLargeMessageHandle4NotSupportedProtocol(t *testing.T) { largeMessageHandle := NewDefaultLargeMessageHandleConfig() - err := largeMessageHandle.Validate(ProtocolCanal, true) + err := largeMessageHandle.AdjustAndValidate(ProtocolCanal, true) require.NoError(t, err) largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly - err = largeMessageHandle.Validate(ProtocolCanal, true) + err = largeMessageHandle.AdjustAndValidate(ProtocolCanal, true) require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) } @@ -64,7 +64,7 @@ func TestLargeMessageHandle4CanalJSON(t *testing.T) { // large-message-handle not set, always no error largeMessageHandle := NewDefaultLargeMessageHandleConfig() - err := largeMessageHandle.Validate(ProtocolCanalJSON, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.NoError(t, err) require.True(t, largeMessageHandle.Disabled()) @@ -78,11 +78,11 @@ func TestLargeMessageHandle4CanalJSON(t *testing.T) { } // `enable-tidb-extension` is false, return error - err := largeMessageHandle.Validate(ProtocolCanalJSON, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, false) require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) // `enable-tidb-extension` is true, no error - err = largeMessageHandle.Validate(ProtocolCanalJSON, true) + err = largeMessageHandle.AdjustAndValidate(ProtocolCanalJSON, true) require.NoError(t, err) require.Equal(t, option, largeMessageHandle.LargeMessageHandleOption) } @@ -94,7 +94,7 @@ func TestLargeMessageHandle4OpenProtocol(t *testing.T) { // large-message-handle not set, always no error largeMessageHandle := NewDefaultLargeMessageHandleConfig() - err := largeMessageHandle.Validate(ProtocolOpen, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) require.NoError(t, err) require.True(t, largeMessageHandle.Disabled()) @@ -108,11 +108,11 @@ func TestLargeMessageHandle4OpenProtocol(t *testing.T) { } // `enable-tidb-extension` is false, return error - err := largeMessageHandle.Validate(ProtocolOpen, false) + err := largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) require.NoError(t, err) // `enable-tidb-extension` is true, no error - err = largeMessageHandle.Validate(ProtocolOpen, true) + err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, true) require.NoError(t, err) require.Equal(t, o, largeMessageHandle.LargeMessageHandleOption) diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index c610e0b7113..60bffb2e0f7 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/pingcap/tiflow/pkg/compression" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" @@ -325,3 +326,22 @@ func TestIsSinkCompatibleWithSpanReplication(t *testing.T) { require.Equal(t, compatible, tt.compatible, tt.name) } } + +func TestValidateAndAdjustLargeMessageHandle(t *testing.T) { + cfg := GetDefaultReplicaConfig() + cfg.Sink.KafkaConfig = &KafkaConfig{ + LargeMessageHandle: NewDefaultLargeMessageHandleConfig(), + } + cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = "" + cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleCompression = "" + + rawURL := "kafka://127.0.0.1:9092/canal-json-test?protocol=canal-json&enable-tidb-extension=true" + sinkURL, err := url.Parse(rawURL) + require.NoError(t, err) + + err = cfg.ValidateAndAdjust(sinkURL) + require.NoError(t, err) + + require.Equal(t, LargeMessageHandleOptionNone, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption) + require.Equal(t, compression.None, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleCompression) +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index c0ea20e3fec..305d8fb60cc 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -16,6 +16,7 @@ package config import ( "fmt" "net/url" + "strconv" "strings" "time" @@ -540,6 +541,33 @@ type CloudStorageConfig struct { } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { + if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { + return err + } + + if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) { + return nil + } + + protocol, _ := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) + + if s.KafkaConfig != nil && s.KafkaConfig.LargeMessageHandle != nil { + var ( + enableTiDBExtension bool + err error + ) + if s := sinkURI.Query().Get("enable-tidb-extension"); s != "" { + enableTiDBExtension, err = strconv.ParseBool(s) + if err != nil { + return errors.Trace(err) + } + } + err = s.KafkaConfig.LargeMessageHandle.AdjustAndValidate(protocol, enableTiDBExtension) + if err != nil { + return err + } + } + if s.SchemaRegistry != nil && (s.KafkaConfig != nil && s.KafkaConfig.GlueSchemaRegistryConfig != nil) { return cerror.ErrInvalidReplicaConfig. @@ -548,6 +576,7 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { "schema-registry is used by confluent schema registry, " + "glue-schema-registry-config is used by aws glue schema registry") } + if s.KafkaConfig != nil && s.KafkaConfig.GlueSchemaRegistryConfig != nil { err := s.KafkaConfig.GlueSchemaRegistryConfig.Validate() if err != nil { @@ -555,14 +584,6 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { } } - if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { - return err - } - - if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) { - return nil - } - if s.PulsarConfig != nil { if err := s.PulsarConfig.validate(); err != nil { return err @@ -595,7 +616,6 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { s.Terminator = util.AddressOf(CRLF) } - protocol, _ := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) if util.GetOrZero(s.DeleteOnlyOutputHandleKeyColumns) && protocol == ProtocolCsv { return cerror.ErrSinkInvalidConfig.GenWithStack( "CSV protocol always output all columns for the delete event, " + @@ -656,7 +676,7 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return err } - // Validate that protocol is compatible with the scheme. For testing purposes, + // Adjust that protocol is compatible with the scheme. For testing purposes, // any protocol should be legal for blackhole. if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { _, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 350bc74f4ff..8e7ba8f241a 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -188,7 +188,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod } - if replicaConfig.Sink.KafkaConfig != nil { + if replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil { c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle } if !c.LargeMessageHandle.Disabled() && replicaConfig.ForceReplicate { @@ -324,7 +324,7 @@ func (c *Config) Validate() error { } if c.LargeMessageHandle != nil { - err := c.LargeMessageHandle.Validate(c.Protocol, c.EnableTiDBExtension) + err := c.LargeMessageHandle.AdjustAndValidate(c.Protocol, c.EnableTiDBExtension) if err != nil { return err } From e1730e53db02dcffefc2f4b876e53a525f71e20e Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 5 Sep 2023 23:09:41 +0800 Subject: [PATCH 4/4] pulsar (ticdc): pulsar use ticdc log (#9674) ref pingcap/tiflow#9413 --- pkg/sink/pulsar/factory.go | 18 ++-- pkg/sink/pulsar/logger.go | 163 +++++++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 pkg/sink/pulsar/logger.go diff --git a/pkg/sink/pulsar/factory.go b/pkg/sink/pulsar/factory.go index f27e639727a..8e0cdd11068 100644 --- a/pkg/sink/pulsar/factory.go +++ b/pkg/sink/pulsar/factory.go @@ -14,8 +14,6 @@ package pulsar import ( - "fmt" - "github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/pingcap/log" @@ -30,7 +28,7 @@ type FactoryCreator func(config *config.PulsarConfig, changefeedID model.ChangeF // NewCreatorFactory returns a factory implemented based on kafka-go func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFeedID, sinkConfig *config.SinkConfig) (pulsar.Client, error) { - co := pulsar.ClientOptions{ + option := pulsar.ClientOptions{ URL: config.GetBrokerURL(), CustomMetricsLabels: map[string]string{ "changefeed": changefeedID.ID, @@ -40,10 +38,11 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee OperationTimeout: config.OperationTimeout.Duration(), // add pulsar default metrics MetricsRegisterer: mq.GetMetricRegistry(), + Logger: NewPulsarLogger(), } var err error - co.Authentication, err = setupAuthentication(config) + option.Authentication, err = setupAuthentication(config) if err != nil { log.Error("setup pulsar authentication fail", zap.Error(err)) return nil, err @@ -54,13 +53,13 @@ func NewCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFee sinkPulsar := sinkConfig.PulsarConfig if sinkPulsar.TLSCertificateFile != nil && sinkPulsar.TLSKeyFilePath != nil && sinkPulsar.TLSTrustCertsFilePath != nil { - co.TLSCertificateFile = *sinkPulsar.TLSCertificateFile - co.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath - co.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath + option.TLSCertificateFile = *sinkPulsar.TLSCertificateFile + option.TLSKeyFilePath = *sinkPulsar.TLSKeyFilePath + option.TLSTrustCertsFilePath = *sinkPulsar.TLSTrustCertsFilePath } } - pulsarClient, err := pulsar.NewClient(co) + pulsarClient, err := pulsar.NewClient(option) if err != nil { log.Error("cannot connect to pulsar", zap.Error(err)) return nil, err @@ -93,7 +92,8 @@ func setupAuthentication(config *config.PulsarConfig) (pulsar.Authentication, er if config.AuthTLSCertificatePath != nil && config.AuthTLSPrivateKeyPath != nil { return pulsar.NewAuthenticationTLS(*config.AuthTLSCertificatePath, *config.AuthTLSPrivateKeyPath), nil } - return nil, fmt.Errorf("no authentication method found") + log.Info("No authentication configured for pulsar client") + return nil, nil } // NewMockCreatorFactory returns a factory implemented based on kafka-go diff --git a/pkg/sink/pulsar/logger.go b/pkg/sink/pulsar/logger.go new file mode 100644 index 00000000000..835f0830199 --- /dev/null +++ b/pkg/sink/pulsar/logger.go @@ -0,0 +1,163 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + "github.com/apache/pulsar-client-go/pulsar/log" + plog "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Logger wrapper cdc logger to adapt pulsar logger +type Logger struct { + zapLogger *zap.Logger +} + +// SubLogger sub +func (p *Logger) SubLogger(fields log.Fields) log.Logger { + subLogger := p.zapLogger + for k, v := range fields { + subLogger = subLogger.With(zap.Any(k, v)) + } + return &Logger{subLogger} +} + +// WithFields with fields +func (p *Logger) WithFields(fields log.Fields) log.Entry { + return &LoggerEntry{ + fields: fields, + logger: p.zapLogger, + } +} + +// WithField with field +func (p *Logger) WithField(name string, value interface{}) log.Entry { + return &LoggerEntry{ + fields: log.Fields{name: value}, + logger: p.zapLogger, + } +} + +// WithError error +func (p *Logger) WithError(err error) log.Entry { + return &LoggerEntry{ + fields: log.Fields{"error": err}, + logger: p.zapLogger, + } +} + +// Debug debug +func (p *Logger) Debug(args ...interface{}) { + p.zapLogger.Sugar().Debug(args...) +} + +// Info info +func (p *Logger) Info(args ...interface{}) { + p.zapLogger.Sugar().Info(args...) +} + +// Warn warn +func (p *Logger) Warn(args ...interface{}) { + p.zapLogger.Sugar().Warn(args...) +} + +// Error error +func (p *Logger) Error(args ...interface{}) { + p.zapLogger.Sugar().Error(args...) +} + +// Debugf debugf +func (p *Logger) Debugf(format string, args ...interface{}) { + p.zapLogger.Sugar().Debugf(format, args...) +} + +// Infof infof +func (p *Logger) Infof(format string, args ...interface{}) { + p.zapLogger.Sugar().Infof(format, args...) +} + +// Warnf warnf +func (p *Logger) Warnf(format string, args ...interface{}) { + p.zapLogger.Sugar().Warnf(format, args...) +} + +// Errorf errorf +func (p *Logger) Errorf(format string, args ...interface{}) { + p.zapLogger.Sugar().Errorf(format, args...) +} + +// NewPulsarLogger new pulsar logger +func NewPulsarLogger() *Logger { + return &Logger{ + zapLogger: plog.L(), + } +} + +// LoggerEntry pulsar logger entry +type LoggerEntry struct { + fields log.Fields + logger *zap.Logger +} + +// WithFields with fields +func (p *LoggerEntry) WithFields(fields log.Fields) log.Entry { + p.fields = fields + return p +} + +// WithField with field +func (p *LoggerEntry) WithField(name string, value interface{}) log.Entry { + p.fields[name] = value + return p +} + +// Debug debug +func (p *LoggerEntry) Debug(args ...interface{}) { + p.logger.Sugar().Debug(args...) +} + +// Info info +func (p *LoggerEntry) Info(args ...interface{}) { + p.logger.Sugar().Info(args...) +} + +// Warn warn +func (p *LoggerEntry) Warn(args ...interface{}) { + p.logger.Sugar().Warn(args...) +} + +// Error error +func (p *LoggerEntry) Error(args ...interface{}) { + p.logger.Sugar().Error(args...) +} + +// Debugf debugf +func (p *LoggerEntry) Debugf(format string, args ...interface{}) { + p.logger.Sugar().Debugf(format, args...) +} + +// Infof infof +func (p *LoggerEntry) Infof(format string, args ...interface{}) { + p.logger.Sugar().Infof(format, args...) +} + +// Warnf warnf +func (p *LoggerEntry) Warnf(format string, args ...interface{}) { + p.logger.Sugar().Warnf(format, args...) +} + +// Errorf errorf +func (p *LoggerEntry) Errorf(format string, args ...interface{}) { + p.logger.Sugar().Errorf(format, args...) +}