Skip to content

Commit

Permalink
Merge pull request #42 from msolo/zk-name-service
Browse files Browse the repository at this point in the history
Zk name service
  • Loading branch information
alainjobart committed Apr 17, 2014
2 parents 2155bd2 + 13f4f3a commit 49fff35
Show file tree
Hide file tree
Showing 9 changed files with 1,001 additions and 42 deletions.
80 changes: 80 additions & 0 deletions go/executil/executil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// +build linux

// Only build on Linux, since the use of procfs is platform specific.
package executil

import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
)

// Incomplete interpretation of the /proc/pid/stat file.
type procStat struct {
pid int
cmd string
state string
ppid int
pgrp int
}

func readProcStats(pid int) (*procStat, error) {
fname := fmt.Sprintf("/proc/%v/stat", pid)
data, err := ioutil.ReadFile(fname)
if err != nil {
return nil, err
}

i := bytes.Index(data, []byte(" ("))
j := bytes.Index(data, []byte(") "))
stats := procStat{}
stats.pid, err = strconv.Atoi(string(data[:i]))
if err != nil {
return nil, fmt.Errorf("invalid pid in %v %v", fname, err)
}
stats.cmd = string(data[i+2 : j])
fields := string(data[j+2:])
_, err = fmt.Sscanf(fields, "%s %d %d", &stats.state, &stats.ppid, &stats.pgrp)
if err != nil {
return nil, fmt.Errorf("invalid scan in %v %v \"%v\"", fname, err, fields)
}
return &stats, nil
}

// Is there no better way to scan child / group processes?
// For now, we have to load everything we can read see if it
// matches.
func readProcGroup(pgrp int) ([]*procStat, error) {
dirEntries, err := ioutil.ReadDir("/proc")
if err != nil {
return nil, err
}
groupStats := make([]*procStat, 0, 256)
for _, ent := range dirEntries {
if pid, err := strconv.Atoi(ent.Name()); err == nil {
pidStats, err := readProcStats(pid)
if err != nil {
return nil, err
}
if pidStats.pgrp == pgrp {
groupStats = append(groupStats, pidStats)
}
}
}
return groupStats, nil
}

// GetPgrpPids return a list of all pids in a given process group.
// Not as cheap as you think, you have to scan all the pids on the system.
func GetPgrpPids(pgrp int) ([]int, error) {
stats, err := readProcGroup(pgrp)
if err != nil {
return nil, err
}
pids := make([]int, len(stats))
for i, st := range stats {
pids[i] = st.pid
}
return pids, nil
}
10 changes: 8 additions & 2 deletions go/netutil/netutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import (
"sort"
"strconv"
"strings"
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// byPriorityWeight sorts records by ascending priority and weight.
type byPriorityWeight []*net.SRV

Expand All @@ -31,17 +36,18 @@ func (s byPriorityWeight) Less(i, j int) bool {

// shuffleByWeight shuffles SRV records by weight using the algorithm
// described in RFC 2782.
// NOTE(msolo) This is disabled when the weights are zero.
func (addrs byPriorityWeight) shuffleByWeight() {
sum := 0
for _, addr := range addrs {
sum += int(addr.Weight)
}
for sum > 0 && len(addrs) > 1 {
s := 0
n := rand.Intn(sum + 1)
n := rand.Intn(sum)
for i := range addrs {
s += int(addrs[i].Weight)
if s >= n {
if s > n {
if i > 0 {
t := addrs[i]
copy(addrs[1:i+1], addrs[0:i])
Expand Down
66 changes: 66 additions & 0 deletions go/netutil/netutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package netutil

import (
"math/rand"
"net"
"testing"
)

func checkDistribution(t *testing.T, data []*net.SRV, margin float64) {
sum := 0
for _, srv := range data {
sum += int(srv.Weight)
}

results := make(map[string]int)

count := 1000
for j := 0; j < count; j++ {
d := make([]*net.SRV, len(data))
copy(d, data)
byPriorityWeight(d).shuffleByWeight()
key := d[0].Target
results[key] = results[key] + 1
}

actual := results[data[0].Target]
expected := float64(count) * float64(data[0].Weight) / float64(sum)
diff := float64(actual) - expected
t.Logf("actual: %v diff: %v e: %v m: %v", actual, diff, expected, margin)
if diff < 0 {
diff = -diff
}
if diff > (expected * margin) {
t.Errorf("missed target weight: expected %v, %v", expected, actual)
}
}

func testUniformity(t *testing.T, size int, margin float64) {
rand.Seed(1)
data := make([]*net.SRV, size)
for i := 0; i < size; i++ {
data[i] = &net.SRV{Target: string('a' + i), Weight: 1}
}
checkDistribution(t, data, margin)
}

func TestUniformity(t *testing.T) {
testUniformity(t, 2, 0.05)
testUniformity(t, 3, 0.10)
testUniformity(t, 10, 0.20)
testWeighting(t, 0.05)
}

func testWeighting(t *testing.T, margin float64) {
rand.Seed(1)
data := []*net.SRV{
&net.SRV{Target: "a", Weight: 60},
&net.SRV{Target: "b", Weight: 30},
&net.SRV{Target: "c", Weight: 10},
}
checkDistribution(t, data, margin)
}

func TestWeighting(t *testing.T) {
testWeighting(t, 0.05)
}
11 changes: 7 additions & 4 deletions go/zk/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ const (
)

var (
zkConfigPaths = []string{"/etc/zookeeper/zk_client.json"}
// DefaultZkConfigPaths is the default list of config files to check.
DefaultZkConfigPaths = []string{"/etc/zookeeper/zk_client.json"}
// MagicPrefix is the Default name for the root note in the zookeeper tree.
MagicPrefix = "zk"

localCell = flag.String("zk.local-cell", "", "closest zk cell used for /zk/local paths")
localAddrs = flag.String("zk.local-addrs", "", "list of zookeeper servers (host:port, ...)")
Expand Down Expand Up @@ -61,8 +64,8 @@ func ZkCellFromZkPath(zkPath string) (string, error) {
if len(pathParts) < 3 {
return "", fmt.Errorf("no cell name in path: %v", zkPath)
}
if pathParts[0] != "" || pathParts[1] != "zk" {
return "", fmt.Errorf("path should start with /zk/: %v", zkPath)
if pathParts[0] != "" || pathParts[1] != MagicPrefix {
return "", fmt.Errorf("path should start with /%v: %v", MagicPrefix, zkPath)
}
cell := pathParts[2]
if strings.Contains(cell, "-") {
Expand All @@ -76,7 +79,7 @@ func getConfigPaths() []string {
if zkConfigPath != "" {
return []string{zkConfigPath}
}
return zkConfigPaths
return DefaultZkConfigPaths
}

func getCellAddrMap() map[string]string {
Expand Down
5 changes: 5 additions & 0 deletions go/zk/metaconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func (conn *MetaConn) GetW(path string) (data string, stat Stat, watch <-chan zo
}

func (conn *MetaConn) Children(path string) (children []string, stat Stat, err error) {
if path == ("/" + MagicPrefix) {
// NOTE(msolo) There is a slight hack there - but there really is
// no valid stat for the top level path.
return ZkKnownCells(false), nil, nil
}
var zconn Conn
for i := 0; i < maxAttempts; i++ {
zconn, err = conn.connCache.ConnForPath(path)
Expand Down
Loading

0 comments on commit 49fff35

Please sign in to comment.