From 21b83256ed7d87473c82ff9cd371f14701d220a8 Mon Sep 17 00:00:00 2001 From: Unknwon Date: Tue, 17 Jul 2018 20:26:25 +0800 Subject: [PATCH] sender: fix SkipDeepCopySender check not working --- mgr/runner.go | 5 ++++- sender/discard/discard.go | 2 +- sender/elasticsearch/elasticsearch.go | 2 +- sender/fault_tolerant.go | 8 +++++++- sender/fault_tolerant/fault_tolerant_test.go | 18 ++++++++++++++++++ sender/file/file.go | 2 +- sender/http/http.go | 2 +- sender/influxdb/influxdb.go | 2 +- sender/kafka/kafka.go | 2 +- sender/mock/mock.go | 2 +- sender/mongodb/mongodb.go | 2 +- sender/sender.go | 3 ++- 12 files changed, 39 insertions(+), 11 deletions(-) diff --git a/mgr/runner.go b/mgr/runner.go index b282f3f39..e74054469 100644 --- a/mgr/runner.go +++ b/mgr/runner.go @@ -663,7 +663,10 @@ func classifySenderData(senders []sender.Sender, datas []Data, router *router.Ro continue } - _, skip := senders[i].(sender.SkipDeepCopySender) + skip := false + if ss, ok := senders[i].(sender.SkipDeepCopySender); ok { + skip = ss.SkipDeepCopy() + } if skip || skipCopyAll || i == lastIdx { senderDataList[i] = datas } else { diff --git a/sender/discard/discard.go b/sender/discard/discard.go index 9710b8fe3..2b3dcf14b 100644 --- a/sender/discard/discard.go +++ b/sender/discard/discard.go @@ -45,4 +45,4 @@ func (s *Sender) SendCount() int { return s.count } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/elasticsearch/elasticsearch.go b/sender/elasticsearch/elasticsearch.go index 121860a5c..47800dc11 100644 --- a/sender/elasticsearch/elasticsearch.go +++ b/sender/elasticsearch/elasticsearch.go @@ -267,4 +267,4 @@ func (ess *Sender) wrapDoc(doc map[string]interface{}) map[string]interface{} { return doc } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/fault_tolerant.go b/sender/fault_tolerant.go index 5540a1c00..bfa100428 100644 --- a/sender/fault_tolerant.go +++ b/sender/fault_tolerant.go @@ -506,7 +506,13 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read } } -func (_ *FtSender) SkipDeepCopy() {} +func (ft *FtSender) SkipDeepCopy() bool { + ss, ok := ft.innerSender.(SkipDeepCopySender) + if ok { + return ss.SkipDeepCopy() + } + return false +} func SplitData(data string, splitSize int64) (valArray []string) { if splitSize <= 0 { diff --git a/sender/fault_tolerant/fault_tolerant_test.go b/sender/fault_tolerant/fault_tolerant_test.go index 86748a44c..4509fd66d 100644 --- a/sender/fault_tolerant/fault_tolerant_test.go +++ b/sender/fault_tolerant/fault_tolerant_test.go @@ -513,3 +513,21 @@ func TestTypeSchemaRetry(t *testing.T) { t.Error("Ft send error exp 1 but got ", fts.BackupQueue.Depth()) } } + +func TestSkipDeepCopySender(t *testing.T) { + defer os.RemoveAll("tmp") + + // Skip == false + { + fs, err := sender.NewFtSender(&pandora.Sender{}, nil, "tmp") + assert.Nil(t, err) + assert.False(t, fs.SkipDeepCopy()) + } + + // Skip == true + { + fs, err := sender.NewFtSender(&mock.Sender{}, nil, "tmp") + assert.Nil(t, err) + assert.True(t, fs.SkipDeepCopy()) + } +} diff --git a/sender/file/file.go b/sender/file/file.go index 8a58ef703..0dc02ffd8 100644 --- a/sender/file/file.go +++ b/sender/file/file.go @@ -82,4 +82,4 @@ func JSONLineMarshalFunc(datas []Data) ([]byte, error) { return append(bytes, '\n'), nil } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/http/http.go b/sender/http/http.go index aa17eca0a..56d997e43 100644 --- a/sender/http/http.go +++ b/sender/http/http.go @@ -224,4 +224,4 @@ func (h *Sender) interfaceJoin(dataArray []interface{}, sep string) (string, err return strings.Join(strData, sep), nil } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/influxdb/influxdb.go b/sender/influxdb/influxdb.go index 13eae0d16..140378faa 100644 --- a/sender/influxdb/influxdb.go +++ b/sender/influxdb/influxdb.go @@ -503,4 +503,4 @@ func String(in string) string { return in } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/kafka/kafka.go b/sender/kafka/kafka.go index 64a0f4846..e45ccb2d4 100644 --- a/sender/kafka/kafka.go +++ b/sender/kafka/kafka.go @@ -201,4 +201,4 @@ func (this *Sender) Close() (err error) { return nil } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/mock/mock.go b/sender/mock/mock.go index a9ec90a46..445e1efe2 100644 --- a/sender/mock/mock.go +++ b/sender/mock/mock.go @@ -63,4 +63,4 @@ func (mock *Sender) SendCount() int { return mock.count } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/mongodb/mongodb.go b/sender/mongodb/mongodb.go index 5e05f8ee5..d287390c6 100644 --- a/sender/mongodb/mongodb.go +++ b/sender/mongodb/mongodb.go @@ -178,4 +178,4 @@ func (s *Sender) mongoSesssionKeeper(session *mgo.Session) { } } -func (_ *Sender) SkipDeepCopy() {} +func (_ *Sender) SkipDeepCopy() bool { return true } diff --git a/sender/sender.go b/sender/sender.go index 1bc00abd5..6f5705b37 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -202,7 +202,8 @@ type Sender interface { // SkipDeepCopySender 表示该 sender 不会对传入数据进行污染,凡是有次保证的 sender 需要实现该接口提升发送效率 type SkipDeepCopySender interface { - SkipDeepCopy() + // SkipDeepCopy 需要返回值是因为如果一个 sender 封装了其它 sender,需要根据实际封装的类型返回是否忽略深度拷贝 + SkipDeepCopy() bool } type StatsSender interface {