Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/swarm/swarm-smoke: refactor generateEndpoints #19006

Merged
merged 5 commits into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions cmd/swarm/swarm-smoke/feed_upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bytes"
"crypto/md5"
crand "crypto/rand"
"fmt"
"io"
"io/ioutil"
Expand All @@ -16,7 +15,9 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1"
)
Expand All @@ -25,13 +26,28 @@ const (
feedRandomDataLength = 8
)

// TODO: retrieve with manifest + extract repeating code
func feedUploadAndSync(c *cli.Context) error {
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error {
errc := make(chan error)

generateEndpoints(scheme, cluster, appName, from, to)
go func() {
errc <- feedUploadAndSync(ctx, tuid)
}()

log.Info("generating and uploading feeds to " + endpoints[0] + " and syncing")
select {
case err := <-errc:
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)

return fmt.Errorf("timeout after %v sec", timeout)
}
}

func feedUploadAndSync(c *cli.Context, tuid string) error {
log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing")

// create a random private key to sign updates with and derive the address
pkFile, err := ioutil.TempFile("", "swarm-feed-smoke-test")
Expand Down Expand Up @@ -85,7 +101,7 @@ func feedUploadAndSync(c *cli.Context) error {

// create feed manifest, topic only
var out bytes.Buffer
cmd := exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--user", userHex)
cmd := exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -100,7 +116,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// create feed manifest, subtopic only
cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--name", subTopicHex, "--user", userHex)
cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--name", subTopicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest subtopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -115,7 +131,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// create feed manifest, merged topic
cmd = exec.Command("swarm", "--bzzapi", endpoints[0], "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
cmd = exec.Command("swarm", "--bzzapi", httpEndpoint(hosts[0]), "feed", "create", "--topic", topicHex, "--name", subTopicHex, "--user", userHex)
cmd.Stdout = &out
log.Debug("create feed manifest mergetopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -141,7 +157,7 @@ func feedUploadAndSync(c *cli.Context) error {
dataHex := hexutil.Encode(data)

// update with topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -152,7 +168,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update with subtopic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest subtopic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -163,7 +179,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update with merged topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, dataHex)
cmd.Stdout = &out
log.Debug("update feed manifest merged topic cmd", "cmd", cmd)
err = cmd.Run()
Expand All @@ -177,36 +193,33 @@ func feedUploadAndSync(c *cli.Context) error {

// retrieve the data
wg := sync.WaitGroup{}
for _, endpoint := range endpoints {
for _, host := range hosts {
holisticode marked this conversation as resolved.
Show resolved Hide resolved
// raw retrieve, topic only
for _, hex := range []string{topicHex, subTopicOnlyHex, mergedSubTopicHex} {
wg.Add(1)
ruid := uuid.New()[:8]
go func(hex string, endpoint string, ruid string) {
for {
err := fetchFeed(hex, userHex, endpoint, dataHash, ruid)
err := fetchFeed(hex, userHex, httpEndpoint(host), dataHash, ruid)
if err != nil {
continue
}

wg.Done()
return
}
}(hex, endpoint, ruid)

}(hex, httpEndpoint(host), ruid)
}
}
wg.Wait()
log.Info("all endpoints synced random data successfully")

// upload test file
seed := int(time.Now().UnixNano() / 1e6)
log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)
log.Info("feed uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)

h = md5.New()
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
randomBytes := testutil.RandomBytes(seed, filesize*1000)

hash, err := upload(r, filesize*1000, endpoints[0])
hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
if err != nil {
return err
}
Expand All @@ -220,7 +233,7 @@ func feedUploadAndSync(c *cli.Context) error {
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))

// update file with topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -230,7 +243,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update file with subtopic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--name", subTopicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--name", subTopicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -240,7 +253,7 @@ func feedUploadAndSync(c *cli.Context) error {
out.Reset()

// update file with merged topic
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", endpoints[0], "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
cmd = exec.Command("swarm", "--bzzaccount", pkFile.Name(), "--bzzapi", httpEndpoint(hosts[0]), "feed", "update", "--topic", topicHex, "--name", subTopicHex, multihashHex)
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
Expand All @@ -251,23 +264,23 @@ func feedUploadAndSync(c *cli.Context) error {

time.Sleep(3 * time.Second)

for _, endpoint := range endpoints {
for _, host := range hosts {
holisticode marked this conversation as resolved.
Show resolved Hide resolved

// manifest retrieve, topic only
for _, url := range []string{manifestWithTopic, manifestWithSubTopic, manifestWithMergedTopic} {
wg.Add(1)
ruid := uuid.New()[:8]
go func(url string, endpoint string, ruid string) {
for {
err := fetch(url, endpoint, fileHash, ruid)
err := fetch(url, endpoint, fileHash, ruid, "")
if err != nil {
continue
}

wg.Done()
return
}
}(url, endpoint, ruid)
}(url, httpEndpoint(host), ruid)
}

}
Expand Down
70 changes: 25 additions & 45 deletions cmd/swarm/swarm-smoke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ var (
)

var (
endpoints []string
includeLocalhost bool
cluster string
appName string
scheme string
filesize int
syncDelay int
from int
to int
verbosity int
timeout int
single bool
allhosts string
hosts []string
filesize int
syncDelay int
httpPort int
wsPort int
verbosity int
timeout int
single bool
)

func main() {
Expand All @@ -59,39 +56,22 @@ func main() {

app.Flags = []cli.Flag{
cli.StringFlag{
Name: "cluster-endpoint",
Value: "prod",
Usage: "cluster to point to (prod or a given namespace)",
Destination: &cluster,
},
cli.StringFlag{
Name: "app",
Value: "swarm",
Usage: "application to point to (swarm or swarm-private)",
Destination: &appName,
Name: "hosts",
Value: "",
Usage: "comma-separated list of swarm hosts",
Destination: &allhosts,
},
cli.IntFlag{
Name: "cluster-from",
Value: 8501,
Usage: "swarm node (from)",
Destination: &from,
Name: "http-port",
Value: 80,
Usage: "http port",
Destination: &httpPort,
},
cli.IntFlag{
Name: "cluster-to",
Value: 8512,
Usage: "swarm node (to)",
Destination: &to,
},
cli.StringFlag{
Name: "cluster-scheme",
Value: "http",
Usage: "http or https",
Destination: &scheme,
},
cli.BoolFlag{
Name: "include-localhost",
Usage: "whether to include localhost:8500 as an endpoint",
Destination: &includeLocalhost,
Name: "ws-port",
Value: 8546,
Usage: "ws port",
Destination: &wsPort,
},
cli.IntFlag{
Name: "filesize",
Expand Down Expand Up @@ -140,25 +120,25 @@ func main() {
Name: "upload_and_sync",
Aliases: []string{"c"},
Usage: "upload and sync",
Action: wrapCliCommand("upload-and-sync", true, uploadAndSync),
Action: wrapCliCommand("upload-and-sync", uploadAndSyncCmd),
},
{
Name: "feed_sync",
Aliases: []string{"f"},
Usage: "feed update generate, upload and sync",
Action: wrapCliCommand("feed-and-sync", true, feedUploadAndSync),
Action: wrapCliCommand("feed-and-sync", feedUploadAndSyncCmd),
},
{
Name: "upload_speed",
Aliases: []string{"u"},
Usage: "measure upload speed",
Action: wrapCliCommand("upload-speed", true, uploadSpeed),
Action: wrapCliCommand("upload-speed", uploadSpeedCmd),
},
{
Name: "sliding_window",
Aliases: []string{"s"},
Usage: "measure network aggregate capacity",
Action: wrapCliCommand("sliding-window", false, slidingWindow),
Action: wrapCliCommand("sliding-window", slidingWindowCmd),
},
}

Expand Down
Loading