From 544dc863b129578a0117dc907950d1b669ee085d Mon Sep 17 00:00:00 2001 From: zveinn Date: Mon, 15 Jul 2024 16:32:15 +0000 Subject: [PATCH 1/2] Adding: latency test, more control, more stats, etc.. squash removing hosts --- .github/workflows/vulncheck.yml | 2 +- .gitignore | 4 +- Dockerfile | 1 - README.md | 41 +- go.mod | 17 +- go.sum | 26 +- helm/hperf/templates/service.yaml | 2 - helm/hperf/templates/statefulset.yaml | 2 - hperf.yaml | 4 - main.go | 774 ++++++++++++++++++++++---- 10 files changed, 747 insertions(+), 126 deletions(-) diff --git a/.github/workflows/vulncheck.yml b/.github/workflows/vulncheck.yml index 96708dd..9c5ca5b 100644 --- a/.github/workflows/vulncheck.yml +++ b/.github/workflows/vulncheck.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ 1.21.9, 1.22.2 ] + go-version: [ 1.22.5 ] steps: - name: Check out code into the Go module directory uses: actions/checkout@v3 diff --git a/.gitignore b/.gitignore index 53c805b..7786ffe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ dist/ mperf +./nperf hperf -nperf +./hperf +deploy.sh diff --git a/Dockerfile b/Dockerfile index 782bf27..229ebd0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,6 @@ FROM scratch MAINTAINER MinIO Development "dev@min.io" EXPOSE 9999 -EXPOSE 10000 COPY ./hperf /hperf diff --git a/README.md b/README.md index 9cbbda5..cac3173 100644 --- a/README.md +++ b/README.md @@ -9,24 +9,43 @@ hperf is a tool for active measurements of the maximum achievable bandwidth betw ## Usecases - Calculate baseline RX/TX - Debug TOR Switch bottlenecks +- Calculate roundtrip MS for http requests ## Usage +Various configurations have been added for controling everything from payload size to http read/write buffers. All flags +can be seen via `-help`. + +Hperf can be used without any configuration, just run hperf on all the servers IP1 IP2 IP3 ... respectively. ``` -./hperf IP1 IP2 IP3 ... -... -Bandwidth: 1.2 GB/s RX | 1.0 GB/s TX -Bandwidth: 1.2 GB/s RX | 1.1 GB/s TX -Bandwidth: 1.2 GB/s RX | 990 MB/s TX -Bandwidth: 1.2 GB/s RX | 944 MB/s TX +./hperf -stream=false -hosts 10.10.1.{1...10} +┌────────────┬────────────┬───────┬──────────┬───────┬──────────┬─────────────────┬───────────────────┬──────┐ +│ Local │ Remote │ #RX │ RX │ #TX │ TX │ TX(ms) high/low │ TTFB(ms) high/low │ #Err │ +├────────────┼────────────┼───────┼──────────┼───────┼──────────┼─────────────────┼───────────────────┼──────┤ +│ 10.10.10.1 │ 10.10.10.6 │ 14927 │ 1.3 GB/s │ 10681 │ 1.2 GB/s │ 312 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.2 │ 10880 │ 1.3 GB/s │ 18187 │ 1.2 GB/s │ 260 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.3 │ 16804 │ 1.3 GB/s │ 17141 │ 1.2 GB/s │ 299 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.4 │ 18670 │ 1.4 GB/s │ 18920 │ 1.3 GB/s │ 321 / 2 │ 10 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.5 │ 30070 │ 1.2 GB/s │ 29626 │ 1.3 GB/s │ 636 / 2 │ 10 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.7 │ 24031 │ 1.3 GB/s │ 27004 │ 1.3 GB/s │ 600 / 2 │ 16 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.8 │ 20844 │ 1.2 GB/s │ 21870 │ 1.2 GB/s │ 297 / 1 │ 13 / 0 │ 0 │ +└────────────┴────────────┴───────┴──────────┴───────┴──────────┴─────────────────┴───────────────────┴──────┘ ``` -on all the servers IP1 IP2 IP3 ... respectively. - -Default ports are `9999` and `10000` make sure your firewalls allow these ports. You may optionally configure `./hperf` to use custom ports as well, for example setting port `5001` would require opening up port `5002` as well. - +Default ports are `9999` make sure your firewalls allow this port. You may optionally configure `./hperf` to use a custom port as well `-port MYPORT` ``` -NPERF_PORT=5001 ./hperf IP1 IP2 IP3 ... +./hperf -port MYPORT -stream=false -hosts 10.10.1.{1...10} +┌────────────┬────────────┬───────┬──────────┬───────┬──────────┬─────────────────┬───────────────────┬──────┐ +│ Local │ Remote │ #RX │ RX │ #TX │ TX │ TX(ms) high/low │ TTFB(ms) high/low │ #Err │ +├────────────┼────────────┼───────┼──────────┼───────┼──────────┼─────────────────┼───────────────────┼──────┤ +│ 10.10.10.1 │ 10.10.10.6 │ 14927 │ 1.3 GB/s │ 10681 │ 1.2 GB/s │ 312 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.2 │ 10880 │ 1.3 GB/s │ 18187 │ 1.2 GB/s │ 260 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.3 │ 16804 │ 1.3 GB/s │ 17141 │ 1.2 GB/s │ 299 / 2 │ 13 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.4 │ 18670 │ 1.4 GB/s │ 18920 │ 1.3 GB/s │ 321 / 2 │ 10 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.5 │ 30070 │ 1.2 GB/s │ 29626 │ 1.3 GB/s │ 636 / 2 │ 10 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.7 │ 24031 │ 1.3 GB/s │ 27004 │ 1.3 GB/s │ 600 / 2 │ 16 / 0 │ 0 │ +│ 10.10.10.1 │ 10.10.10.8 │ 20844 │ 1.2 GB/s │ 21870 │ 1.2 GB/s │ 297 / 1 │ 13 / 0 │ 0 │ +└────────────┴────────────┴───────┴──────────┴───────┴──────────┴─────────────────┴───────────────────┴──────┘ ``` ## On k8s diff --git a/go.mod b/go.mod index d64896f..6a000f4 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,20 @@ module github.com/minio/hperf -go 1.19 +go 1.22.4 require ( + github.com/charmbracelet/lipgloss v0.11.1 github.com/dustin/go-humanize v1.0.1 - github.com/google/uuid v1.4.0 - golang.org/x/sys v0.19.0 + github.com/minio/pkg/v3 v3.0.7 + golang.org/x/sys v0.22.0 +) + +require ( + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/charmbracelet/x/ansi v0.1.3 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/muesli/termenv v0.15.2 // indirect + github.com/rivo/uniseg v0.4.7 // indirect ) diff --git a/go.sum b/go.sum index f2964e9..28ed5ae 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,24 @@ +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/charmbracelet/lipgloss v0.11.1 h1:a8KgVPHa7kOoP95vm2tQQrjD2AKhbWmfr4uJ2RW6kNk= +github.com/charmbracelet/lipgloss v0.11.1/go.mod h1:beLlcmkF7MWA+5UrKKIRo/VJ21xGXr7YJ9miWfdMRIU= +github.com/charmbracelet/x/ansi v0.1.3 h1:RBh/eleNWML5R524mjUF0yVRePTwqN9tPtV+DPgO5Lw= +github.com/charmbracelet/x/ansi v0.1.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/minio/pkg/v3 v3.0.7 h1:1I2CbFKO+brioB6Pbnw0jLlFxo+YPy6hCTTXTSitgI8= +github.com/minio/pkg/v3 v3.0.7/go.mod h1:njlf539caYrgXqn/CXewqvkqBIMDTQo9oBBEL34LzY0= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/helm/hperf/templates/service.yaml b/helm/hperf/templates/service.yaml index ee475b1..ef6deca 100644 --- a/helm/hperf/templates/service.yaml +++ b/helm/hperf/templates/service.yaml @@ -10,7 +10,5 @@ spec: ports: - port: 9999 name: http1 - - port: 10000 - name: http2 selector: app: hperf diff --git a/helm/hperf/templates/statefulset.yaml b/helm/hperf/templates/statefulset.yaml index 9907d54..e2b5d06 100644 --- a/helm/hperf/templates/statefulset.yaml +++ b/helm/hperf/templates/statefulset.yaml @@ -38,8 +38,6 @@ spec: ports: - name: http1 containerPort: 9999 - - name: http2 - containerPort: 10000 {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/hperf.yaml b/hperf.yaml index 400dead..1871d4a 100644 --- a/hperf.yaml +++ b/hperf.yaml @@ -11,8 +11,6 @@ spec: ports: - port: 9999 name: http1 - - port: 10000 - name: http2 selector: app: hperf --- @@ -59,5 +57,3 @@ spec: ports: - name: http1 containerPort: 9999 - - name: http2 - containerPort: 10000 diff --git a/main.go b/main.go index 8742a4d..f2e33b0 100644 --- a/main.go +++ b/main.go @@ -14,91 +14,292 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . + package main import ( + "bytes" "context" "crypto/rand" + "encoding/json" + "errors" "flag" "fmt" "io" - "log" "net" "net/http" "os" + "os/signal" "strconv" + "strings" "sync" "sync/atomic" "syscall" "time" "github.com/dustin/go-humanize" - "github.com/google/uuid" + "github.com/minio/pkg/v3/ellipses" "golang.org/x/sys/unix" + + "github.com/charmbracelet/lipgloss" + "github.com/charmbracelet/lipgloss/table" ) -var port = func() string { - p := os.Getenv("HPERF_PORT") - if p == "" { - p = "9999" +// dataPoint is collected every second+- a few micro/nano seconds. +type dataPoint struct { + Created time.Time + // Local IP Address is the address which listens for data + Local string + // Remote is the IP Address specified on the sender + Remote string + // LowestTransferMS - lowest time seen (in MS) transferring 1xPayload + LowestTransferMS int64 + // HighestTransferMS - highest time seen (in MS) to transferring 1xPayload + HighestTransferMS int64 + + // TTFBHigh - highest time to first byte + TTFBHigh int64 + // TTFBLow - lowest time to first byte + TTFBLow int64 + + // RX - Total transmitted amount + RX uint64 + // RXCount - total sent http requests + RXCount uint64 + // TX - Total received amount + TX uint64 + // TXCount - total received http requests + TXCount uint64 + + // The number of errors seen + Errors uint64 +} + +var ( + lastRow = 0 + headerStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("#f2f2f2")).Padding(0, 1) + tableStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("#f2f2f2")).Padding(0, 1).Align(lipgloss.Right) + baseTable *table.Table + latencyTable *table.Table + streamTable *table.Table + fullTable *table.Table + finalTable *table.Table + styleFunc = func(row, col int) lipgloss.Style { + switch { + case row == 0: + return headerStyle + default: + return tableStyle + } } - return p -}() +) -var selfDetectPort = func() string { - if sp := os.Getenv("HPERF_SELF_PORT"); sp != "" { - return sp +func createTables() { + finalTable = table.New(). + Border(lipgloss.NormalBorder()). + BorderStyle(lipgloss.NewStyle().Foreground(lipgloss.Color("99"))) + + if latency { + finalTable.Headers("Local", "Remote", "TX(ms) high/low", "Err #") + return + } else if stream { + finalTable.Headers("Local", "Remote", "RX", "TX") + return } - sp, err := strconv.Atoi(port) - if err != nil { - log.Fatal(err) + + finalTable.Headers("Local", "Remote", "#RX", "RX", "#TX", "TX", "TX(ms) high/low", "TTFB(ms) high/low", "#Err") + finalTable.StyleFunc(styleFunc) +} + +func printTable(list []dataPoint) { + if len(list) == 0 { + return } - sp++ - return strconv.Itoa(sp) -}() -var uniqueStr = uuid.New().String() + if globalErr != nil { + fmt.Println("Last Error: ", globalErr.Error()) + globalErr = nil + } -var ( - dataIn uint64 - dataOut uint64 -) + rows := make([][]string, len(list)) -const dialTimeout = 1 * time.Second + for i := range list { -func printDataOut() { - for { - time.Sleep(time.Second) - lastDataIn := atomic.SwapUint64(&dataIn, 0) - lastDataOut := atomic.SwapUint64(&dataOut, 0) - fmt.Printf("Bandwidth: %s/s RX | %s/s TX\n", humanize.Bytes(lastDataIn), humanize.Bytes(lastDataOut)) + local := list[i].Local + remote := list[i].Remote + + txc := strconv.Itoa(int(list[i].TXCount)) + tx := humanize.Bytes(list[i].TX) + "/s" + + rxc := strconv.Itoa(int(list[i].RXCount)) + rx := humanize.Bytes(list[i].RX) + "/s" + + txms := strconv.Itoa(int(list[i].HighestTransferMS)) + " / " + strconv.Itoa(int(list[i].LowestTransferMS)) + ttfb := strconv.Itoa(int(list[i].TTFBHigh)) + " / " + strconv.Itoa(int(list[i].TTFBLow)) + errCount := strconv.Itoa(int(list[i].Errors)) + if latency { + rows[i] = append(rows[i], local, remote, txms, errCount) + } else if stream { + rows[i] = append(rows[i], local, remote, rx, tx) + } else { + rows[i] = append(rows[i], local, remote, rxc, rx, txc, tx, txms, ttfb, errCount) + } } + + finalTable.ClearRows() + finalTable.Data(table.NewStringData()) + finalTable.Rows(rows...) + lastRow = len(rows) + fmt.Println(finalTable) } -// Discard is just like io.Discard without the io.ReaderFrom compatible -// implementation which is buggy on NUMA systems, we have to use a simpler -// io.Writer implementation alone avoids also unnecessary buffer copies, -// and as such incurred latencies. -var Discard io.Writer = discard{} +func printDataOut(ctx context.Context, cancel context.CancelFunc) { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + go func() { + <-sigc + httpListener.Close() + cancel() + }() + + createTables() + + lastPrint := false + for !lastPrint { + time.Sleep(1 * time.Second) + + // We wait for requests and clients to exit so that + // the request counters are as accurate as possible. + // lastPrint = activeClients.Load() == 0 && activeRequests.Load() == 0 && ctx.Err() != nil + if activeClients.Load() == 0 && activeRequests.Load() == 0 { + if ctx.Err() != nil { + lastPrint = true + } + } + + if time.Since(start).Seconds() > float64(seconds) { + if httpListener != nil { + httpListener.Close() + } + cancel() + } + + list := generateDataPoint() + if printJSON { + for _, v := range list { + outb, _ := json.Marshal(v) + fmt.Println(string(outb)) + } + } else { + printTable(list) + } + + } +} + +func generateDataPoint() (list []dataPoint) { + list = make([]dataPoint, 0, len(Readers)) + for ri, rv := range Readers { + if rv == nil { + continue + } + for wi, wv := range Writers { + if wv == nil || wv.host != rv.host { + continue + } + + w := Writers[wi] + r := Readers[ri] + + list = append(list, dataPoint{ + Created: time.Now(), + RX: w.rx.Swap(0), + RXCount: w.count.Load(), + TX: r.tx.Swap(0), + TXCount: r.count.Load(), + Local: currentHost, + TTFBLow: r.ttfbLow, + TTFBHigh: r.ttfbHigh, + LowestTransferMS: r.lowestTransferMS, + HighestTransferMS: r.highestTransferMS, + Errors: r.errors.Swap(0), + Remote: r.host, + }) + + r.m.Lock() + r.ttfbHigh = 0 + r.ttfbLow = 999 + r.highestTransferMS = 0 + r.lowestTransferMS = 999 + r.m.Unlock() + + } + } + return +} -// discard is /dev/null for Golang. -type discard struct{} +type netperfWriter struct { + buf []byte + rx atomic.Uint64 + count atomic.Uint64 + host string +} -func (discard) Write(p []byte) (int, error) { - atomic.AddUint64(&dataIn, uint64(len(p))) +func (w *netperfWriter) Write(p []byte) (int, error) { + w.rx.Add(uint64(len(p))) return len(p), nil } func runServer(host string) { - http.HandleFunc("/devnull", func(w http.ResponseWriter, r *http.Request) { - buf := make([]byte, 1*humanize.MiByte) - io.CopyBuffer(Discard, r.Body, buf) + http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) { + index := createNetWriter(r.Header.Get("X-Host")) + w.Header().Add("X-Index", index) + w.WriteHeader(200) + r.Body.Close() + }) + + http.HandleFunc("/ms", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + r.Body.Close() + }) + + http.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) { + activeRequests.Add(1) + defer activeRequests.Add(-1) + host := getNetWriter(r.Header.Get("X-Index")) + if host == nil { + w.WriteHeader(400) + return + } + + host.count.Add(1) + + io.CopyBuffer(host, r.Body, host.buf) + r.Body.Close() }) + + var err error + httpListener, err = net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port))) + if err != nil { + panic(err) + } + s := &http.Server{ - Addr: net.JoinHostPort(host, port), + Addr: net.JoinHostPort(host, strconv.Itoa(port)), MaxHeaderBytes: 1 << 20, } - s.ListenAndServe() + + err = s.Serve(httpListener) + if err != nil { + globalErr = err + if printAllErrors { + fmt.Println("Error serving: ", err) + } + } } // DialContext is a function to make custom Dial for internode communications @@ -169,24 +370,108 @@ func NewInternodeDialContext(dialTimeout time.Duration) DialContext { // Reader to read random data. type netperfReader struct { - doneCh <-chan struct{} - buf []byte + buf []byte + + host string + addr string + count atomic.Uint64 + tx atomic.Uint64 + errors atomic.Uint64 + + ttfbHigh int64 + ttfbLow int64 + highestTransferMS int64 + lowestTransferMS int64 + + m sync.Mutex +} + +type AsyncReader struct { + pr *netperfReader + i int64 // current reading index + prevRune int // index of previous rune; or < 0 + ttfbRegistered bool + start time.Time + ctx context.Context } -func (m *netperfReader) Read(b []byte) (int, error) { - select { - case <-m.doneCh: - return 0, io.EOF - default: +func (a *AsyncReader) Read(b []byte) (n int, err error) { + if !a.ttfbRegistered { + a.ttfbRegistered = true + since := time.Since(a.start).Milliseconds() + a.pr.m.Lock() + if since > a.pr.ttfbHigh { + a.pr.ttfbHigh = since + } + if since < a.pr.ttfbLow { + a.pr.ttfbLow = since + } + a.pr.m.Unlock() + } + + if !stream { + if a.i >= int64(len(a.pr.buf)) { + return 0, io.EOF + } + a.prevRune = -1 + n = copy(b, a.pr.buf[a.i:]) + a.i += int64(n) + } else { + if a.ctx.Err() != nil { + return 0, io.EOF + } + n = copy(b, a.pr.buf) } - n := copy(b, m.buf) - atomic.AddUint64(&dataOut, uint64(n)) + + a.pr.tx.Add(uint64(n)) return n, nil } -func runClient(host string) { - host = net.JoinHostPort(host, port) - proc := 32 +func NewNetPerfReader(host string) (r *netperfReader) { + r = new(netperfReader) + r.host = host + r.buf = make([]byte, payloadMB*humanize.MiByte) + r.ttfbLow = 999 + rand.Read(r.buf) + return +} + +func getWriterIndex(r *netperfReader, c *http.Client) string { + req, err := http.NewRequestWithContext( + context.Background(), + http.MethodGet, + "http://"+r.addr+"/hello", + nil, + ) + if err != nil { + globalErr = err + if printAllErrors { + fmt.Println("Error Creating hello request:", err) + } + return "" + } + req.Header.Set("X-Host", currentHost) + resp, err := c.Do(req) + if err != nil { + globalErr = err + if printAllErrors { + fmt.Println("Error sending hello request:", err) + } + return "" + } + if resp == nil { + return "" + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return resp.Header.Get("X-Index") +} + +func runClient(ctx context.Context, r *netperfReader) { + activeClients.Add(1) + defer activeClients.Add(-1) + + r.addr = net.JoinHostPort(r.host, strconv.Itoa(port)) // For more details about various values used here refer // https://golang.org/pkg/net/http/#Transport documentation @@ -194,11 +479,12 @@ func runClient(host string) { Proxy: http.ProxyFromEnvironment, DialContext: NewInternodeDialContext(10 * time.Second), MaxIdleConnsPerHost: 1024, - WriteBufferSize: 64 << 10, // 64KiB moving up from 4KiB default - ReadBufferSize: 64 << 10, // 64KiB moving up from 4KiB default + WriteBufferSize: bufferKB * humanize.KByte, + ReadBufferSize: bufferKB * humanize.KByte, IdleConnTimeout: 15 * time.Second, ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode) TLSHandshakeTimeout: 10 * time.Second, + // Go net/http automatically unzip if content-type is // gzip disable this feature, as we are always interested // in raw stream. @@ -209,10 +495,23 @@ func runClient(host string) { Transport: tr, } - ctx, cancel := context.WithCancel(context.Background()) - r := &netperfReader{doneCh: ctx.Done()} - r.buf = make([]byte, 1*humanize.MiByte) - rand.Read(r.buf) + // Each http client receives it's own index in order + // for us to match it with incomming statistics on + // the receiver side. + var clientIndex string + for { + if ctx.Err() != nil { + return + } + + time.Sleep(1 * time.Second) + clientIndex = getWriterIndex(r, clnt) + if clientIndex != "" { + globalErr = nil + break + } + globalErr = errors.New("Waiting for index ...") + } var wg sync.WaitGroup for i := 0; i < proc; i++ { @@ -220,28 +519,99 @@ func runClient(host string) { go func() { defer wg.Done() - // Establish the connection. for { - req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://"+host+"/devnull", nil) + if ctx.Err() != nil { + return + } + time.Sleep(time.Duration(sleep) * time.Millisecond) + var req *http.Request + + AR := new(AsyncReader) + AR.pr = r + AR.start = time.Now() + AR.ctx = ctx + + var resp *http.Response + var err error + + if stream { + req, err = http.NewRequestWithContext( + ctx, + http.MethodPut, + "http://"+r.addr+"/data", + nil, + ) + } else if latency { + req, err = http.NewRequestWithContext( + context.Background(), + http.MethodGet, + "http://"+r.addr+"/ms", + nil, + ) + } else { + req, err = http.NewRequestWithContext( + context.Background(), + http.MethodPut, + "http://"+r.addr+"/data", + AR, + ) + } + + req.Header.Set("X-Index", clientIndex) + if err != nil { - log.Println("Client-Error-New", err) + globalErr = err + if printAllErrors { + fmt.Println("Error Creating request:", err) + } time.Sleep(dialTimeout) continue } - req.Body = io.NopCloser(r) - req.ContentLength = -1 - resp, err := clnt.Do(req) + if stream { + req.Body = io.NopCloser(AR) + req.ContentLength = -1 + } + + sent := time.Now() + r.count.Add(1) + resp, err = clnt.Do(req) if err != nil { - log.Println("Client-Error-Do", err) + if errors.Is(err, context.Canceled) { + break + } + if printAllErrors { + fmt.Println("Error Sending request:", err) + } + globalErr = err + r.errors.Add(1) time.Sleep(dialTimeout) continue } + + done := time.Since(sent).Milliseconds() + + r.m.Lock() + if done > r.highestTransferMS { + r.highestTransferMS = done + } + + if done < r.lowestTransferMS { + r.lowestTransferMS = done + } + r.m.Unlock() + io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode != http.StatusOK { - log.Println("Client-Error-Response", resp.Status) + err = errors.New("Status code was not OK: " + resp.Status) + if printAllErrors { + fmt.Println(err) + } + + globalErr = err + r.errors.Add(1) time.Sleep(dialTimeout) continue } @@ -250,50 +620,260 @@ func runClient(host string) { } wg.Wait() - cancel() } +func getNetWriter(index string) *netperfWriter { + indexInt, err := strconv.Atoi(index) + if err != nil { + globalErr = errors.New("index is not a number") + return nil + } + if indexInt > len(Writers) { + globalErr = errors.New("writer index is bigger then len") + return nil + } + + return Writers[indexInt] +} + +func createNetWriter(host string) string { + defer WriterLock.Unlock() + WriterLock.Lock() + for i, v := range Writers { + if v != nil { + continue + } + + Writers[i] = new(netperfWriter) + Writers[i].buf = make([]byte, payloadMB*humanize.MiByte) + Writers[i].host = host + return strconv.Itoa(i) + } + + return "" +} + +var ( + start = time.Now() + longLatencyTest bool + shortLatencyTest bool + + port int + proc int + payloadMB int + bufferKB int + seconds int + sleep int + startTimeout int + stream bool + printJSON bool + resolveHosts string + latency bool + clearTable bool + serveIP string + hosts string + activeClients atomic.Int64 + activeRequests atomic.Int64 + dialTimeout = 1 * time.Second + + currentHost string + httpListener net.Listener + Readers []*netperfReader + Writers []*netperfWriter + WriterLock = sync.Mutex{} + globalErr error + printAllErrors bool +) + func main() { + flag.BoolVar(&longLatencyTest, "longLatencyTest", false, "60 second latency test") + flag.BoolVar(&shortLatencyTest, "shortLatencyTest", false, "10 minute latency test") + + flag.BoolVar(&clearTable, "clearTable", true, "Clear table between updates") + flag.BoolVar(&latency, "latency", false, "Do latency testing only") + flag.BoolVar(&stream, "stream", true, "Only use a single client per remote host") + flag.BoolVar(&printJSON, "json", false, "print as json output") + flag.IntVar(&port, "port", 9999, "serve port") + flag.IntVar(&sleep, "sleep", 1, "Timeout between requests in Milliseconds") + flag.IntVar(&startTimeout, "startTimeout", 10, "Timeout before testing starts") + flag.IntVar(&bufferKB, "bufferKB", 64, "TX/RX buffer size in Kilobytes") + flag.IntVar(&payloadMB, "payloadMB", 1, "Payload buffer size in Megabytes") + flag.StringVar(&serveIP, "serveIP", "", "IP to listen for requests on, only needed if two IPs are on the same host. (mostly used for local testing)") + flag.StringVar(&hosts, "hosts", "file:./hosts", "Define servers using an ellipses range: '1.1.1.{1...3},2.2.2.{1...3}' \nYou can also use IP's: '1.1.1.1,2.2.2.1'\nOr a host file: 'file:./hosts'\nWhen using a file, hosts can be comma seperate (with no spacing) or host per line. \nYou can even mix hostnames and IP's together in the file.\n") + flag.StringVar(&resolveHosts, "resolveHosts", "", "Resolve hosts using the given DNS server.") + flag.IntVar(&proc, "proc", 32, "Concurrent requests per host") + flag.IntVar(&seconds, "seconds", 15, "how long (Seconds) to run hperf") + flag.BoolVar(&printAllErrors, "printErrors", false, "Print all errors in real time") flag.Parse() - if flag.NArg() == 0 { - log.Fatal("provide a list of hostnames or IP addresses") + + if latency { + stream = false + } + + if shortLatencyTest { + latency = true + startTimeout = 30 + stream = false + sleep = 100 + bufferKB = 16 + payloadMB = 0 + proc = 1 + seconds = 60 } - hostMap := make(map[string]struct{}, flag.NArg()) - for _, host := range flag.Args() { - if _, ok := hostMap[host]; ok { - log.Fatalln("duplicate arguments found, please make sure all arguments are unique") + if longLatencyTest { + latency = true + startTimeout = 30 + stream = false + sleep = 100 + bufferKB = 16 + payloadMB = 0 + proc = 1 + seconds = 600 + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addressMap := make(map[string]struct{}) + interfaces, _ := net.Interfaces() + for _, intf := range interfaces { + addrs, _ := intf.Addrs() + for _, addr := range addrs { + sa := strings.Split(addr.String(), "/") + addressMap[sa[0]] = struct{}{} } - hostMap[host] = struct{}{} } - s := &http.Server{ - Addr: ":" + selfDetectPort, - MaxHeaderBytes: 1 << 20, + hostMap := make(map[string]struct{}, 0) + + if strings.Contains(hosts, "file:") { + + fs := strings.Split(hosts, ":") + if len(fs) < 2 { + fmt.Println("When using a file for hosts, please use the format( file:path ) example( file:~/hosts.txt )") + fmt.Println("Hosts within the file should be per line or comma seperated") + os.Exit(1) + } + hb, err := os.ReadFile(fs[1]) + if err != nil { + fmt.Println("Could not open file:", fs[1]) + os.Exit(1) + } + + // this is just to trip out carrage return + hb = bytes.Replace(hb, []byte{13}, []byte{}, -1) + + var splitLines [][]byte + if bytes.Contains(hb, []byte(",")) { + splitLines = bytes.Split(hb, []byte(",")) + } else if bytes.Contains(hb, []byte{10}) { + splitLines = bytes.Split(hb, []byte{10}) + } + if len(splitLines) < 1 { + fmt.Println("Hosts within the file (", fs[1], ") should be per line or comma seperated") + os.Exit(1) + } + + for _, v := range splitLines { + // to account to accidental empty lines or commas + if len(v) == 0 { + continue + } + hostMap[string(v)] = struct{}{} + } + + } else { + + splitHosts := strings.Split(hosts, ",") + hostList := make([]ellipses.ArgPattern, 0) + for _, v := range splitHosts { + if !ellipses.HasEllipses(v) { + hostMap[v] = struct{}{} + continue + } + + x, e := ellipses.FindEllipsesPatterns(v) + if e != nil { + fmt.Println(e) + os.Exit(1) + } + hostList = append(hostList, x) + } + + for _, host := range hostList { + for _, pattern := range host { + for _, seq := range pattern.Seq { + hostMap[pattern.Prefix+seq] = struct{}{} + } + } + } + } - go func() { - http.HandleFunc("/"+uniqueStr, func(w http.ResponseWriter, req *http.Request) {}) - s.ListenAndServe() - }() - log.Println("Starting HTTP service to skip self.. waiting for 10secs for services to be ready") - time.Sleep(time.Second * 10) + for host := range hostMap { + if net.ParseIP(host) == nil && resolveHosts != "" { + ips, err := net.LookupIP(host) + if err != nil { + fmt.Println("Could not look up ", host, ", err :", err) + os.Exit(1) + } + if len(ips) == 0 { + fmt.Println("Could not look up ", host, ", err: did not find any IPs on record") + os.Exit(1) + } + + hostMap[ips[0].String()] = struct{}{} + delete(hostMap, host) + continue + } + } var serverRunningOnce sync.Once + serverStarted := false + + if serveIP != "" { + serverRunningOnce.Do(func() { + currentHost = serveIP + serverStarted = true + go runServer(serveIP) + }) + } + + Readers = make([]*netperfReader, len(hostMap)+1) + Writers = make([]*netperfWriter, len(hostMap)+1) + + time.Sleep(time.Duration(startTimeout) * time.Second) + for host := range hostMap { - resp, err := http.Get("http://" + host + ":" + selfDetectPort + "/" + uniqueStr) - if err == nil && resp.StatusCode == http.StatusOK { - resp.Body.Close() // close the connection. - s.Close() // close the server as we are done. - log.Println("HTTP service closed after successful skip...") - serverRunningOnce.Do(func() { - go runServer(host) - }) + if !serverStarted { + _, ok := addressMap[host] + if ok { + serverRunningOnce.Do(func() { + currentHost = host + serverStarted = true + go runServer(host) + }) + continue + } + } else if serverStarted && host == serveIP { continue } - go runClient(host) + + for i, v := range Readers { + if v == nil { + Readers[i] = NewNetPerfReader(host) + go runClient(ctx, Readers[i]) + break + } + } + + } + + if !serverStarted { + fmt.Println("None of the given IP Addresses can be found on this server") + os.Exit(1) } - go printDataOut() - time.Sleep(time.Hour * 72) + printDataOut(ctx, cancel) } From f8d0d6e1fdc48af0d475a4103d32d65ff2b582da Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 15 Jul 2024 10:44:14 -0700 Subject: [PATCH 2/2] Update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6a000f4..4833a97 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/minio/hperf -go 1.22.4 +go 1.22 require ( github.com/charmbracelet/lipgloss v0.11.1