Skip to content

Commit

Permalink
Merge pull request #609 from Unknwon/fix-skip-deep-copy-sender
Browse files Browse the repository at this point in the history
sender: fix SkipDeepCopySender check not working
  • Loading branch information
wonderflow authored Jul 17, 2018
2 parents 1ae60c1 + 21b8325 commit 096fcd7
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 11 deletions.
5 changes: 4 additions & 1 deletion mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,10 @@ func classifySenderData(senders []sender.Sender, datas []Data, router *router.Ro
continue
}

_, skip := senders[i].(sender.SkipDeepCopySender)
skip := false
if ss, ok := senders[i].(sender.SkipDeepCopySender); ok {
skip = ss.SkipDeepCopy()
}
if skip || skipCopyAll || i == lastIdx {
senderDataList[i] = datas
} else {
Expand Down
2 changes: 1 addition & 1 deletion sender/discard/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ func (s *Sender) SendCount() int {
return s.count
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,4 @@ func (ess *Sender) wrapDoc(doc map[string]interface{}) map[string]interface{} {
return doc
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
8 changes: 7 additions & 1 deletion sender/fault_tolerant.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,13 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
}
}

func (_ *FtSender) SkipDeepCopy() {}
func (ft *FtSender) SkipDeepCopy() bool {
ss, ok := ft.innerSender.(SkipDeepCopySender)
if ok {
return ss.SkipDeepCopy()
}
return false
}

func SplitData(data string, splitSize int64) (valArray []string) {
if splitSize <= 0 {
Expand Down
18 changes: 18 additions & 0 deletions sender/fault_tolerant/fault_tolerant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,3 +513,21 @@ func TestTypeSchemaRetry(t *testing.T) {
t.Error("Ft send error exp 1 but got ", fts.BackupQueue.Depth())
}
}

func TestSkipDeepCopySender(t *testing.T) {
defer os.RemoveAll("tmp")

// Skip == false
{
fs, err := sender.NewFtSender(&pandora.Sender{}, nil, "tmp")
assert.Nil(t, err)
assert.False(t, fs.SkipDeepCopy())
}

// Skip == true
{
fs, err := sender.NewFtSender(&mock.Sender{}, nil, "tmp")
assert.Nil(t, err)
assert.True(t, fs.SkipDeepCopy())
}
}
2 changes: 1 addition & 1 deletion sender/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ func JSONLineMarshalFunc(datas []Data) ([]byte, error) {
return append(bytes, '\n'), nil
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,4 @@ func (h *Sender) interfaceJoin(dataArray []interface{}, sep string) (string, err
return strings.Join(strData, sep), nil
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,4 +503,4 @@ func String(in string) string {
return in
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ func (this *Sender) Close() (err error) {
return nil
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ func (mock *Sender) SendCount() int {
return mock.count
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
2 changes: 1 addition & 1 deletion sender/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ func (s *Sender) mongoSesssionKeeper(session *mgo.Session) {
}
}

func (_ *Sender) SkipDeepCopy() {}
func (_ *Sender) SkipDeepCopy() bool { return true }
3 changes: 2 additions & 1 deletion sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ type Sender interface {

// SkipDeepCopySender 表示该 sender 不会对传入数据进行污染,凡是有次保证的 sender 需要实现该接口提升发送效率
type SkipDeepCopySender interface {
SkipDeepCopy()
// SkipDeepCopy 需要返回值是因为如果一个 sender 封装了其它 sender,需要根据实际封装的类型返回是否忽略深度拷贝
SkipDeepCopy() bool
}

type StatsSender interface {
Expand Down

0 comments on commit 096fcd7

Please sign in to comment.