Skip to content

Commit

Permalink
Merge pull request #256 from jeffa5/csv-output
Browse files Browse the repository at this point in the history
Add CSV output
  • Loading branch information
zyguan authored Oct 21, 2022
2 parents d0a53f4 + 8352523 commit 4d727a7
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 120 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ Available Commands:
- etcd
- DynamoDB

## Output configuration

|field|default value|description|
|-|-|-|
|measurementtype|"histogram"|The mechanism for recording measurements, one of `histogram`, `raw` or `csv`|
|measurement.output_file|""|File to write output to, default writes to stdout|

## Database Configuration

You can pass the database configurations through `-p field=value` in the command line directly.
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (c *Client) Run(ctx context.Context) {
for {
select {
case <-t.C:
measurement.Output()
measurement.Summary()
case <-measureCtx.Done():
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/dbwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type DbWrapper struct {
func measure(start time.Time, op string, err error) {
lan := time.Now().Sub(start)
if err != nil {
measurement.Measure(fmt.Sprintf("%s_ERROR", op), lan)
measurement.Measure(fmt.Sprintf("%s_ERROR", op), start, lan)
return
}

measurement.Measure(op, lan)
measurement.Measure(op, start, lan)
}

func (db DbWrapper) Close() error {
Expand Down
51 changes: 51 additions & 0 deletions pkg/measurement/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package measurement

import (
"fmt"
"io"
"time"
)

type csventry struct {
// start time of the operation in us from unix epoch
startUs int64
// latency of the operation in us
latencyUs int64
}

type csvs struct {
opCsv map[string][]csventry
}

func InitCSV() *csvs {
return &csvs{
opCsv: make(map[string][]csventry),
}
}

func (c *csvs) Measure(op string, start time.Time, lan time.Duration) {
c.opCsv[op] = append(c.opCsv[op], csventry{
startUs: start.UnixMicro(),
latencyUs: lan.Microseconds(),
})
}

func (c *csvs) Output(w io.Writer) error {
_, err := fmt.Fprintln(w, "operation,timestamp_us,latency_us")
if err != nil {
return err
}
for op, entries := range c.opCsv {
for _, entry := range entries {
_, err := fmt.Fprintf(w, "%s,%d,%d\n", op, entry.startUs, entry.latencyUs)
if err != nil {
return err
}
}
}
return nil
}

func (c *csvs) Summary() {
// do nothing as csvs don't keep a summary
}
25 changes: 1 addition & 24 deletions pkg/measurement/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"time"

hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
"github.com/magiconair/properties"
"github.com/pingcap/go-ycsb/pkg/util"
"github.com/pingcap/go-ycsb/pkg/ycsb"
)

type histogram struct {
Expand All @@ -42,13 +40,7 @@ const (
PER9999TH = "PER9999TH"
)

func (h *histogram) Info() ycsb.MeasurementInfo {
res := h.getInfo()
delete(res, ELAPSED)
return newHistogramInfo(res)
}

func newHistogram(p *properties.Properties) *histogram {
func newHistogram() *histogram {
h := new(histogram)
h.startTime = time.Now()
h.hist = hdrhistogram.New(1, 24*60*60*1000*1000, 3)
Expand Down Expand Up @@ -103,18 +95,3 @@ func (h *histogram) getInfo() map[string]interface{} {

return res
}

type histogramInfo struct {
info map[string]interface{}
}

func newHistogramInfo(info map[string]interface{}) *histogramInfo {
return &histogramInfo{info: info}
}

func (hi *histogramInfo) Get(metricName string) interface{} {
if value, ok := hi.info[metricName]; ok {
return value
}
return nil
}
76 changes: 76 additions & 0 deletions pkg/measurement/histograms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package measurement

import (
"io"
"os"
"sort"
"time"

"github.com/magiconair/properties"
"github.com/pingcap/go-ycsb/pkg/prop"
"github.com/pingcap/go-ycsb/pkg/util"
)

type histograms struct {
p *properties.Properties

histograms map[string]*histogram
}

func (h *histograms) Measure(op string, start time.Time, lan time.Duration) {
opM, ok := h.histograms[op]
if !ok {
opM = newHistogram()
h.histograms[op] = opM
}

opM.Measure(lan)
}

func (h *histograms) summary() map[string][]string {
summaries := make(map[string][]string, len(h.histograms))
for op, opM := range h.histograms {
summaries[op] = opM.Summary()
}
return summaries
}

func (h *histograms) Summary() {
h.Output(os.Stdout)
}

func (h *histograms) Output(w io.Writer) error {
summaries := h.summary()
keys := make([]string, 0, len(summaries))
for k := range summaries {
keys = append(keys, k)
}
sort.Strings(keys)

lines := [][]string{}
for _, op := range keys {
line := []string{op}
line = append(line, summaries[op]...)
lines = append(lines, line)
}

outputStyle := h.p.GetString(prop.OutputStyle, util.OutputStylePlain)
switch outputStyle {
case util.OutputStylePlain:
util.RenderString(w, "%-6s - %s\n", header, lines)
case util.OutputStyleJson:
util.RenderJson(w, header, lines)
case util.OutputStyleTable:
util.RenderTable(w, header, lines)
default:
panic("unsupported outputstyle: " + outputStyle)
}
return nil
}

func InitHistograms(p *properties.Properties) *histograms {
return &histograms{
p: p,
histograms: make(map[string]*histogram, 16),
}
}
114 changes: 44 additions & 70 deletions pkg/measurement/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
package measurement

import (
"sort"
"bufio"
"os"
"sync"
"sync/atomic"
"time"

"github.com/magiconair/properties"
"github.com/pingcap/go-ycsb/pkg/prop"
"github.com/pingcap/go-ycsb/pkg/util"
"github.com/pingcap/go-ycsb/pkg/ycsb"
)

Expand All @@ -32,90 +32,75 @@ type measurement struct {

p *properties.Properties

opMeasurement map[string]ycsb.Measurement
measurer ycsb.Measurer
}

func (m *measurement) measure(op string, lan time.Duration) {
m.RLock()
opM, ok := m.opMeasurement[op]
m.RUnlock()

if !ok {
opM = newHistogram(m.p)
m.Lock()
m.opMeasurement[op] = opM
m.Unlock()
}

opM.Measure(lan)
func (m *measurement) measure(op string, start time.Time, lan time.Duration) {
m.Lock()
m.measurer.Measure(op, start, lan)
m.Unlock()
}

func (m *measurement) output() {
m.RLock()
defer m.RUnlock()
keys := make([]string, len(m.opMeasurement))
var i = 0
for k := range m.opMeasurement {
keys[i] = k
i += 1
}
sort.Strings(keys)

lines := [][]string{}
for _, op := range keys {
line := []string{op}
line = append(line, m.opMeasurement[op].Summary()...)
lines = append(lines, line)
outFile := m.p.GetString(prop.MeasurementRawOutputFile, "")
var w *bufio.Writer
if outFile == "" {
w = bufio.NewWriter(os.Stdout)
} else {
f, err := os.Create(outFile)
if err != nil {
panic("failed to create output file: " + err.Error())
}
defer f.Close()
w = bufio.NewWriter(f)
}

outputStyle := m.p.GetString(prop.OutputStyle, util.OutputStylePlain)
switch outputStyle {
case util.OutputStylePlain:
util.RenderString("%-6s - %s\n", header, lines)
case util.OutputStyleJson:
util.RenderJson(header, lines)
case util.OutputStyleTable:
util.RenderTable(header, lines)
default:
panic("unsupported outputstyle: " + outputStyle)
err := globalMeasure.measurer.Output(w)
if err != nil {
panic("failed to write output: " + err.Error())
}
}

func (m *measurement) info() map[string]ycsb.MeasurementInfo {
m.RLock()
defer m.RUnlock()

opMeasurementInfo := make(map[string]ycsb.MeasurementInfo, len(m.opMeasurement))
for op, opM := range m.opMeasurement {
opMeasurementInfo[op] = opM.Info()
err = w.Flush()
if err != nil {
panic("failed to flush output: " + err.Error())
}
return opMeasurementInfo
}

func (m *measurement) getOpName() []string {
func (m *measurement) summary() {
m.RLock()
defer m.RUnlock()

res := make([]string, 0, len(m.opMeasurement))
for op := range m.opMeasurement {
res = append(res, op)
}
return res
globalMeasure.measurer.Summary()
m.RUnlock()
}

// InitMeasure initializes the global measurement.
func InitMeasure(p *properties.Properties) {
globalMeasure = new(measurement)
globalMeasure.p = p
globalMeasure.opMeasurement = make(map[string]ycsb.Measurement, 16)
measurementType := p.GetString(prop.MeasurementType, prop.MeasurementTypeDefault)
switch measurementType {
case "histogram":
globalMeasure.measurer = InitHistograms(p)
case "raw", "csv":
globalMeasure.measurer = InitCSV()
default:
panic("unsupported measurement type: " + measurementType)
}
EnableWarmUp(p.GetInt64(prop.WarmUpTime, 0) > 0)
}

// Output prints the measurement summary.
// Output prints the complete measurements.
func Output() {
globalMeasure.output()
}

// Summary prints the measurement summary.
func Summary() {
globalMeasure.summary()
}

// EnableWarmUp sets whether to enable warm-up.
func EnableWarmUp(b bool) {
if b {
Expand All @@ -131,22 +116,11 @@ func IsWarmUpFinished() bool {
}

// Measure measures the operation.
func Measure(op string, lan time.Duration) {
func Measure(op string, start time.Time, lan time.Duration) {
if IsWarmUpFinished() {
globalMeasure.measure(op, lan)
globalMeasure.measure(op, start, lan)
}
}

// Info returns all the operations MeasurementInfo.
// The key of returned map is the operation name.
func Info() map[string]ycsb.MeasurementInfo {
return globalMeasure.info()
}

// GetOpNames returns a string slice which contains all the operation name measured.
func GetOpNames() []string {
return globalMeasure.getOpName()
}

var globalMeasure *measurement
var warmUp int32 // use as bool, 1 means in warmup progress, 0 means warmup finished.
4 changes: 4 additions & 0 deletions pkg/prop/prop.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ const (

LogInterval = "measurement.interval"

MeasurementType = "measurementtype"
MeasurementTypeDefault = "histogram"
MeasurementRawOutputFile = "measurement.output_file"

Command = "command"

OutputStyle = "outputstyle"
Expand Down
Loading

0 comments on commit 4d727a7

Please sign in to comment.