Skip to content

Commit

Permalink
优化ip transformer 性能
Browse files Browse the repository at this point in the history
  • Loading branch information
wonderflow committed Jul 22, 2018
1 parent 3d29834 commit 19a9546
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 68 deletions.
17 changes: 5 additions & 12 deletions reader/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func TestGetTags(t *testing.T) {
assert.Equal(t, exp, tags)
}

func TestSetMapValueWithPrefix(t *testing.T) {
func TestSetMapValueExistWithPrefix(t *testing.T) {
data1 := map[string]interface{}{
"a": "b",
}
err1 := SetMapValueWithPrefix(data1, "newVal", "prefix", false, "a")
err1 := SetMapValueExistWithPrefix(data1, "newVal", "prefix", "a")
assert.NoError(t, err1)
exp1 := map[string]interface{}{
"a": "b",
Expand All @@ -154,7 +154,7 @@ func TestSetMapValueWithPrefix(t *testing.T) {
"age": 45,
},
}
err2 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "name"}...)
err2 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "name"}...)
assert.NoError(t, err2)
exp2 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -165,10 +165,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
}
assert.Equal(t, exp2, data2)

err3 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"xy", "name"}...)
err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...)
assert.Error(t, err3)

err4 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "hello"}...)
err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...)
assert.NoError(t, err4)
exp4 := map[string]interface{}{
"a": map[string]interface{}{
Expand All @@ -180,11 +180,4 @@ func TestSetMapValueWithPrefix(t *testing.T) {
}
assert.Equal(t, exp4, data2)

data5 := map[string]interface{}{}
err5 := SetMapValueWithPrefix(data5, "newVal", "prefix", true, "a")
assert.NoError(t, err5)
exp5 := map[string]interface{}{
"prefix_a": "newVal",
}
assert.Equal(t, exp5, data5)
}
4 changes: 3 additions & 1 deletion sender/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Sender struct {
microsecondCounter uint64
extraInfo map[string]string
sendType string
keyCache map[string]KeyInfo
}

// UserSchema was parsed pandora schema from user's raw schema
Expand Down Expand Up @@ -431,6 +432,7 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
schemas: make(map[string]pipeline.RepoSchemaEntry),
extraInfo: utilsos.GetExtraInfo(),
sendType: opt.sendType,
keyCache: make(map[string]KeyInfo),
}

expandAttr := make([]string, 0)
Expand Down Expand Up @@ -851,7 +853,7 @@ func (s *Sender) Send(datas []Data) (se error) {
return s.rawSend(datas)
default:
for i, v := range datas {
datas[i] = DeepConvertKey(v)
datas[i] = DeepConvertKeyWithCache(v, s.keyCache)
}
return s.schemaFreeSend(datas)
}
Expand Down
127 changes: 102 additions & 25 deletions transforms/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,18 @@ type Transformer struct {
DataPath string `json:"data_path"`
KeyAsPrefix bool `json:"key_as_prefix"`

loc Locator
stats StatsInfo
loc Locator
keys []string
lastEleKey string
keysRegion []string
keysCity []string
keysCountry []string
keysIsp []string
keysCountryCode []string
keysLatitude []string
keysLongitude []string
keysDistrictCode []string
stats StatsInfo
}

func (t *Transformer) Init() error {
Expand All @@ -43,9 +53,32 @@ func (t *Transformer) Init() error {
return fmt.Errorf("new locator: %v", err)
}
t.loc = loc
t.keys = GetKeys(t.Key)

newKeys := make([]string, len(t.keys))
copy(newKeys, t.keys)
t.lastEleKey = t.keys[len(t.keys)-1]
t.keysRegion = generateKeys(t.keys, Region, t.KeyAsPrefix)
t.keysCity = generateKeys(t.keys, City, t.KeyAsPrefix)
t.keysCountry = generateKeys(t.keys, Country, t.KeyAsPrefix)
t.keysIsp = generateKeys(t.keys, Isp, t.KeyAsPrefix)
t.keysCountryCode = generateKeys(t.keys, CountryCode, t.KeyAsPrefix)
t.keysLatitude = generateKeys(t.keys, Latitude, t.KeyAsPrefix)
t.keysLongitude = generateKeys(t.keys, Longitude, t.KeyAsPrefix)
t.keysDistrictCode = generateKeys(t.keys, DistrictCode, t.KeyAsPrefix)
return nil
}

// generateKeys returns a copy of keys whose final element is replaced by
// lastEle. When keyAsPrefix is true the final element becomes
// "<original last key>_<lastEle>" instead. The input slice is never modified.
//
// An empty keys slice has no final element to replace, so nil is returned
// rather than indexing out of range.
func generateKeys(keys []string, lastEle string, keyAsPrefix bool) []string {
	if len(keys) == 0 {
		return nil
	}
	last := len(keys) - 1
	if keyAsPrefix {
		lastEle = keys[last] + "_" + lastEle
	}
	newKeys := make([]string, len(keys))
	copy(newKeys, keys)
	newKeys[last] = lastEle
	return newKeys
}

// RawTransform is not supported by the IP transformer: it hands the input
// back untouched together with a non-nil error.
func (*Transformer) RawTransform(datas []string) ([]string, error) {
	unsupported := errors.New("IP transformer not support rawTransform")
	return datas, unsupported
}
Expand All @@ -54,18 +87,15 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) {
var err, fmtErr error
errNum := 0
if t.loc == nil {
loc, err := NewLocator(t.DataPath)
err := t.Init()
if err != nil {
t.stats, _ = transforms.SetStatsInfo(err, t.stats, int64(errNum), int64(len(datas)), t.Type())
return datas, err
}
t.loc = loc
}
keys := GetKeys(t.Key)
newKeys := make([]string, len(keys))
newKeys := make([]string, len(t.keys))
for i := range datas {
copy(newKeys, keys)
val, getErr := GetMapValue(datas[i], keys...)
copy(newKeys, t.keys)
val, getErr := GetMapValue(datas[i], t.keys...)
if getErr != nil {
errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, t.Key)
continue
Expand All @@ -81,36 +111,83 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
continue
}
newKeys[len(newKeys)-1] = Region
SetMapValueWithPrefix(datas[i], info.Region, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
newKeys[len(newKeys)-1] = City
SetMapValueWithPrefix(datas[i], info.City, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
newKeys[len(newKeys)-1] = Country
SetMapValueWithPrefix(datas[i], info.Country, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
newKeys[len(newKeys)-1] = Isp
SetMapValueWithPrefix(datas[i], info.Isp, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
findErr = t.SetMapValue(datas[i], info.Region, t.keysRegion...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
findErr = t.SetMapValue(datas[i], info.City, t.keysCity...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
findErr = t.SetMapValue(datas[i], info.Country, t.keysCountry...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
findErr = t.SetMapValue(datas[i], info.Isp, t.keysIsp...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
if info.CountryCode != "" {
newKeys[len(newKeys)-1] = CountryCode
SetMapValueWithPrefix(datas[i], info.CountryCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
findErr = t.SetMapValue(datas[i], info.CountryCode, t.keysCountryCode...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
}
if info.Latitude != "" {
newKeys[len(newKeys)-1] = Latitude
SetMapValueWithPrefix(datas[i], info.Latitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
findErr = t.SetMapValue(datas[i], info.Latitude, t.keysLatitude...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
}
if info.Longitude != "" {
newKeys[len(newKeys)-1] = Longitude
SetMapValueWithPrefix(datas[i], info.Longitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
findErr = t.SetMapValue(datas[i], info.Longitude, t.keysLongitude...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
}
if info.DistrictCode != "" {
newKeys[len(newKeys)-1] = DistrictCode
SetMapValueWithPrefix(datas[i], info.DistrictCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
findErr = t.SetMapValue(datas[i], info.DistrictCode, t.keysDistrictCode...)
if findErr != nil {
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
}
}
}

t.stats, fmtErr = transforms.SetStatsInfo(err, t.stats, int64(errNum), int64(len(datas)), t.Type())
return datas, fmtErr
}

// SetMapValue writes val into m at the nested path described by keys.
//
// Every key except the last names an intermediate map: a missing one is
// created, while an existing one must be a map[string]interface{} or an error
// is returned. The last key names the field to write. If that field already
// exists in the innermost map it is left untouched and val is stored under
// "<t.lastEleKey>_<last key>" instead; otherwise val is stored under the last
// key itself.
//
// An empty keys list is a no-op. A nil m yields an error instead of the panic
// that assigning into a nil map would cause.
func (t *Transformer) SetMapValue(m map[string]interface{}, val interface{}, keys ...string) error {
	if len(keys) == 0 {
		return nil
	}
	if m == nil {
		return fmt.Errorf("SetMapValue failed, cannot set %v on a nil map", keys)
	}
	last := len(keys) - 1
	curr := m
	for _, k := range keys[:last] {
		next, ok := curr[k]
		if !ok {
			// Missing intermediate level: create it and descend.
			child := make(map[string]interface{})
			curr[k] = child
			curr = child
			continue
		}
		// An existing intermediate value must itself be a map to descend into.
		child, isMap := next.(map[string]interface{})
		if !isMap {
			return fmt.Errorf("SetMapValue failed, %v is not the type of map[string]interface{}", keys)
		}
		curr = child
	}
	field := keys[last]
	if _, exist := curr[field]; exist {
		// Never overwrite an existing field; store under the prefixed name.
		field = t.lastEleKey + "_" + field
	}
	curr[field] = val
	return nil
}

func (_ *Transformer) Description() string {
//return "transform ip to country region and isp"
return "获取IP的区域、国家、城市和运营商信息"
Expand Down
31 changes: 31 additions & 0 deletions transforms/ip/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestTransformer(t *testing.T) {
Key: "ip",
DataPath: "./test_data/17monipdb.dat",
}
assert.Nil(t, ipt.Init())
data, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
assert.Error(t, err)
exp := []Data{{
Expand Down Expand Up @@ -65,6 +66,7 @@ func TestTransformer(t *testing.T) {
Key: "multi.ip",
DataPath: "./test_data/17monipdb.dat",
}
assert.Nil(t, ipt.Init())
data2, err2 := ipt2.Transform([]Data{{"multi": map[string]interface{}{"ip": "111.2.3.4"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}})
assert.Error(t, err2)
exp2 := []Data{{
Expand Down Expand Up @@ -173,6 +175,35 @@ func TestTransformer(t *testing.T) {
assert.Len(t, locatorStore.locators, 2)
}

// dttest receives the benchmark result at package level; presumably this
// keeps the compiler from discarding the Transform call.
var dttest []Data

// BenchmarkIpTrans measures Transform on the nested key "multi.ip2" with
// KeyAsPrefix enabled. The same data slice is reused for every iteration.
//
// Recorded results:
// old: 1000000 1152 ns/op 432 B/op 16 allocs/op
// new: 2000000 621 ns/op 232 B/op 7 allocs/op
func BenchmarkIpTrans(b *testing.B) {
	ipt4 := &Transformer{
		Key:         "multi.ip2",
		DataPath:    "./test_data/17monipdb.dat",
		KeyAsPrefix: true,
	}
	// Fail fast: without a successful Init the loop would only measure the
	// error path of Transform.
	if err := ipt4.Init(); err != nil {
		b.Fatalf("init ip transformer: %v", err)
	}
	data := []Data{
		{
			"multi": map[string]interface{}{
				"ip":      "111.2.3.4",
				"Region":  "浙江",
				"City":    "宁波",
				"Country": "中国",
				"Isp":     "N/A",
				"ip2":     "183.251.28.250",
			},
		},
	}
	b.ReportAllocs()
	// Keep transformer setup out of the measured region.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		dttest, _ = ipt4.Transform(data)
	}
}

func Test_badData(t *testing.T) {
ipt := &Transformer{
Key: "ip",
Expand Down
10 changes: 8 additions & 2 deletions transforms/mutate/pandorakey_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ var (

// PandoraKeyConvert is the transformer registered under the name
// "pandora_key_convert".
type PandoraKeyConvert struct {
// stats stores the value returned by transforms.SetStatsInfo in Transform.
stats StatsInfo
// cache is passed to DeepConvertKeyWithCache on every Transform call;
// presumably it memoizes per-key conversion results — confirm against
// DeepConvertKeyWithCache.
cache map[string]KeyInfo
}

// Init replaces the key cache with a new, empty map, discarding whatever it
// held before. It always returns nil.
func (g *PandoraKeyConvert) Init() error {
	g.cache = map[string]KeyInfo{}
	return nil
}
// RawTransform is not supported by this transformer: it hands the input back
// untouched together with a non-nil error.
func (g *PandoraKeyConvert) RawTransform(datas []string) ([]string, error) {
	unsupported := errors.New("pandora_key_convert transformer not support rawTransform")
	return datas, unsupported
}

func (g *PandoraKeyConvert) Transform(datas []Data) ([]Data, error) {
for i, v := range datas {
datas[i] = DeepConvertKey(v)
datas[i] = DeepConvertKeyWithCache(v, g.cache)
//datas[i] = DeepConvertKey(v)
}

g.stats, _ = transforms.SetStatsInfo(nil, g.stats, 0, int64(len(datas)), g.Type())
Expand Down Expand Up @@ -63,6 +69,6 @@ func (g *PandoraKeyConvert) SetStats(err string) StatsInfo {

func init() {
transforms.Add("pandora_key_convert", func() transforms.Transformer {
return &PandoraKeyConvert{}
return &PandoraKeyConvert{cache: make(map[string]KeyInfo)}
})
}
20 changes: 20 additions & 0 deletions transforms/mutate/pandorakey_convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,23 @@ func TestDeepconvertkey(t *testing.T) {
exp = []Data{{"ts_ts2": map[string]interface{}{"K200": 1, "a_xs_1": 2}}}
assert.Equal(t, exp, got)
}

// got receives the benchmark results at package level; presumably this keeps
// the compiler from discarding the Transform calls.
var got []Data

// BenchmarkCache runs PandoraKeyConvert.Transform on three small inputs per
// iteration, using one transformer (and therefore one key cache) for the
// whole run.
//
// Recorded results:
// old (without cache): 500000 2846 ns/op 2536 B/op 33 allocs/op
// new (with cache): 500000 2249 ns/op 2392 B/op 17 allocs/op
func BenchmarkCache(b *testing.B) {
pandoraConvert := &PandoraKeyConvert{cache: make(map[string]KeyInfo)}
b.ReportAllocs()

for i := 0; i < b.N; i++ {
// Inputs are rebuilt on every iteration: Transform assigns back into the
// slice it is given (datas[i] = ...).
data := []Data{{"ts。ts2": "stamp1"}, {"ts-tes2/1.2": "stamp2"}}
got, _ = pandoraConvert.Transform(data)

data = []Data{{"ts。ts2": map[string]interface{}{"_xs1_2s.xs.1": 1, "a.xs.1": 2}}, {"ts- ": "stamp2"}}
got, _ = pandoraConvert.Transform(data)

data = []Data{{"ts。ts2": map[string]interface{}{"200": 1, "a.xs.1": 2}}}
got, _ = pandoraConvert.Transform(data)
}
}
Loading

0 comments on commit 19a9546

Please sign in to comment.