-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add storage based ingester to m3coordinator #1038
Changes from 4 commits
85c233c
4fc8a72
0370602
43f75c0
9cef1e5
c915fc3
670939d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// Copyright (c) 2018 Uber Technologies, Inc. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package ingest | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
|
||
import ( | ||
"github.com/m3db/m3/src/query/storage" | ||
"github.com/m3db/m3/src/x/serialize" | ||
"github.com/m3db/m3x/instrument" | ||
"github.com/m3db/m3x/pool" | ||
"github.com/m3db/m3x/retry" | ||
xsync "github.com/m3db/m3x/sync" | ||
) | ||
|
||
// Configuration configs the ingester. | ||
type Configuration struct { | ||
WorkerPoolSize int `yaml:"workerPoolSize"` | ||
OpPool pool.ObjectPoolConfiguration `yaml:"opPool"` | ||
Retry retry.Configuration `yaml:"retry"` | ||
} | ||
|
||
// NewIngester creates an ingester with an appender. | ||
func (cfg Configuration) NewIngester( | ||
appender storage.Appender, | ||
instrumentOptions instrument.Options, | ||
) (*Ingester, error) { | ||
opts, err := cfg.newOptions(appender, instrumentOptions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return NewIngester(*opts), nil | ||
} | ||
|
||
func (cfg Configuration) newOptions( | ||
appender storage.Appender, | ||
instrumentOptions instrument.Options, | ||
) (*Options, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since you need to derefence this pointer later now for use in the constructor, perhaps make this return type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, was trying to avoid returning |
||
scope := instrumentOptions.MetricsScope().Tagged( | ||
map[string]string{"component": "ingester"}, | ||
) | ||
workers, err := xsync.NewPooledWorkerPool( | ||
cfg.WorkerPoolSize, | ||
xsync.NewPooledWorkerPoolOptions(). | ||
SetInstrumentOptions(instrumentOptions), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
workers.Init() | ||
tagDecoderPool := serialize.NewTagDecoderPool( | ||
serialize.NewTagDecoderOptions(), | ||
pool.NewObjectPoolOptions(). | ||
SetInstrumentOptions(instrumentOptions. | ||
SetMetricsScope(instrumentOptions.MetricsScope(). | ||
SubScope("tag-decoder-pool"))), | ||
) | ||
tagDecoderPool.Init() | ||
opts := Options{ | ||
Appender: appender, | ||
Workers: workers, | ||
PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions), | ||
TagDecoderPool: tagDecoderPool, | ||
RetryOptions: cfg.Retry.NewOptions(scope), | ||
InstrumentOptions: instrumentOptions, | ||
} | ||
return &opts, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
// Copyright (c) 2018 Uber Technologies, Inc. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package ingest | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" | ||
"github.com/m3db/m3/src/query/models" | ||
"github.com/m3db/m3/src/query/storage" | ||
"github.com/m3db/m3/src/query/ts" | ||
"github.com/m3db/m3/src/x/common" | ||
"github.com/m3db/m3/src/x/serialize" | ||
"github.com/m3db/m3metrics/metric/id" | ||
"github.com/m3db/m3metrics/policy" | ||
xerrors "github.com/m3db/m3x/errors" | ||
"github.com/m3db/m3x/instrument" | ||
"github.com/m3db/m3x/pool" | ||
"github.com/m3db/m3x/retry" | ||
xsync "github.com/m3db/m3x/sync" | ||
|
||
"github.com/uber-go/tally" | ||
) | ||
|
||
// Options configures the ingester. | ||
type Options struct { | ||
Appender storage.Appender | ||
Workers xsync.PooledWorkerPool | ||
PoolOptions pool.ObjectPoolOptions | ||
TagDecoderPool serialize.TagDecoderPool | ||
RetryOptions retry.Options | ||
InstrumentOptions instrument.Options | ||
} | ||
|
||
type ingestMetrics struct { | ||
ingestError tally.Counter | ||
ingestSuccess tally.Counter | ||
} | ||
|
||
func newIngestMetrics(scope tally.Scope) ingestMetrics { | ||
return ingestMetrics{ | ||
ingestError: scope.Counter("ingest-error"), | ||
ingestSuccess: scope.Counter("ingest-success"), | ||
} | ||
} | ||
|
||
// Ingester ingests metrics with a worker pool. | ||
type Ingester struct { | ||
workers xsync.PooledWorkerPool | ||
p pool.ObjectPool | ||
} | ||
|
||
// NewIngester creates an ingester. | ||
func NewIngester( | ||
opts Options, | ||
) *Ingester { | ||
retrier := retry.NewRetrier(opts.RetryOptions) | ||
m := newIngestMetrics(opts.InstrumentOptions.MetricsScope()) | ||
p := pool.NewObjectPool(opts.PoolOptions) | ||
p.Init( | ||
func() interface{} { | ||
// NB: we don't need a pool for the tag decoder since the ops are | ||
// pooled, but currently this is the only way to get tag decoder. | ||
tagDecoder := opts.TagDecoderPool.Get() | ||
op := ingestOp{ | ||
s: opts.Appender, | ||
r: retrier, | ||
it: serialize.NewMetricTagsIterator(tagDecoder, nil), | ||
p: p, | ||
m: m, | ||
} | ||
op.attemptFn = op.attempt | ||
op.ingestFn = op.ingest | ||
return &op | ||
}, | ||
) | ||
return &Ingester{ | ||
workers: opts.Workers, | ||
p: p, | ||
} | ||
} | ||
|
||
// Ingest ingests a metric asynchronously with callback. | ||
func (i *Ingester) Ingest( | ||
ctx context.Context, | ||
id []byte, | ||
metricTime time.Time, | ||
value float64, | ||
sp policy.StoragePolicy, | ||
callback *m3msg.RefCountedCallback, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add We don't want to use the TODO context each time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, the m3msg server will only be able to pass a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I think that's ideal, in the future if we have a per-op context then we'll be able to use that. Otherwise here it'll be a single one that can never be specialized. We should at least let the callers into this layer be able to specify their own. |
||
) { | ||
op := i.p.Get().(*ingestOp) | ||
op.c = ctx | ||
op.id = id | ||
op.metricTime = metricTime | ||
op.value = value | ||
op.sp = sp | ||
op.callback = callback | ||
i.workers.Go(op.ingestFn) | ||
} | ||
|
||
type ingestOp struct { | ||
s storage.Appender | ||
r retry.Retrier | ||
it id.SortedTagIterator | ||
p pool.ObjectPool | ||
m ingestMetrics | ||
attemptFn retry.Fn | ||
ingestFn func() | ||
|
||
c context.Context | ||
id []byte | ||
metricTime time.Time | ||
value float64 | ||
sp policy.StoragePolicy | ||
callback *m3msg.RefCountedCallback | ||
q storage.WriteQuery | ||
} | ||
|
||
func (op *ingestOp) ingest() { | ||
if err := op.resetWriteQuery(); err != nil { | ||
op.m.ingestError.Inc(1) | ||
op.callback.Callback(m3msg.OnRetriableError) | ||
op.p.Put(op) | ||
return | ||
} | ||
if err := op.r.Attempt(op.attemptFn); err != nil { | ||
if xerrors.IsNonRetryableError(err) { | ||
op.callback.Callback(m3msg.OnNonRetriableError) | ||
} else { | ||
op.callback.Callback(m3msg.OnRetriableError) | ||
} | ||
op.m.ingestError.Inc(1) | ||
op.p.Put(op) | ||
return | ||
} | ||
op.m.ingestSuccess.Inc(1) | ||
op.callback.Callback(m3msg.OnSuccess) | ||
op.p.Put(op) | ||
} | ||
|
||
func (op *ingestOp) attempt() error { | ||
return op.s.Write(op.c, &op.q) | ||
} | ||
|
||
func (op *ingestOp) resetWriteQuery() error { | ||
if err := op.resetTags(); err != nil { | ||
return err | ||
} | ||
op.resetDataPoints() | ||
op.q.Unit = common.SanitizeUnitForM3DB(op.sp.Resolution().Precision) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great 👍 |
||
op.q.Attributes.MetricsType = storage.AggregatedMetricsType | ||
op.q.Attributes.Resolution = op.sp.Resolution().Window | ||
op.q.Attributes.Retention = op.sp.Retention().Duration() | ||
return nil | ||
} | ||
|
||
func (op *ingestOp) resetTags() error { | ||
op.it.Reset(op.id) | ||
op.q.Tags = op.q.Tags[:0] | ||
for op.it.Next() { | ||
name, value := op.it.Current() | ||
op.q.Tags = append(op.q.Tags, models.Tag{ | ||
Name: append([]byte(nil), name...), | ||
Value: append([]byte(nil), value...), | ||
}) | ||
} | ||
return op.it.Err() | ||
} | ||
|
||
func (op *ingestOp) resetDataPoints() { | ||
if len(op.q.Datapoints) != 1 { | ||
op.q.Datapoints = make(ts.Datapoints, 1) | ||
} | ||
op.q.Datapoints[0].Timestamp = op.metricTime | ||
op.q.Datapoints[0].Value = op.value | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will have pretty poor compression for things that can be written in with second precision.
Could we please do at least something here where we try to find the best? i.e.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update
flush_handler.go
to use the new method you created?common.SanitizeUnitForM3DB(mp.StoragePolicy.Resolution().Precision)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, forgot about this