Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zk name service #42

Merged
merged 10 commits into from
Apr 17, 2014
80 changes: 80 additions & 0 deletions go/executil/executil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// +build linux

// Only build on Linux, since the use of procfs is platform specific.
package executil

import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
)

// Incomplete interpretation of the /proc/pid/stat file.
type procStat struct {
pid int
cmd string
state string
ppid int
pgrp int
}

func readProcStats(pid int) (*procStat, error) {
fname := fmt.Sprintf("/proc/%v/stat", pid)
data, err := ioutil.ReadFile(fname)
if err != nil {
return nil, err
}

i := bytes.Index(data, []byte(" ("))
j := bytes.Index(data, []byte(") "))
stats := procStat{}
stats.pid, err = strconv.Atoi(string(data[:i]))
if err != nil {
return nil, fmt.Errorf("invalid pid in %v %v", fname, err)
}
stats.cmd = string(data[i+2 : j])
fields := string(data[j+2:])
_, err = fmt.Sscanf(fields, "%s %d %d", &stats.state, &stats.ppid, &stats.pgrp)
if err != nil {
return nil, fmt.Errorf("invalid scan in %v %v \"%v\"", fname, err, fields)
}
return &stats, nil
}

// Is there no better way to scan child / group processes?
// For now, we have to load everything we can read see if it
// matches.
func readProcGroup(pgrp int) ([]*procStat, error) {
dirEntries, err := ioutil.ReadDir("/proc")
if err != nil {
return nil, err
}
groupStats := make([]*procStat, 0, 256)
for _, ent := range dirEntries {
if pid, err := strconv.Atoi(ent.Name()); err == nil {
pidStats, err := readProcStats(pid)
if err != nil {
return nil, err
}
if pidStats.pgrp == pgrp {
groupStats = append(groupStats, pidStats)
}
}
}
return groupStats, nil
}

// GetPgrpPids return a list of all pids in a given process group.
// Not as cheap as you think, you have to scan all the pids on the system.
func GetPgrpPids(pgrp int) ([]int, error) {
stats, err := readProcGroup(pgrp)
if err != nil {
return nil, err
}
pids := make([]int, len(stats))
for i, st := range stats {
pids[i] = st.pid
}
return pids, nil
}
10 changes: 8 additions & 2 deletions go/netutil/netutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import (
"sort"
"strconv"
"strings"
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// byPriorityWeight sorts records by ascending priority and weight.
type byPriorityWeight []*net.SRV

Expand All @@ -31,17 +36,18 @@ func (s byPriorityWeight) Less(i, j int) bool {

// shuffleByWeight shuffles SRV records by weight using the algorithm
// described in RFC 2782.
// NOTE(msolo) This is disabled when the weights are zero.
func (addrs byPriorityWeight) shuffleByWeight() {
sum := 0
for _, addr := range addrs {
sum += int(addr.Weight)
}
for sum > 0 && len(addrs) > 1 {
s := 0
n := rand.Intn(sum + 1)
n := rand.Intn(sum)
for i := range addrs {
s += int(addrs[i].Weight)
if s >= n {
if s > n {
if i > 0 {
t := addrs[i]
copy(addrs[1:i+1], addrs[0:i])
Expand Down
66 changes: 66 additions & 0 deletions go/netutil/netutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package netutil

import (
"math/rand"
"net"
"testing"
)

func checkDistribution(t *testing.T, data []*net.SRV, margin float64) {
sum := 0
for _, srv := range data {
sum += int(srv.Weight)
}

results := make(map[string]int)

count := 1000
for j := 0; j < count; j++ {
d := make([]*net.SRV, len(data))
copy(d, data)
byPriorityWeight(d).shuffleByWeight()
key := d[0].Target
results[key] = results[key] + 1
}

actual := results[data[0].Target]
expected := float64(count) * float64(data[0].Weight) / float64(sum)
diff := float64(actual) - expected
t.Logf("actual: %v diff: %v e: %v m: %v", actual, diff, expected, margin)
if diff < 0 {
diff = -diff
}
if diff > (expected * margin) {
t.Errorf("missed target weight: expected %v, %v", expected, actual)
}
}

func testUniformity(t *testing.T, size int, margin float64) {
rand.Seed(1)
data := make([]*net.SRV, size)
for i := 0; i < size; i++ {
data[i] = &net.SRV{Target: string('a' + i), Weight: 1}
}
checkDistribution(t, data, margin)
}

func TestUniformity(t *testing.T) {
testUniformity(t, 2, 0.05)
testUniformity(t, 3, 0.10)
testUniformity(t, 10, 0.20)
testWeighting(t, 0.05)
}

func testWeighting(t *testing.T, margin float64) {
rand.Seed(1)
data := []*net.SRV{
&net.SRV{Target: "a", Weight: 60},
&net.SRV{Target: "b", Weight: 30},
&net.SRV{Target: "c", Weight: 10},
}
checkDistribution(t, data, margin)
}

func TestWeighting(t *testing.T) {
testWeighting(t, 0.05)
}
11 changes: 7 additions & 4 deletions go/zk/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ const (
)

var (
zkConfigPaths = []string{"/etc/zookeeper/zk_client.json"}
// DefaultZkConfigPaths is the default list of config files to check.
DefaultZkConfigPaths = []string{"/etc/zookeeper/zk_client.json"}
// MagicPrefix is the Default name for the root note in the zookeeper tree.
MagicPrefix = "zk"

localCell = flag.String("zk.local-cell", "", "closest zk cell used for /zk/local paths")
localAddrs = flag.String("zk.local-addrs", "", "list of zookeeper servers (host:port, ...)")
Expand Down Expand Up @@ -61,8 +64,8 @@ func ZkCellFromZkPath(zkPath string) (string, error) {
if len(pathParts) < 3 {
return "", fmt.Errorf("no cell name in path: %v", zkPath)
}
if pathParts[0] != "" || pathParts[1] != "zk" {
return "", fmt.Errorf("path should start with /zk/: %v", zkPath)
if pathParts[0] != "" || pathParts[1] != MagicPrefix {
return "", fmt.Errorf("path should start with /%v: %v", MagicPrefix, zkPath)
}
cell := pathParts[2]
if strings.Contains(cell, "-") {
Expand All @@ -76,7 +79,7 @@ func getConfigPaths() []string {
if zkConfigPath != "" {
return []string{zkConfigPath}
}
return zkConfigPaths
return DefaultZkConfigPaths
}

func getCellAddrMap() map[string]string {
Expand Down
5 changes: 5 additions & 0 deletions go/zk/metaconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func (conn *MetaConn) GetW(path string) (data string, stat Stat, watch <-chan zo
}

func (conn *MetaConn) Children(path string) (children []string, stat Stat, err error) {
if path == ("/" + MagicPrefix) {
// NOTE(msolo) There is a slight hack there - but there really is
// no valid stat for the top level path.
return ZkKnownCells(false), nil, nil
}
var zconn Conn
for i := 0; i < maxAttempts; i++ {
zconn, err = conn.connCache.ConnForPath(path)
Expand Down
Loading