From 33b1a9676d7f08579f42c4647decf77c8565bcd7 Mon Sep 17 00:00:00 2001 From: wonderflow Date: Fri, 20 Jul 2018 16:49:00 +0800 Subject: [PATCH 1/2] add urlparam --- transforms/mutate/urlparam.go | 25 +++++++++++++++++++++++-- transforms/mutate/urlparam_test.go | 19 +++++++++++++------ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/transforms/mutate/urlparam.go b/transforms/mutate/urlparam.go index 0a67eeae1..5f81cca3c 100644 --- a/transforms/mutate/urlparam.go +++ b/transforms/mutate/urlparam.go @@ -12,7 +12,10 @@ import ( . "github.com/qiniu/logkit/utils/models" ) -const urlParamPath = "url_param_path" +const ( + urlParamPath = "url_param_path" + urlParamHost = "url_param_host" +) var ( _ transforms.StatsTransformer = &UrlParam{} @@ -26,11 +29,29 @@ type UrlParam struct { func (p *UrlParam) transformToMap(strVal string, key string) (map[string]interface{}, error) { resultMap := make(map[string]interface{}) + var urlPath string if idx := strings.Index(strVal, "?"); idx != -1 { if len(strVal[:idx]) != 0 { - resultMap[key+"_"+urlParamPath] = strVal[:idx] + urlPath = strVal[:idx] } strVal = strVal[idx+1:] + } else { + urlPath = strVal + } + if len(urlPath) > 0 { + uri, err := url.Parse(urlPath) + if err != nil { + return nil, err + } + if len(uri.Path) > 0 { + //如果同时满足不包含前缀`/`,还包含`&`,说明是个param + if strings.HasPrefix(uri.Path, "/") || !strings.Contains(uri.Path, "&") { + resultMap[key+"_"+urlParamPath] = uri.Path + } + } + if len(uri.Host) > 0 { + resultMap[key+"_"+urlParamHost] = uri.Host + } } if len(strVal) < 1 { return resultMap, nil diff --git a/transforms/mutate/urlparam_test.go b/transforms/mutate/urlparam_test.go index afbfe4261..3dec3f716 100644 --- a/transforms/mutate/urlparam_test.go +++ b/transforms/mutate/urlparam_test.go @@ -18,6 +18,8 @@ func TestParamTransformer(t *testing.T) { {"myword": "?platform=2&vid=372&vu=caea966558&chan=android_sougou&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"}, {"myword": "platform=2&vid=&vu=caea966558&chan=&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"}, {"myword": "/index/mytest?platform=2&vid=&vu=caea966558&chan=&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"}, + {"myword": "/index/mytest1"}, + {"myword": "http://10.100.0.1/index/mytest"}, }) assert.NoError(t, err) exp := []Data{ @@ -45,18 +47,23 @@ func TestParamTransformer(t *testing.T) { "myword_nonce_str": "1510555032", "myword_url_param_path": "/index/mytest", }, + { + "myword": "/index/mytest1", + "myword_url_param_path": "/index/mytest1", + }, + { + "myword": "http://10.100.0.1/index/mytest", + "myword_url_param_path": "/index/mytest", + "myword_url_param_host": "10.100.0.1", + }, } assert.Equal(t, len(exp), len(data)) for i, ex := range exp { da := data[i] - for k, e := range ex { - d, exist := da[k] - assert.Equal(t, true, exist) - assert.Equal(t, e, d) - } + assert.Equal(t, ex, da) } assert.Equal(t, par.Stage(), transforms.StageAfterParser) - assert.Equal(t, StatsInfo{Success: 3}, par.stats) + assert.Equal(t, StatsInfo{Success: 5}, par.stats) } func TestParamTransformerError(t *testing.T) { From 0d904625384b65257ed97947f5da4b88e3bc8cfb Mon Sep 17 00:00:00 2001 From: wonderflow Date: Fri, 20 Jul 2018 16:54:28 +0800 Subject: [PATCH 2/2] roll back --- reader/kafka/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reader/kafka/kafka.go b/reader/kafka/kafka.go index b16b8fab5..bab07de03 100644 --- a/reader/kafka/kafka.go +++ b/reader/kafka/kafka.go @@ -81,7 +81,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) { config := consumergroup.NewConfig() config.Zookeeper.Chroot = kr.ZookeeperChroot config.Zookeeper.Timeout = kr.ZookeeperTimeout - //config.Consumer.Return.Errors = true //怀疑有bug,会block 读取,先注释 + config.Consumer.Return.Errors = true /********************* kafka offset *************************/ /* 这里设定的offset不影响原有的offset,因为kafka client会去获取 */