Skip to content

Commit

Permalink
add data received metric
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Jul 1, 2022
1 parent d181b00 commit f3ec73c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 46 deletions.
38 changes: 5 additions & 33 deletions src/core/packetgen/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func OpenConnection(ctx context.Context, c ConnectionConfig) (Connection, error)

type Connection interface {
Write(Packet) (int, error)
Read([]byte) (int, error)
Close() error
Target() string
}
Expand Down Expand Up @@ -111,57 +112,30 @@ func (conn *rawConn) Close() error {

func (conn *rawConn) Target() string { return conn.target }

func (conn *rawConn) Read(_ []byte) (int, error) { return 0, nil }

type netConnConfig struct {
Protocol string
Address string
Timeout time.Duration
Proxy utils.ProxyParams
Reader *netReaderConfig
TLSClientConfig *tls.Config
}

type netReaderConfig struct {
Interval time.Duration
}

type netConn struct {
net.Conn
buf gopacket.SerializeBuffer

target string
}

func readStub(ctx context.Context, conn net.Conn, c *netReaderConfig) {
const bufSize = 1024
buf := make([]byte, bufSize)

ticker := time.NewTicker(c.Interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, err := conn.Read(buf)
if err != nil {
return
}
}
}
}

func openNetConn(ctx context.Context, c netConnConfig, proxyParams *utils.ProxyParams) (*netConn, error) {
conn, err := utils.GetProxyFunc(utils.NonNilOrDefault(proxyParams, utils.ProxyParams{}), c.Protocol)(c.Protocol, c.Address)

switch {
case err != nil:
return nil, err
case c.TLSClientConfig == nil:
if c.Reader != nil {
go readStub(ctx, conn, c.Reader)
}

return &netConn{Conn: conn, buf: gopacket.NewSerializeBuffer(), target: c.Protocol + "://" + c.Address}, nil
}

Expand All @@ -172,10 +146,6 @@ func openNetConn(ctx context.Context, c netConnConfig, proxyParams *utils.ProxyP
return nil, err
}

if c.Reader != nil {
go readStub(ctx, conn, c.Reader)
}

return &netConn{Conn: tlsConn, buf: gopacket.NewSerializeBuffer(), target: c.Protocol + "://" + c.Address}, nil
}

Expand All @@ -192,3 +162,5 @@ func (conn *netConn) Close() error {
}

func (conn *netConn) Target() string { return conn.target }

func (conn *netConn) Read(buf []byte) (int, error) { return conn.Conn.Read(buf) }
20 changes: 10 additions & 10 deletions src/job/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ func fastHTTPJob(ctx context.Context, args config.Args, globalConfig *GlobalConf
backoffController := utils.BackoffController{BackoffConfig: utils.NonNilOrDefault(jobConfig.Backoff, globalConfig.Backoff)}
client := http.NewClient(ctx, *clientConfig, logger)

req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
var (
req fasthttp.Request
resp fasthttp.Response
)

if !jobConfig.Dynamic {
if err := buildHTTPRequest(ctx, logger, requestTpl, req); err != nil {
if err := buildHTTPRequest(ctx, logger, requestTpl, &req); err != nil {
return nil, fmt.Errorf("error executing request template: %w", err)
}
}
Expand All @@ -159,36 +161,34 @@ func fastHTTPJob(ctx context.Context, args config.Args, globalConfig *GlobalConf

for jobConfig.Next(ctx) {
if jobConfig.Dynamic {
if err := buildHTTPRequest(ctx, logger, requestTpl, req); err != nil {
if err := buildHTTPRequest(ctx, logger, requestTpl, &req); err != nil {
return nil, fmt.Errorf("error executing request template: %w", err)
}
}

if err := client.Do(req, nil); err != nil {
if err := client.Do(&req, &resp); err != nil {
logger.Debug("error sending request", zap.Error(err), zap.Any("args", args))

if a != nil {
a.Inc(target(req.URI()), metrics.RequestsAttemptedStat).Flush()
metrics.IncHTTP(string(req.Host()), string(req.Header.Method()), metrics.StatusFail)
}

utils.Sleep(ctx, backoffController.Increment().GetTimeout())

continue
}

requestSize, _ := req.WriteTo(nopWriter{})

if a != nil {
requestSize, _ := req.WriteTo(nopWriter{})
responseSize, _ := resp.WriteTo(nopWriter{})
tgt := target(req.URI())

a.Inc(tgt, metrics.RequestsAttemptedStat).
Inc(tgt, metrics.RequestsSentStat).
Inc(tgt, metrics.ResponsesReceivedStat).
Add(tgt, metrics.BytesSentStat, uint64(requestSize)).
Add(tgt, metrics.BytesReceivedStat, uint64(responseSize)).
Flush()

metrics.IncHTTP(string(req.Host()), string(req.Header.Method()), metrics.StatusSuccess)
}

backoffController.Reset()
Expand Down
28 changes: 28 additions & 0 deletions src/job/packetgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"errors"
"fmt"

"github.com/google/uuid"
"go.uber.org/zap"

"github.com/Arriven/db1000n/src/core/packetgen"
Expand Down Expand Up @@ -77,6 +78,8 @@ func sendPacket(ctx context.Context, logger *zap.Logger, jobConfig *packetgenJob
}
defer conn.Close()

go readStub(ctx, conn, a.Clone(uuid.NewString()))

packetSrc, err := makePacketSource(ctx, logger, jobConfig.Packets, jobConfig.Dynamic)
if err != nil {
return err
Expand Down Expand Up @@ -188,6 +191,31 @@ func getNextDynamicPacket(ctx context.Context, logger *zap.Logger, packetsChan c
}
}

func readStub(ctx context.Context, conn packetgen.Connection, a *metrics.Accumulator) {
const bufSize = 1024
buf := make([]byte, bufSize)

for {
select {
case <-ctx.Done():
return
default:
n, err := conn.Read(buf)
if err != nil {
return
}

if a != nil && n != 0 {
tgt := conn.Target()

a.Inc(tgt, metrics.ResponsesReceivedStat).
Add(tgt, metrics.BytesReceivedStat, uint64(n)).
Flush()
}
}
}
}

type packetDescriptor struct {
Packet map[string]any
Count int
Expand Down
7 changes: 4 additions & 3 deletions src/utils/metrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,26 +62,27 @@ func (r *ConsoleReporter) writeSummaryTo(metrics *Metrics, writer *tabwriter.Wri

// Print table's header
fmt.Fprintln(writer, "\n --- Traffic stats ---")
fmt.Fprintf(writer, "|\tTarget\t|\tRequests attempted\t|\tRequests sent\t|\tResponses received\t|\tData sent \t|\n")
fmt.Fprintf(writer, "|\tTarget\t|\tRequests attempted\t|\tRequests sent\t|\tResponses received\t|\tData sent \t|\tData received \t|\n")

// Print all table rows
for _, tgt := range stats.sortedTargets() {
printStatsRow(writer, tgt, stats[tgt])
}

// Print table's footer
fmt.Fprintln(writer, "|\t---\t|\t---\t|\t---\t|\t---\t|\t--- \t|")
fmt.Fprintln(writer, "|\t---\t|\t---\t|\t---\t|\t---\t|\t--- \t|\t--- \t|")
printStatsRow(writer, "Total", totals)
fmt.Fprintln(writer)
}

func printStatsRow(writer *tabwriter.Writer, rowName string, stats Stats) {
const BytesInMegabyte = 1024 * 1024

fmt.Fprintf(writer, "|\t%s\t|\t%d\t|\t%d\t|\t%d\t|\t%.2f MB \t|\n", rowName,
fmt.Fprintf(writer, "|\t%s\t|\t%d\t|\t%d\t|\t%d\t|\t%.2f MB \t|\t%.2f MB \t|\n", rowName,
stats[RequestsAttemptedStat],
stats[RequestsSentStat],
stats[ResponsesReceivedStat],
float64(stats[BytesSentStat])/BytesInMegabyte,
float64(stats[BytesReceivedStat])/BytesInMegabyte,
)
}
2 changes: 2 additions & 0 deletions src/utils/metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
RequestsSentStat
ResponsesReceivedStat
BytesSentStat
BytesReceivedStat

NumStats
)
Expand Down Expand Up @@ -54,6 +55,7 @@ func (stats *Stats) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddUint64("requests_sent", stats[RequestsSentStat])
enc.AddUint64("responses_received", stats[ResponsesReceivedStat])
enc.AddUint64("bytes_sent", stats[BytesSentStat])
enc.AddUint64("bytes_received", stats[BytesReceivedStat])

return nil
}

0 comments on commit f3ec73c

Please sign in to comment.