Skip to content

Commit

Permalink
Merge pull request #626 from wonderflow/opitimizex
Browse files Browse the repository at this point in the history
优化 性能
  • Loading branch information
wonderflow authored Jul 23, 2018
2 parents 3d29834 + 68a04f3 commit 8646fcf
Show file tree
Hide file tree
Showing 22 changed files with 647 additions and 234 deletions.
22 changes: 9 additions & 13 deletions reader/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func TestGetTags(t *testing.T) {
assert.Equal(t, exp, tags)
}

func TestSetMapValueWithPrefix(t *testing.T) {
func TestSetMapValueExistWithPrefix(t *testing.T) {
data1 := map[string]interface{}{
"a": "b",
}
err1 := SetMapValueWithPrefix(data1, "newVal", "prefix", false, "a")
err1 := SetMapValueExistWithPrefix(data1, "newVal", "prefix", "a")
assert.NoError(t, err1)
exp1 := map[string]interface{}{
"a": "b",
Expand All @@ -154,7 +154,7 @@ func TestSetMapValueWithPrefix(t *testing.T) {
"age": 45,
},
}
err2 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "name"}...)
err2 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "name"}...)
assert.NoError(t, err2)
exp2 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -165,10 +165,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
}
assert.Equal(t, exp2, data2)

err3 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"xy", "name"}...)
assert.Error(t, err3)
err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...)
assert.NoError(t, err3)

err4 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "hello"}...)
err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...)
assert.NoError(t, err4)
exp4 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -177,14 +177,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
"prefix_name": "newVal",
"hello": "newVal",
},
"xy": map[string]interface{}{
"name": "newVal",
},
}
assert.Equal(t, exp4, data2)

data5 := map[string]interface{}{}
err5 := SetMapValueWithPrefix(data5, "newVal", "prefix", true, "a")
assert.NoError(t, err5)
exp5 := map[string]interface{}{
"prefix_a": "newVal",
}
assert.Equal(t, exp5, data5)
}
47 changes: 32 additions & 15 deletions sender/fault_tolerant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,20 @@ var _ SkipDeepCopySender = &FtSender{}

// FtSender fault tolerance sender wrapper
type FtSender struct {
stopped int32
exitChan chan struct{}
innerSender Sender
logQueue queue.BackendQueue
BackupQueue queue.BackendQueue
writeLimit int // 写入速度限制,单位MB
strategy string
procs int //发送并发数
runnerName string
opt *FtOption
stats StatsInfo
statsMutex *sync.RWMutex
jsontool jsoniter.API
stopped int32
exitChan chan struct{}
innerSender Sender
logQueue queue.BackendQueue
BackupQueue queue.BackendQueue
writeLimit int // 写入速度限制,单位MB
strategy string
procs int //发送并发数
runnerName string
opt *FtOption
stats StatsInfo
statsMutex *sync.RWMutex
jsontool jsoniter.API
pandoraKeyCache map[string]KeyInfo
}

type FtOption struct {
Expand All @@ -59,21 +60,23 @@ type FtOption struct {
memoryChannel bool
memoryChannelSize int
longDataDiscard bool
innerSenderType string
}

type datasContext struct {
Datas []Data `json:"datas"`
}

// NewFtSender Fault tolerant sender constructor
func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) {
func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error) {
memoryChannel, _ := conf.GetBoolOr(KeyFtMemoryChannel, false)
memoryChannelSize, _ := conf.GetIntOr(KeyFtMemoryChannelSize, 100)
logPath, _ := conf.GetStringOr(KeyFtSaveLogPath, ftSaveLogPath)
syncEvery, _ := conf.GetIntOr(KeyFtSyncEvery, DefaultFtSyncEvery)
writeLimit, _ := conf.GetIntOr(KeyFtWriteLimit, defaultWriteLimit)
strategy, _ := conf.GetStringOr(KeyFtStrategy, KeyFtStrategyBackupOnly)
longDataDiscard, _ := conf.GetBoolOr(KeyFtLongDataDiscard, false)
senderType, _ := conf.GetStringOr(KeySenderType, "") //此处不会没有SenderType,在调用NewFtSender时已经检查
switch strategy {
case KeyFtStrategyAlwaysSave, KeyFtStrategyBackupOnly, KeyFtStrategyConcurrent:
default:
Expand All @@ -91,9 +94,10 @@ func NewFtSender(ftSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtS
memoryChannel: memoryChannel,
memoryChannelSize: memoryChannelSize,
longDataDiscard: longDataDiscard,
innerSenderType: senderType,
}

return newFtSender(ftSender, runnerName, opt)
return newFtSender(innerSender, runnerName, opt)
}

func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSender, error) {
Expand Down Expand Up @@ -123,6 +127,10 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
statsMutex: new(sync.RWMutex),
jsontool: jsoniter.Config{EscapeHTML: true, UseNumber: true}.Froze(),
}

if opt.innerSenderType == TypePandora {
ftSender.pandoraKeyCache = make(map[string]KeyInfo)
}
go ftSender.asyncSendLogFromDiskQueue()
return &ftSender, nil
}
Expand All @@ -132,6 +140,15 @@ func (ft *FtSender) Name() string {
}

func (ft *FtSender) Send(datas []Data) error {

switch ft.opt.innerSenderType {
case TypePandora:
for i, v := range datas {
datas[i] = DeepConvertKeyWithCache(v, ft.pandoraKeyCache)
}
default:
}

se := &StatsError{Ft: true}
if ft.strategy == KeyFtStrategyBackupOnly {
// 尝试直接发送数据,当数据失败的时候会加入到本地重试队列。外部不需要重试
Expand Down
99 changes: 99 additions & 0 deletions sender/fault_tolerant/fault_tolerant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,102 @@ func TestSkipDeepCopySender(t *testing.T) {
assert.True(t, fs.SkipDeepCopy())
}
}

func TestPandoraExtraInfo(t *testing.T) {
pandoraServer, pt := mock_pandora.NewMockPandoraWithPrefix("/v2")
conf1 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "true",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}

innerSender, err := pandora.NewSender(conf1)
if err != nil {
t.Fatal(err)
}
s, err := sender.NewFtSender(innerSender, conf1, fttestdir)
defer os.RemoveAll(fttestdir)
if err != nil {
t.Fatal(err)
}
d := Data{}
d["x1"] = "123.2"
d["hostname"] = "123.2"
d["hostname0"] = "123.2"
d["hostname1"] = "123.2"
d["hostname2"] = "123.2"
d["osinfo"] = "123.2"
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp := pandoraServer.Body
assert.Equal(t, true, strings.Contains(resp, "core"))
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2"))

conf2 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "false",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
innerSender, err = pandora.NewSender(conf2)
if err != nil {
t.Fatal(err)
}

s, err = sender.NewFtSender(innerSender, conf1, fttestdir)
d = Data{
"*x1": "123.2",
"x2.dot": "123.2",
"@timestamp": "2018-07-18T10:17:36.549054846+08:00",
}
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp = pandoraServer.Body
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2"))
assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00"))
}
3 changes: 0 additions & 3 deletions sender/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,9 +850,6 @@ func (s *Sender) Send(datas []Data) (se error) {
case SendTypeRaw:
return s.rawSend(datas)
default:
for i, v := range datas {
datas[i] = DeepConvertKey(v)
}
return s.schemaFreeSend(datas)
}
return nil
Expand Down
91 changes: 0 additions & 91 deletions sender/pandora/pandora_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,94 +951,3 @@ func TestPandoraSenderTime(t *testing.T) {
resp = pandora.Body
assert.Equal(t, resp, "x1=123.2")
}

func TestPandoraExtraInfo(t *testing.T) {
pandora, pt := mockPandora.NewMockPandoraWithPrefix("/v2")
conf1 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "true",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
s, err := NewSender(conf1)
if err != nil {
t.Fatal(err)
}
d := Data{}
d["x1"] = "123.2"
d["hostname"] = "123.2"
d["hostname0"] = "123.2"
d["hostname1"] = "123.2"
d["hostname2"] = "123.2"
d["osinfo"] = "123.2"
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp := pandora.Body
assert.Equal(t, true, strings.Contains(resp, "core"))
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "osinfo=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname0=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "hostname2=123.2"))

conf2 := conf.MapConf{
"force_microsecond": "false",
"ft_memory_channel": "false",
"ft_strategy": "backup_only",
"ignore_invalid_field": "true",
"logkit_send_time": "false",
"pandora_extra_info": "false",
"pandora_ak": "ak",
"pandora_auto_convert_date": "true",
"pandora_gzip": "true",
"pandora_host": "http://127.0.0.1:" + pt,
"pandora_region": "nb",
"pandora_repo_name": "TestPandoraSenderTime",
"pandora_schema_free": "true",
"pandora_sk": "sk",
"runner_name": "runner.20171117110730",
"sender_type": "pandora",
"name": "TestPandoraSenderTime",
"KeyPandoraSchemaUpdateInterval": "1s",
}
s, err = NewSender(conf2)
if err != nil {
t.Fatal(err)
}
d = Data{
"*x1": "123.2",
"x2.dot": "123.2",
"@timestamp": "2018-07-18T10:17:36.549054846+08:00",
}
err = s.Send([]Data{d})
if st, ok := err.(*StatsError); ok {
err = st.ErrorDetail
}
if err != nil {
t.Error(err)
}
resp = pandora.Body
assert.Equal(t, true, strings.Contains(resp, "x1=123.2"))
assert.Equal(t, true, strings.Contains(resp, "x2_dot=123.2"))
assert.Equal(t, true, strings.Contains(resp, "timestamp=2018-07-18T10:17:36.549054846+08:00"))
}
4 changes: 2 additions & 2 deletions sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ var ModeKeyOptions = map[string][]Option{
KeyName: KeyPandoraAutoConvertDate,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{"true", "false"},
Default: "true",
ChooseOptions: []interface{}{"false", "true"},
Default: "false",
DefaultNoUse: false,
Description: "自动转换时间类型(pandora_auto_convert_date)",
Advance: true,
Expand Down
4 changes: 3 additions & 1 deletion sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ func (r *Registry) NewSender(conf conf.MapConf, ftSaveLogPath string) (sender Se
return
}
faultTolerant, _ := conf.GetBoolOr(KeyFaultTolerant, true)
if faultTolerant {

//如果是 PandoraSender,目前的依赖必须启用 ftsender,依赖Ftsender做key转换检查
if faultTolerant || sendType == TypePandora {
sender, err = NewFtSender(sender, conf, ftSaveLogPath)
if err != nil {
return
Expand Down
Loading

0 comments on commit 8646fcf

Please sign in to comment.