Skip to content

Commit

Permalink
Merge pull request #623 from wonderflow/urlparam
Browse files Browse the repository at this point in the history
add urlparam
  • Loading branch information
wonderflow authored Jul 20, 2018
2 parents a3cd663 + 0d90462 commit 942759c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
2 changes: 1 addition & 1 deletion reader/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
config := consumergroup.NewConfig()
config.Zookeeper.Chroot = kr.ZookeeperChroot
config.Zookeeper.Timeout = kr.ZookeeperTimeout
//config.Consumer.Return.Errors = true //怀疑有bug,会block 读取,先注释
config.Consumer.Return.Errors = true

/********************* kafka offset *************************/
/* 这里设定的offset不影响原有的offset,因为kafka client会去获取 */
Expand Down
25 changes: 23 additions & 2 deletions transforms/mutate/urlparam.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
. "github.com/qiniu/logkit/utils/models"
)

const urlParamPath = "url_param_path"
const (
urlParamPath = "url_param_path"
urlParamHost = "url_param_host"
)

var (
_ transforms.StatsTransformer = &UrlParam{}
Expand All @@ -26,11 +29,29 @@ type UrlParam struct {

func (p *UrlParam) transformToMap(strVal string, key string) (map[string]interface{}, error) {
resultMap := make(map[string]interface{})
var urlPath string
if idx := strings.Index(strVal, "?"); idx != -1 {
if len(strVal[:idx]) != 0 {
resultMap[key+"_"+urlParamPath] = strVal[:idx]
urlPath = strVal[:idx]
}
strVal = strVal[idx+1:]
} else {
urlPath = strVal
}
if len(urlPath) > 0 {
uri, err := url.Parse(urlPath)
if err != nil {
return nil, err
}
if len(uri.Path) > 0 {
//如果同时满足不包含前缀`/`,还包含`&`,说明是个param
if strings.HasPrefix(uri.Path, "/") || !strings.Contains(uri.Path, "&") {
resultMap[key+"_"+urlParamPath] = uri.Path
}
}
if len(uri.Host) > 0 {
resultMap[key+"_"+urlParamHost] = uri.Host
}
}
if len(strVal) < 1 {
return resultMap, nil
Expand Down
19 changes: 13 additions & 6 deletions transforms/mutate/urlparam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func TestParamTransformer(t *testing.T) {
{"myword": "?platform=2&vid=372&vu=caea966558&chan=android_sougou&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"},
{"myword": "platform=2&vid=&vu=caea966558&chan=&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"},
{"myword": "/index/mytest?platform=2&vid=&vu=caea966558&chan=&sign=ad225ec02942c79bdb710e3ad0cf1b43&nonce_str=1510555032"},
{"myword": "/index/mytest1"},
{"myword": "http://10.100.0.1/index/mytest"},
})
assert.NoError(t, err)
exp := []Data{
Expand Down Expand Up @@ -45,18 +47,23 @@ func TestParamTransformer(t *testing.T) {
"myword_nonce_str": "1510555032",
"myword_url_param_path": "/index/mytest",
},
{
"myword": "/index/mytest1",
"myword_url_param_path": "/index/mytest1",
},
{
"myword": "http://10.100.0.1/index/mytest",
"myword_url_param_path": "/index/mytest",
"myword_url_param_host": "10.100.0.1",
},
}
assert.Equal(t, len(exp), len(data))
for i, ex := range exp {
da := data[i]
for k, e := range ex {
d, exist := da[k]
assert.Equal(t, true, exist)
assert.Equal(t, e, d)
}
assert.Equal(t, ex, da)
}
assert.Equal(t, par.Stage(), transforms.StageAfterParser)
assert.Equal(t, StatsInfo{Success: 3}, par.stats)
assert.Equal(t, StatsInfo{Success: 5}, par.stats)
}

func TestParamTransformerError(t *testing.T) {
Expand Down

0 comments on commit 942759c

Please sign in to comment.