Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Adds support for streaming collectors #1632

Merged
merged 1 commit into from
Jun 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/snaptel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error {
}
duration = &d
}

// It's a streaming collector
if strings.Compare(t.Schedule.Type, "streaming") == 0 {
return nil
}

// Grab the interval for the schedule (if one was provided). Note that if an
// interval value was not passed in and there is no interval defined for the
// schedule associated with this task, it's an error
Expand Down
23 changes: 20 additions & 3 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
if !security.TLSEnabled && resp.Meta.TLSEnabled {
return nil, errors.New(ErrMsgInsecureClient + "; plugin_name: " + resp.Meta.Name)
}
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType {
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType && resp.Type != plugin.StreamCollectorPluginType {
return nil, strategy.ErrBadType
}
ap := &availablePlugin{
Expand Down Expand Up @@ -172,6 +172,20 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
default:
return nil, errors.New("Invalid RPCTYPE")
}
case plugin.StreamCollectorPluginType:
switch resp.Meta.RPCType {
case plugin.STREAMGRPC:
c, e := client.NewStreamCollectorGrpcClient(
resp.ListenAddress,
DefaultClientTimeout,
security)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
default:
return nil, errors.New("Invalid RPCTYPE")
}
default:
return nil, errors.New("Cannot create a client for a plugin of the type: " + resp.Type.String())
}
Expand Down Expand Up @@ -279,7 +293,10 @@ func (a *availablePlugin) Kill(r string) error {
c.Killed()
}

return a.ePlugin.Kill()
if a.ePlugin != nil {
return a.ePlugin.Kill()
}
return nil
}

// CheckHealth checks the health of a plugin and updates
Expand Down Expand Up @@ -365,7 +382,7 @@ func newAvailablePlugins() *availablePlugins {
}

func (ap *availablePlugins) insert(pl *availablePlugin) error {
if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType {
if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType && pl.pluginType != plugin.StreamCollectorPluginType {
return strategy.ErrBadType
}

Expand Down
8 changes: 4 additions & 4 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ func (p *pluginConfig) deletePluginConfigDataNodeFieldAll(key string) {
}

func (p *pluginConfig) switchPluginConfigType(pluginType core.PluginType) *pluginTypeConfigItem {
switch pluginType {
case core.CollectorPluginType:
switch {
case pluginType == core.CollectorPluginType || pluginType == core.StreamingCollectorPluginType:
return p.Collector
case core.ProcessorPluginType:
case pluginType == core.ProcessorPluginType:
return p.Processor
case core.PublisherPluginType:
case pluginType == core.PublisherPluginType:
return p.Publisher
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,8 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
return nil
}

func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree) []serror.SnapError {
return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree)
func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) []serror.SnapError {
return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree, asserts...)
}

// SubscribeDeps will subscribe to collectors, processors and publishers. The collectors are subscribed by mapping the provided
Expand Down
35 changes: 33 additions & 2 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -468,14 +469,44 @@ func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan
if values != nil {
maxCollectDuration, ok := values["MaxCollectDuration"]
if ok {
t, ok := maxCollectDuration.(*ctypes.ConfigValueInt)
t, ok := maxCollectDuration.(ctypes.ConfigValueInt)
if ok {
// MaxCollectDuration was passed as an int therefore
// it is representing nanoseconds
arg.MaxCollectDuration = int64(t.Value)
} else {
t, ok := maxCollectDuration.(ctypes.ConfigValueStr)
if ok {
// MaxCollectDuration was passed as a string therefore
// it should be a string rep of a duration
dur, err := time.ParseDuration(t.Value)
if err != nil {
log.WithFields(
log.Fields{
"_block": "StreamMetrics",
"config-key": "MaxCollectDuration",
"hint": "value should be a parsable duration (e.g. 5s)",
"error": err.Error(),
},
).Warn("invalid config value")
}
arg.MaxCollectDuration = dur.Nanoseconds()
} else {
log.WithFields(
log.Fields{
"_block": "StreamMetrics",
"config-key": "MaxCollectDuration",
"type-provided": reflect.TypeOf(maxCollectDuration).String(),
"type-wanted": ctypes.ConfigValueStr{}.Type(),
"hint": "value should be a parsable duration (e.g. 5s)",
},
).Warn("wrong config value type")
}
}
}
maxMetricsBuffer, ok := values["MaxMetricsBuffer"]
if ok {
t, ok := maxMetricsBuffer.(*ctypes.ConfigValueInt)
t, ok := maxMetricsBuffer.(ctypes.ConfigValueInt)
if ok {
arg.MaxMetricsBuffer = int64(t.Value)
}
Expand Down
2 changes: 1 addition & 1 deletion control/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
"collector",
"processor",
"publisher",
"streamCollector",
"streaming-collector",
}

routingStrategyTypes = [...]string{
Expand Down
12 changes: 7 additions & 5 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
lPlugin.LoadedTime = time.Now()
lPlugin.State = LoadedState

if resp.Type == plugin.CollectorPluginType {
if resp.Type == plugin.CollectorPluginType || resp.Type == plugin.StreamCollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)

if lPlugin.ConfigPolicy != nil {
Expand Down Expand Up @@ -543,9 +543,11 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
metricTypes, err := colClient.GetMetricTypes(cfg)
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"plugin-type": "collector",
"error": err.Error(),
"_block": "load-plugin",
"plugin-type": resp.Type.String(),
"error": err.Error(),
"plugin-name": ap.Name(),
"plugin-version": ap.Version(),
}).Error("error in getting metric types")
resultChan <- result{nil, serror.New(err)}
return
Expand Down Expand Up @@ -714,7 +716,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
p.loadedPlugins.remove(plugin.Key())

// Remove any metrics from the catalog if this was a collector
if plugin.TypeName() == "collector" {
if plugin.TypeName() == core.CollectorPluginType.String() || plugin.TypeName() == core.StreamingCollectorPluginType.String() {
p.metricCatalog.RmUnloadedPluginMetrics(plugin)
}

Expand Down
30 changes: 11 additions & 19 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,27 +422,19 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
return errors.New("pool not found")
}
if pool.SubscriptionCount() < pool.Count() {
lp, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if lp != nil && lp.Details.Uri != nil {
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
"error": err.Error(),
}).Error("unable to get loaded plugin")
}
_, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"plugin-uri": lp.Details.Uri,
}).Debug(fmt.Sprintf("unsubscribe called on standalone plugin"))
pool.SelectAndStop(taskID, "remote unsubscription event")
} else {
pool.SelectAndKill(taskID, "unsubscription event")
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
"error": err.Error(),
}).Error("unable to get loaded plugin")
}
pool.SelectAndKill(taskID, "unsubscription event")
}
return nil
}
Expand Down
24 changes: 1 addition & 23 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type Pool interface {
Plugins() MapAvailablePlugin
RLock()
RUnlock()
SelectAndStop(taskID, reason string)
SelectAndKill(taskID, reason string)
SelectAP(taskID string, configID map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError)
Strategy() RoutingAndCaching
Expand Down Expand Up @@ -181,7 +180,7 @@ func (p *pool) IncRestartCount() {

// Insert inserts an AvailablePlugin into the pool
func (p *pool) Insert(a AvailablePlugin) error {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType && a.Type() != plugin.StreamCollectorPluginType {
return ErrBadType
}
// If an empty pool is created, it does not have
Expand Down Expand Up @@ -306,27 +305,6 @@ func (p *pool) KillAll(reason string) {
}
}

// SelectAndStop selects, stops and removes the available plugin from the pool
func (p *pool) SelectAndStop(id, reason string) {
rp, err := p.Remove(p.plugins.Values(), id)
if err != nil {
log.WithFields(log.Fields{
"_block": "SelectAndStop",
"taskID": id,
"reason": reason,
}).Error(err)
return
}
if err := rp.Stop(reason); err != nil {
log.WithFields(log.Fields{
"_block": "SelectAndStop",
"taskID": id,
"reason": reason,
}).Error(err)
}
p.remove(rp.ID())
}

// SelectAndKill selects, kills and removes the available plugin from the pool
func (p *pool) SelectAndKill(id, reason string) {
rp, err := p.Remove(p.plugins.Values(), id)
Expand Down
14 changes: 12 additions & 2 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ManagesSubscriptionGroups interface {
Remove(id string) []serror.SnapError
ValidateDeps(requested []core.RequestedMetric,
plugins []core.SubscribedPlugin,
configTree *cdata.ConfigDataTree) (serrs []serror.SnapError)
configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError)
validateMetric(metric core.Metric) (serrs []serror.SnapError)
}

Expand Down Expand Up @@ -203,14 +203,24 @@ func (s *subscriptionGroups) Process() (errs []serror.SnapError) {

func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric,
plugins []core.SubscribedPlugin,
configTree *cdata.ConfigDataTree) (serrs []serror.SnapError) {
configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError) {

// resolve requested metrics and map to collectors
pluginToMetricMap, collectors, errs := s.getMetricsAndCollectors(requested, configTree)
if errs != nil {
serrs = append(serrs, errs...)
}

// Validate if schedule type is streaming and we have a non-streaming plugin or vice versa
for _, assert := range asserts {
if serr := assert(collectors); serr != nil {
serrs = append(serrs, serr)
}
}
if len(serrs) > 0 {
return serrs
}

// validateMetricsTypes
for _, pmt := range pluginToMetricMap {
for _, mt := range pmt.Metrics() {
Expand Down
13 changes: 10 additions & 3 deletions core/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/asaskevich/govalidator"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/fileutils"
)

Expand All @@ -48,9 +49,10 @@ type PluginType int

func ToPluginType(name string) (PluginType, error) {
pts := map[string]PluginType{
"collector": 0,
"processor": 1,
"publisher": 2,
"collector": 0,
"processor": 1,
"publisher": 2,
"streaming-collector": 3,
}
t, ok := pts[name]
if !ok {
Expand All @@ -64,6 +66,7 @@ func CheckPluginType(id PluginType) bool {
0: "collector",
1: "processor",
2: "publisher",
3: "streaming-collector",
}

_, ok := pts[id]
Expand All @@ -86,6 +89,7 @@ func (pt PluginType) String() string {
"collector",
"processor",
"publisher",
"streaming-collector",
}[pt]
}

Expand All @@ -94,6 +98,7 @@ const (
CollectorPluginType PluginType = iota
ProcessorPluginType
PublisherPluginType
StreamingCollectorPluginType
)

type AvailablePlugin interface {
Expand Down Expand Up @@ -125,6 +130,8 @@ type SubscribedPlugin interface {
Config() *cdata.ConfigDataNode
}

type SubscribedPluginAssert func(plugins []SubscribedPlugin) serror.SnapError

type RequestedPlugin struct {
path string
checkSum [sha256.Size]byte
Expand Down
Loading