diff --git a/README.md b/README.md index 7841ca98..c6780b25 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,13 @@ Available Commands: - etcd - DynamoDB +## Output configuration + +|field|default value|description| +|-|-|-| +|measurementtype|"histogram"|The mechanism for recording measurements, one of `histogram`, `raw` or `csv`| +|measurement.output_file|""|File to write output to, default writes to stdout| + ## Database Configuration You can pass the database configurations through `-p field=value` in the command line directly. diff --git a/pkg/client/client.go b/pkg/client/client.go index 3fb34098..71f4d2f0 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -195,7 +195,7 @@ func (c *Client) Run(ctx context.Context) { for { select { case <-t.C: - measurement.Output() + measurement.Summary() case <-measureCtx.Done(): return } diff --git a/pkg/client/dbwrapper.go b/pkg/client/dbwrapper.go index f7492c5d..d8ec94d2 100644 --- a/pkg/client/dbwrapper.go +++ b/pkg/client/dbwrapper.go @@ -30,11 +30,11 @@ type DbWrapper struct { func measure(start time.Time, op string, err error) { lan := time.Now().Sub(start) if err != nil { - measurement.Measure(fmt.Sprintf("%s_ERROR", op), lan) + measurement.Measure(fmt.Sprintf("%s_ERROR", op), start, lan) return } - measurement.Measure(op, lan) + measurement.Measure(op, start, lan) } func (db DbWrapper) Close() error { diff --git a/pkg/measurement/csv.go b/pkg/measurement/csv.go new file mode 100644 index 00000000..0014c6fe --- /dev/null +++ b/pkg/measurement/csv.go @@ -0,0 +1,51 @@ +package measurement + +import ( + "fmt" + "io" + "time" +) + +type csventry struct { + // start time of the operation in us from unix epoch + startUs int64 + // latency of the operation in us + latencyUs int64 +} + +type csvs struct { + opCsv map[string][]csventry +} + +func InitCSV() *csvs { + return &csvs{ + opCsv: make(map[string][]csventry), + } +} + +func (c *csvs) Measure(op string, start time.Time, lan time.Duration) { + c.opCsv[op] = append(c.opCsv[op], csventry{ + startUs: start.UnixMicro(), + latencyUs: lan.Microseconds(), + }) +} + +func (c *csvs) Output(w io.Writer) error { + _, err := fmt.Fprintln(w, "operation,timestamp_us,latency_us") + if err != nil { + return err + } + for op, entries := range c.opCsv { + for _, entry := range entries { + _, err := fmt.Fprintf(w, "%s,%d,%d\n", op, entry.startUs, entry.latencyUs) + if err != nil { + return err + } + } + } + return nil +} + +func (c *csvs) Summary() { + // do nothing as csvs don't keep a summary +} diff --git a/pkg/measurement/histogram.go b/pkg/measurement/histogram.go index 5b309cb5..5da22c9e 100644 --- a/pkg/measurement/histogram.go +++ b/pkg/measurement/histogram.go @@ -18,9 +18,7 @@ import ( "time" hdrhistogram "github.com/HdrHistogram/hdrhistogram-go" - "github.com/magiconair/properties" "github.com/pingcap/go-ycsb/pkg/util" - "github.com/pingcap/go-ycsb/pkg/ycsb" ) type histogram struct { @@ -42,13 +40,7 @@ const ( PER9999TH = "PER9999TH" ) -func (h *histogram) Info() ycsb.MeasurementInfo { - res := h.getInfo() - delete(res, ELAPSED) - return newHistogramInfo(res) -} - -func newHistogram(p *properties.Properties) *histogram { +func newHistogram() *histogram { h := new(histogram) h.startTime = time.Now() h.hist = hdrhistogram.New(1, 24*60*60*1000*1000, 3) @@ -103,18 +95,3 @@ func (h *histogram) getInfo() map[string]interface{} { return res } - -type histogramInfo struct { - info map[string]interface{} -} - -func newHistogramInfo(info map[string]interface{}) *histogramInfo { - return &histogramInfo{info: info} -} - -func (hi *histogramInfo) Get(metricName string) interface{} { - if value, ok := hi.info[metricName]; ok { - return value - } - return nil -} diff --git a/pkg/measurement/histograms.go b/pkg/measurement/histograms.go new file mode 100644 index 00000000..4e0d0a05 --- /dev/null +++ b/pkg/measurement/histograms.go @@ -0,0 +1,76 @@ +package measurement + +import ( + "io" + "os" + "sort" + "time" + + "github.com/magiconair/properties" + "github.com/pingcap/go-ycsb/pkg/prop" + "github.com/pingcap/go-ycsb/pkg/util" +) + +type histograms struct { + p *properties.Properties + + histograms map[string]*histogram +} + +func (h *histograms) Measure(op string, start time.Time, lan time.Duration) { + opM, ok := h.histograms[op] + if !ok { + opM = newHistogram() + h.histograms[op] = opM + } + + opM.Measure(lan) +} + +func (h *histograms) summary() map[string][]string { + summaries := make(map[string][]string, len(h.histograms)) + for op, opM := range h.histograms { + summaries[op] = opM.Summary() + } + return summaries +} + +func (h *histograms) Summary() { + h.Output(os.Stdout) +} + +func (h *histograms) Output(w io.Writer) error { + summaries := h.summary() + keys := make([]string, 0, len(summaries)) + for k := range summaries { + keys = append(keys, k) + } + sort.Strings(keys) + + lines := [][]string{} + for _, op := range keys { + line := []string{op} + line = append(line, summaries[op]...) + lines = append(lines, line) + } + + outputStyle := h.p.GetString(prop.OutputStyle, util.OutputStylePlain) + switch outputStyle { + case util.OutputStylePlain: + util.RenderString(w, "%-6s - %s\n", header, lines) + case util.OutputStyleJson: + util.RenderJson(w, header, lines) + case util.OutputStyleTable: + util.RenderTable(w, header, lines) + default: + panic("unsupported outputstyle: " + outputStyle) + } + return nil +} + +func InitHistograms(p *properties.Properties) *histograms { + return &histograms{ + p: p, + histograms: make(map[string]*histogram, 16), + } +} diff --git a/pkg/measurement/measurement.go b/pkg/measurement/measurement.go index bb47bc36..7260bb31 100644 --- a/pkg/measurement/measurement.go +++ b/pkg/measurement/measurement.go @@ -14,14 +14,14 @@ package measurement import ( - "sort" + "bufio" + "os" "sync" "sync/atomic" "time" "github.com/magiconair/properties" "github.com/pingcap/go-ycsb/pkg/prop" - "github.com/pingcap/go-ycsb/pkg/util" "github.com/pingcap/go-ycsb/pkg/ycsb" ) @@ -32,90 +32,75 @@ type measurement struct { p *properties.Properties - opMeasurement map[string]ycsb.Measurement + measurer ycsb.Measurer } -func (m *measurement) measure(op string, lan time.Duration) { - m.RLock() - opM, ok := m.opMeasurement[op] - m.RUnlock() - - if !ok { - opM = newHistogram(m.p) - m.Lock() - m.opMeasurement[op] = opM - m.Unlock() - } - - opM.Measure(lan) +func (m *measurement) measure(op string, start time.Time, lan time.Duration) { + m.Lock() + m.measurer.Measure(op, start, lan) + m.Unlock() } func (m *measurement) output() { m.RLock() defer m.RUnlock() - keys := make([]string, len(m.opMeasurement)) - var i = 0 - for k := range m.opMeasurement { - keys[i] = k - i += 1 - } - sort.Strings(keys) - lines := [][]string{} - for _, op := range keys { - line := []string{op} - line = append(line, m.opMeasurement[op].Summary()...) - lines = append(lines, line) + outFile := m.p.GetString(prop.MeasurementRawOutputFile, "") + var w *bufio.Writer + if outFile == "" { + w = bufio.NewWriter(os.Stdout) + } else { + f, err := os.Create(outFile) + if err != nil { + panic("failed to create output file: " + err.Error()) + } + defer f.Close() + w = bufio.NewWriter(f) } - outputStyle := m.p.GetString(prop.OutputStyle, util.OutputStylePlain) - switch outputStyle { - case util.OutputStylePlain: - util.RenderString("%-6s - %s\n", header, lines) - case util.OutputStyleJson: - util.RenderJson(header, lines) - case util.OutputStyleTable: - util.RenderTable(header, lines) - default: - panic("unsupported outputstyle: " + outputStyle) + err := globalMeasure.measurer.Output(w) + if err != nil { + panic("failed to write output: " + err.Error()) } -} - -func (m *measurement) info() map[string]ycsb.MeasurementInfo { - m.RLock() - defer m.RUnlock() - opMeasurementInfo := make(map[string]ycsb.MeasurementInfo, len(m.opMeasurement)) - for op, opM := range m.opMeasurement { - opMeasurementInfo[op] = opM.Info() + err = w.Flush() + if err != nil { + panic("failed to flush output: " + err.Error()) } - return opMeasurementInfo } -func (m *measurement) getOpName() []string { +func (m *measurement) summary() { m.RLock() - defer m.RUnlock() - - res := make([]string, 0, len(m.opMeasurement)) - for op := range m.opMeasurement { - res = append(res, op) - } - return res + globalMeasure.measurer.Summary() + m.RUnlock() } // InitMeasure initializes the global measurement. func InitMeasure(p *properties.Properties) { globalMeasure = new(measurement) globalMeasure.p = p - globalMeasure.opMeasurement = make(map[string]ycsb.Measurement, 16) + measurementType := p.GetString(prop.MeasurementType, prop.MeasurementTypeDefault) + switch measurementType { + case "histogram": + globalMeasure.measurer = InitHistograms(p) + case "raw", "csv": + globalMeasure.measurer = InitCSV() + default: + panic("unsupported measurement type: " + measurementType) + } EnableWarmUp(p.GetInt64(prop.WarmUpTime, 0) > 0) } -// Output prints the measurement summary. +// Output prints the complete measurements. func Output() { globalMeasure.output() } +// Summary prints the measurement summary. +func Summary() { + globalMeasure.summary() +} + // EnableWarmUp sets whether to enable warm-up. func EnableWarmUp(b bool) { if b { @@ -131,22 +116,11 @@ func IsWarmUpFinished() bool { } // Measure measures the operation. -func Measure(op string, lan time.Duration) { +func Measure(op string, start time.Time, lan time.Duration) { if IsWarmUpFinished() { - globalMeasure.measure(op, lan) + globalMeasure.measure(op, start, lan) } } -// Info returns all the operations MeasurementInfo. -// The key of returned map is the operation name. -func Info() map[string]ycsb.MeasurementInfo { - return globalMeasure.info() -} - -// GetOpNames returns a string slice which contains all the operation name measured. -func GetOpNames() []string { - return globalMeasure.getOpName() -} - var globalMeasure *measurement var warmUp int32 // use as bool, 1 means in warmup progress, 0 means warmup finished. diff --git a/pkg/prop/prop.go b/pkg/prop/prop.go index aaf1f2a7..7ca5d6c1 100644 --- a/pkg/prop/prop.go +++ b/pkg/prop/prop.go @@ -109,6 +109,10 @@ const ( LogInterval = "measurement.interval" + MeasurementType = "measurementtype" + MeasurementTypeDefault = "histogram" + MeasurementRawOutputFile = "measurement.output_file" + Command = "command" OutputStyle = "outputstyle" diff --git a/pkg/util/output.go b/pkg/util/output.go index 914b428e..c982d6e5 100644 --- a/pkg/util/output.go +++ b/pkg/util/output.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/json" "fmt" - "os" + "io" "strings" "github.com/olekukonko/tablewriter" @@ -18,7 +18,7 @@ const ( ) // RenderString renders headers and values according to the format provided -func RenderString(format string, headers []string, values [][]string) { +func RenderString(w io.Writer, format string, headers []string, values [][]string) { if len(values) == 0 { return } @@ -31,22 +31,22 @@ func RenderString(format string, headers []string, values [][]string) { } buf.WriteString(fmt.Sprintf(format, value[0], strings.Join(args, ", "))) } - fmt.Print(buf.String()) + fmt.Fprint(w, buf.String()) } // RenderTable will use given headers and values to render a table style output -func RenderTable(headers []string, values [][]string) { +func RenderTable(w io.Writer, headers []string, values [][]string) { if len(values) == 0 { return } - tb := tablewriter.NewWriter(os.Stdout) + tb := tablewriter.NewWriter(w) tb.SetHeader(headers) tb.AppendBulk(values) tb.Render() } // RnederJson will combine the headers and values and print a json string -func RenderJson(headers []string, values [][]string) { +func RenderJson(w io.Writer, headers []string, values [][]string) { if len(values) == 0 { return } @@ -60,10 +60,10 @@ func RenderJson(headers []string, values [][]string) { } outStr, err := json.Marshal(data) if err != nil { - fmt.Println(err) + fmt.Fprintln(w, err) return } - fmt.Println(string(outStr)) + fmt.Fprintln(w, string(outStr)) } // IntToString formats int value to string diff --git a/pkg/workload/core.go b/pkg/workload/core.go index dbb9ab03..0d860109 100644 --- a/pkg/workload/core.go +++ b/pkg/workload/core.go @@ -444,7 +444,7 @@ func (c *core) doTransactionRead(ctx context.Context, db ycsb.DB, state *coreSta func (c *core) doTransactionReadModifyWrite(ctx context.Context, db ycsb.DB, state *coreState) error { start := time.Now() defer func() { - measurement.Measure("READ_MODIFY_WRITE", time.Now().Sub(start)) + measurement.Measure("READ_MODIFY_WRITE", start, time.Now().Sub(start)) }() r := state.r diff --git a/pkg/ycsb/measurement.go b/pkg/ycsb/measurement.go index d4b778a1..60e4c46c 100644 --- a/pkg/ycsb/measurement.go +++ b/pkg/ycsb/measurement.go @@ -14,22 +14,18 @@ package ycsb import ( + "io" "time" ) -// MeasurementInfo contains metrics of one measurement. -type MeasurementInfo interface { - // Get returns the value corresponded to the specified metric, such QPS, MIN, MAX,etc. - // If metric does not exist, the returned value will be nil. - Get(metricName string) interface{} -} +// Measurer is used to capture measurements. +type Measurer interface { + // Measure measures the latency of an operation. + Measure(op string, start time.Time, latency time.Duration) + + // Summary writes a summary of the current measurement results to stdout. + Summary() -// Measurement measures the operations metrics. -type Measurement interface { - // Measure measures the operation latency. - Measure(latency time.Duration) - // Summary returns the summary of the measurement. - Summary() []string - // Info returns the MeasurementInfo of the measurement. - Info() MeasurementInfo + // Output writes the measurement results to the specified writer. + Output(w io.Writer) error }