From 0050af4be3b93fb1276db71ccffca830a5940743 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Mon, 22 May 2017 16:22:32 -0700 Subject: [PATCH] Adds support for streaming collectors - Adds streaming support - Updates deps - Fixes #1650 --- cmd/snaptel/task.go | 6 +++ control/available_plugin.go | 23 +++++++++-- control/config.go | 8 ++-- control/control.go | 4 +- control/plugin/client/grpc.go | 35 +++++++++++++++- control/plugin/plugin.go | 2 +- control/plugin_manager.go | 12 +++--- control/runner.go | 30 +++++--------- control/strategy/pool.go | 24 +---------- control/subscription_group.go | 14 ++++++- core/plugin.go | 13 ++++-- glide.lock | 16 ++++---- grpc/controlproxy/controlproxy.go | 2 +- scheduler/scheduler.go | 65 +++++++++++++++++++++++++++++- scheduler/scheduler_medium_test.go | 2 +- scheduler/scheduler_test.go | 2 +- scheduler/task.go | 3 ++ scheduler/task_test.go | 2 +- 18 files changed, 185 insertions(+), 78 deletions(-) diff --git a/cmd/snaptel/task.go b/cmd/snaptel/task.go index 73a8c01b2..0a0be787d 100644 --- a/cmd/snaptel/task.go +++ b/cmd/snaptel/task.go @@ -241,6 +241,12 @@ func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error { } duration = &d } + + // It's a streaming collector + if strings.Compare(t.Schedule.Type, "streaming") == 0 { + return nil + } + // Grab the interval for the schedule (if one was provided). Note that if an // interval value was not passed in and there is no interval defined for the // schedule associated with this task, it's an error diff --git a/control/available_plugin.go b/control/available_plugin.go index 5e151b5af..ea838ebac 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -88,7 +88,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab if !security.TLSEnabled && resp.Meta.TLSEnabled { return nil, errors.New(ErrMsgInsecureClient + "; plugin_name: " + resp.Meta.Name) } - if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType { + if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType && resp.Type != plugin.StreamCollectorPluginType { return nil, strategy.ErrBadType } ap := &availablePlugin{ @@ -172,6 +172,20 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab default: return nil, errors.New("Invalid RPCTYPE") } + case plugin.StreamCollectorPluginType: + switch resp.Meta.RPCType { + case plugin.STREAMGRPC: + c, e := client.NewStreamCollectorGrpcClient( + resp.ListenAddress, + DefaultClientTimeout, + security) + if e != nil { + return nil, errors.New("error while creating client connection: " + e.Error()) + } + ap.client = c + default: + return nil, errors.New("Invalid RPCTYPE") + } default: return nil, errors.New("Cannot create a client for a plugin of the type: " + resp.Type.String()) } @@ -279,7 +293,10 @@ func (a *availablePlugin) Kill(r string) error { c.Killed() } - return a.ePlugin.Kill() + if a.ePlugin != nil { + return a.ePlugin.Kill() + } + return nil } // CheckHealth checks the health of a plugin and updates @@ -365,7 +382,7 @@ func newAvailablePlugins() *availablePlugins { } func (ap *availablePlugins) insert(pl *availablePlugin) error { - if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType { + if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType && pl.pluginType != plugin.StreamCollectorPluginType { return strategy.ErrBadType } diff --git a/control/config.go b/control/config.go index 3e8ab0628..8c0d83eb3 100644 --- a/control/config.go +++ b/control/config.go @@ -331,12 +331,12 @@ func (p *pluginConfig) deletePluginConfigDataNodeFieldAll(key string) { } func (p *pluginConfig) switchPluginConfigType(pluginType core.PluginType) *pluginTypeConfigItem { - switch pluginType { - case core.CollectorPluginType: + switch { + case pluginType == core.CollectorPluginType || pluginType == core.StreamingCollectorPluginType: return p.Collector - case core.ProcessorPluginType: + case pluginType == core.ProcessorPluginType: return p.Processor - case core.PublisherPluginType: + case pluginType == core.PublisherPluginType: return p.Publisher } return nil diff --git a/control/control.go b/control/control.go index 783fe0f24..3501d385b 100644 --- a/control/control.go +++ b/control/control.go @@ -714,8 +714,8 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged return nil } -func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree) []serror.SnapError { - return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree) +func (p *pluginControl) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) []serror.SnapError { + return p.subscriptionGroups.ValidateDeps(requested, plugins, configTree, asserts...) } // SubscribeDeps will subscribe to collectors, processors and publishers. The collectors are subscribed by mapping the provided diff --git a/control/plugin/client/grpc.go b/control/plugin/client/grpc.go index 99eca3597..ebb0d5ad3 100644 --- a/control/plugin/client/grpc.go +++ b/control/plugin/client/grpc.go @@ -27,6 +27,7 @@ import ( "io/ioutil" "os" "path/filepath" + "reflect" "strconv" "strings" "time" @@ -468,14 +469,44 @@ func (g *grpcClient) StreamMetrics(mts []core.Metric) (chan []core.Metric, chan if values != nil { maxCollectDuration, ok := values["MaxCollectDuration"] if ok { - t, ok := maxCollectDuration.(*ctypes.ConfigValueInt) + t, ok := maxCollectDuration.(ctypes.ConfigValueInt) if ok { + // MaxCollectDuration was passed as an int therefore + // it is representing nanoseconds arg.MaxCollectDuration = int64(t.Value) + } else { + t, ok := maxCollectDuration.(ctypes.ConfigValueStr) + if ok { + // MaxCollectDuration was passed as a string therefore + // it should be a string rep of a duration + dur, err := time.ParseDuration(t.Value) + if err != nil { + log.WithFields( + log.Fields{ + "_block": "StreamMetrics", + "config-key": "MaxCollectDuration", + "hint": "value should be a parsable duration (e.g. 5s)", + "error": err.Error(), + }, + ).Warn("invalid config value") + } + arg.MaxCollectDuration = dur.Nanoseconds() + } else { + log.WithFields( + log.Fields{ + "_block": "StreamMetrics", + "config-key": "MaxCollectDuration", + "type-provided": reflect.TypeOf(maxCollectDuration).String(), + "type-wanted": ctypes.ConfigValueStr{}.Type(), + "hint": "value should be a parsable duration (e.g. 5s)", + }, + ).Warn("wrong config value type") + } } } maxMetricsBuffer, ok := values["MaxMetricsBuffer"] if ok { - t, ok := maxMetricsBuffer.(*ctypes.ConfigValueInt) + t, ok := maxMetricsBuffer.(ctypes.ConfigValueInt) if ok { arg.MaxMetricsBuffer = int64(t.Value) } diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index 14c0371af..e1c34eabe 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -91,7 +91,7 @@ var ( "collector", "processor", "publisher", - "streamCollector", + "streaming-collector", } routingStrategyTypes = [...]string{ diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 3ad525a7a..3ea3a3903 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -491,7 +491,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter lPlugin.LoadedTime = time.Now() lPlugin.State = LoadedState - if resp.Type == plugin.CollectorPluginType { + if resp.Type == plugin.CollectorPluginType || resp.Type == plugin.StreamCollectorPluginType { cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version) if lPlugin.ConfigPolicy != nil { @@ -543,9 +543,11 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter metricTypes, err := colClient.GetMetricTypes(cfg) if err != nil { pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-type": "collector", - "error": err.Error(), + "_block": "load-plugin", + "plugin-type": resp.Type.String(), + "error": err.Error(), + "plugin-name": ap.Name(), + "plugin-version": ap.Version(), }).Error("error in getting metric types") resultChan <- result{nil, serror.New(err)} return @@ -714,7 +716,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap p.loadedPlugins.remove(plugin.Key()) // Remove any metrics from the catalog if this was a collector - if plugin.TypeName() == "collector" { + if plugin.TypeName() == core.CollectorPluginType.String() || plugin.TypeName() == core.StreamingCollectorPluginType.String() { p.metricCatalog.RmUnloadedPluginMetrics(plugin) } diff --git a/control/runner.go b/control/runner.go index c7c08e77c..cf0751440 100644 --- a/control/runner.go +++ b/control/runner.go @@ -422,27 +422,19 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID return errors.New("pool not found") } if pool.SubscriptionCount() < pool.Count() { - lp, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion)) - if lp != nil && lp.Details.Uri != nil { - if err != nil { - runnerLog.WithFields(log.Fields{ - "_block": "handle-unsubscription", - "pool-count": pool.Count(), - "pool-subscription-count": pool.SubscriptionCount(), - "plugin-name": pName, - "plugin-version": pVersion, - "plugin-type": pType, - "error": err.Error(), - }).Error("unable to get loaded plugin") - } + _, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion)) + if err != nil { runnerLog.WithFields(log.Fields{ - "_block": "handle-unsubscription", - "plugin-uri": lp.Details.Uri, - }).Debug(fmt.Sprintf("unsubscribe called on standalone plugin")) - pool.SelectAndStop(taskID, "remote unsubscription event") - } else { - pool.SelectAndKill(taskID, "unsubscription event") + "_block": "handle-unsubscription", + "pool-count": pool.Count(), + "pool-subscription-count": pool.SubscriptionCount(), + "plugin-name": pName, + "plugin-version": pVersion, + "plugin-type": pType, + "error": err.Error(), + }).Error("unable to get loaded plugin") } + pool.SelectAndKill(taskID, "unsubscription event") } return nil } diff --git a/control/strategy/pool.go b/control/strategy/pool.go index b061c2a51..397fe4b39 100644 --- a/control/strategy/pool.go +++ b/control/strategy/pool.go @@ -59,7 +59,6 @@ type Pool interface { Plugins() MapAvailablePlugin RLock() RUnlock() - SelectAndStop(taskID, reason string) SelectAndKill(taskID, reason string) SelectAP(taskID string, configID map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError) Strategy() RoutingAndCaching @@ -181,7 +180,7 @@ func (p *pool) IncRestartCount() { // Insert inserts an AvailablePlugin into the pool func (p *pool) Insert(a AvailablePlugin) error { - if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType { + if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType && a.Type() != plugin.StreamCollectorPluginType { return ErrBadType } // If an empty pool is created, it does not have @@ -306,27 +305,6 @@ func (p *pool) KillAll(reason string) { } } -// SelectAndStop selects, stops and removes the available plugin from the pool -func (p *pool) SelectAndStop(id, reason string) { - rp, err := p.Remove(p.plugins.Values(), id) - if err != nil { - log.WithFields(log.Fields{ - "_block": "SelectAndStop", - "taskID": id, - "reason": reason, - }).Error(err) - return - } - if err := rp.Stop(reason); err != nil { - log.WithFields(log.Fields{ - "_block": "SelectAndStop", - "taskID": id, - "reason": reason, - }).Error(err) - } - p.remove(rp.ID()) -} - // SelectAndKill selects, kills and removes the available plugin from the pool func (p *pool) SelectAndKill(id, reason string) { rp, err := p.Remove(p.plugins.Values(), id) diff --git a/control/subscription_group.go b/control/subscription_group.go index 9051a0d9c..a8ace4dca 100644 --- a/control/subscription_group.go +++ b/control/subscription_group.go @@ -59,7 +59,7 @@ type ManagesSubscriptionGroups interface { Remove(id string) []serror.SnapError ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, - configTree *cdata.ConfigDataTree) (serrs []serror.SnapError) + configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError) validateMetric(metric core.Metric) (serrs []serror.SnapError) } @@ -203,7 +203,7 @@ func (s *subscriptionGroups) Process() (errs []serror.SnapError) { func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric, plugins []core.SubscribedPlugin, - configTree *cdata.ConfigDataTree) (serrs []serror.SnapError) { + configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError) { // resolve requested metrics and map to collectors pluginToMetricMap, collectors, errs := s.getMetricsAndCollectors(requested, configTree) @@ -211,6 +211,16 @@ func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric, serrs = append(serrs, errs...) } + // Validate if schedule type is streaming and we have a non-streaming plugin or vice versa + for _, assert := range asserts { + if serr := assert(collectors); serr != nil { + serrs = append(serrs, serr) + } + } + if len(serrs) > 0 { + return serrs + } + // validateMetricsTypes for _, pmt := range pluginToMetricMap { for _, mt := range pmt.Metrics() { diff --git a/core/plugin.go b/core/plugin.go index b474376f1..466edd055 100644 --- a/core/plugin.go +++ b/core/plugin.go @@ -35,6 +35,7 @@ import ( "github.com/asaskevich/govalidator" "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/core/serror" "github.com/intelsdi-x/snap/pkg/fileutils" ) @@ -48,9 +49,10 @@ type PluginType int func ToPluginType(name string) (PluginType, error) { pts := map[string]PluginType{ - "collector": 0, - "processor": 1, - "publisher": 2, + "collector": 0, + "processor": 1, + "publisher": 2, + "streaming-collector": 3, } t, ok := pts[name] if !ok { @@ -64,6 +66,7 @@ func CheckPluginType(id PluginType) bool { 0: "collector", 1: "processor", 2: "publisher", + 3: "streaming-collector", } _, ok := pts[id] @@ -86,6 +89,7 @@ func (pt PluginType) String() string { "collector", "processor", "publisher", + "streaming-collector", }[pt] } @@ -94,6 +98,7 @@ const ( CollectorPluginType PluginType = iota ProcessorPluginType PublisherPluginType + StreamingCollectorPluginType ) type AvailablePlugin interface { @@ -125,6 +130,8 @@ type SubscribedPlugin interface { Config() *cdata.ConfigDataNode } +type SubscribedPluginAssert func(plugins []SubscribedPlugin) serror.SnapError + type RequestedPlugin struct { path string checkSum [sha256.Size]byte diff --git a/glide.lock b/glide.lock index d7ee21634..7b21789bc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 14712d189ae570ecb1e287d5a11120347c831b9b4bb6b2265eecb53acf52dc9d -updated: 2017-03-03T11:55:17.822848886-08:00 +updated: 2017-05-18T11:52:57.349827646-07:00 imports: - name: github.com/appc/spec version: ba99d6b8ccbbed2942e53eb5395fddae113cdf8e @@ -12,11 +12,11 @@ imports: - schema/types - schema/types/resource - name: github.com/armon/go-metrics - version: 3df31a1ada83e310c2e24b267c8e8b68836547b4 + version: 93f237eba9b0602f3e73710416558854a81d9337 - name: github.com/asaskevich/govalidator version: 9699ab6b38bee2e02cd3fe8b99ecf67665395c96 - name: github.com/coreos/go-semver - version: 6fe83ccda8fb9b7549c9ab4ba47f47858bc950aa + version: 294930c1e79c64e7dbe360054274fdad492c8cf5 subpackages: - semver - name: github.com/ghodss/yaml @@ -34,12 +34,12 @@ imports: - name: github.com/intelsdi-x/gomit version: db68f6fda248706a71980abc58e969fcd63f5ea6 - name: github.com/intelsdi-x/snap-plugin-lib-go - version: 3527311f5c8e6fe9b55fc1f44e1b515a30845e18 + version: d1d32a057c0c23169796016876ce89634b2c012f subpackages: - v1/plugin - v1/plugin/rpc - name: github.com/intelsdi-x/snap-plugin-utilities - version: 925bafffac36edcfaf4aff937dcb7d3d5659782d + version: 3c37e3965f3fc2f24714779d3bae7ba7032b87a9 subpackages: - str - name: github.com/julienschmidt/httprouter @@ -49,7 +49,7 @@ imports: - name: github.com/robfig/cron version: 32d9c273155a0506d27cf73dd1246e86a470997e - name: github.com/rs/cors - version: a62a804a8a009876ca59105f7899938a1349f4b3 + version: 2d7dd2a10331137ae3f931ba08c21fd00cbf208d - name: github.com/rs/xhandler version: ed27b6fd65218132ee50cd95f38474a3d8a2cd12 - name: github.com/Sirupsen/logrus @@ -63,7 +63,7 @@ imports: - name: github.com/vrischmann/jsonutil version: 694784f9315ee9fc763c1d30f28753cba21307aa - name: github.com/xeipuuv/gojsonpointer - version: e0fe6f68307607d540ed8eac07a342c33fa1b54a + version: 6fe8760cad3569743d51ddbb243b26f8456742dc - name: github.com/xeipuuv/gojsonreference version: e02fc20de94c78484cd5ffb007f8af96be030a45 - name: github.com/xeipuuv/gojsonschema @@ -93,7 +93,7 @@ imports: - lex/httplex - trace - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: e62c3de784db939836898e5c19ffd41bece347da subpackages: - unix - name: google.golang.org/grpc diff --git a/grpc/controlproxy/controlproxy.go b/grpc/controlproxy/controlproxy.go index df21c2576..3c52effae 100644 --- a/grpc/controlproxy/controlproxy.go +++ b/grpc/controlproxy/controlproxy.go @@ -146,7 +146,7 @@ func (c ControlProxy) StreamMetrics( } } -func (c ControlProxy) ValidateDeps(mts []core.RequestedMetric, plugins []core.SubscribedPlugin, _ *cdata.ConfigDataTree) []serror.SnapError { +func (c ControlProxy) ValidateDeps(mts []core.RequestedMetric, plugins []core.SubscribedPlugin, _ *cdata.ConfigDataTree, _ ...core.SubscribedPluginAssert) []serror.SnapError { // The configDataTree is kept so that we can match the interface provided by control // we do not need it here though since the configDataTree is only used for metrics // and we do not allow remote collection. diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 62644c851..55135a803 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -35,6 +35,7 @@ import ( "github.com/intelsdi-x/gomit" + "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/ctypes" @@ -67,6 +68,10 @@ var ( ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.") // ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.") + // ErrPluginIncompatibleWithScheduleType - The error message for when a streaming schedule type references a non streaming plugin or vice versa. + ErrPluginIncompatibleWithScheduleType = errors.New("Plugin is incompatible with the tasks schedule type.") + // ErrMultipleStreamingPlugins - The error message when a task with a streaming schedule refers to multiple streaming plugins. + ErrMultipleStreamingPlugins = errors.New("Multiple streaming plugins within the same task is not supported.") ) type schedulerState int @@ -93,7 +98,7 @@ type managesMetrics interface { publishesMetrics processesMetrics GetAutodiscoverPaths() []string - ValidateDeps([]core.RequestedMetric, []core.SubscribedPlugin, *cdata.ConfigDataTree) []serror.SnapError + ValidateDeps([]core.RequestedMetric, []core.SubscribedPlugin, *cdata.ConfigDataTree, ...core.SubscribedPluginAssert) []serror.SnapError SubscribeDeps(string, []core.RequestedMetric, []core.SubscribedPlugin, *cdata.ConfigDataTree) []serror.SnapError UnsubscribeDeps(string) []serror.SnapError } @@ -337,17 +342,73 @@ func (s *scheduler) createTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s return nil, te } + // subscribedPluginAsserts includes rules that need to be evaluated once we + // have mapped the metrics to specific collector plugins. Examples include + // asserting that streaming tasks don't reference non-streaming collectors. + subscribedPluginAsserts := []core.SubscribedPluginAssert{} // Group dependencies by the node they live on // and validate them. depGroups := getWorkflowPlugins(wf.processNodes, wf.publishNodes, wf.metrics) for k, group := range depGroups { + + // populate subscribedPluginAsserts + switch sch.(type) { + case *schedule.StreamingSchedule: + // assert no non-streaming plugins + subscribedPluginAsserts = append(subscribedPluginAsserts, func(plugins []core.SubscribedPlugin) serror.SnapError { + for _, plg := range plugins { + if plg.TypeName() != plugin.StreamCollectorPluginType.String() { + return serror.New( + ErrPluginIncompatibleWithScheduleType, + map[string]interface{}{ + "schedule_type": fmt.Sprintf("%T", sch), + "plugin_name": plg.Name(), + "plugin_type": plg.TypeName(), + }, + ) + } + } + return nil + }) + // assert only a single streaming plugin + subscribedPluginAsserts = append(subscribedPluginAsserts, func(plugins []core.SubscribedPlugin) serror.SnapError { + if len(plugins) > 1 { + return serror.New( + ErrMultipleStreamingPlugins, + map[string]interface{}{ + "schedule_type": fmt.Sprintf("%T", sch), + "num_of_collectors": len(plugins), + }, + ) + } + return nil + }) + default: + // assert no streaming plugins + subscribedPluginAsserts = append(subscribedPluginAsserts, func(plugins []core.SubscribedPlugin) serror.SnapError { + for _, plg := range plugins { + if plg.TypeName() == plugin.StreamCollectorPluginType.String() { + return serror.New( + ErrPluginIncompatibleWithScheduleType, + map[string]interface{}{ + "schedule_type": fmt.Sprintf("%T", sch), + "plugin_name": plg.Name(), + "plugin_type": plg.TypeName(), + }, + ) + } + } + return nil + }) + } + manager, err := task.RemoteManagers.Get(k) if err != nil { te.errs = append(te.errs, serror.New(err)) return nil, te } var errs []serror.SnapError - errs = manager.ValidateDeps(group.requestedMetrics, group.subscribedPlugins, wf.configTree) + errs = manager.ValidateDeps(group.requestedMetrics, group.subscribedPlugins, wf.configTree, subscribedPluginAsserts...) if len(errs) > 0 { te.errs = append(te.errs, errs...) diff --git a/scheduler/scheduler_medium_test.go b/scheduler/scheduler_medium_test.go index 4da691945..34e316576 100644 --- a/scheduler/scheduler_medium_test.go +++ b/scheduler/scheduler_medium_test.go @@ -63,7 +63,7 @@ func (m *mockMetricManager) ProcessMetrics([]core.Metric, map[string]ctypes.Conf return nil, nil } -func (m *mockMetricManager) ValidateDeps(mts []core.RequestedMetric, prs []core.SubscribedPlugin, ctree *cdata.ConfigDataTree) []serror.SnapError { +func (m *mockMetricManager) ValidateDeps(mts []core.RequestedMetric, prs []core.SubscribedPlugin, ctree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) []serror.SnapError { if m.failValidatingMetrics { return []serror.SnapError{ serror.New(errors.New("metric validation error")), diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 69b2f5efe..2f76c643b 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -62,7 +62,7 @@ func (m *mockMetricManager) PublishMetrics([]core.Metric, map[string]ctypes.Conf func (m *mockMetricManager) ProcessMetrics([]core.Metric, map[string]ctypes.ConfigValue, string, string, int) ([]core.Metric, []error) { return nil, nil } -func (m *mockMetricManager) ValidateDeps(mts []core.RequestedMetric, prs []core.SubscribedPlugin, cdt *cdata.ConfigDataTree) []serror.SnapError { +func (m *mockMetricManager) ValidateDeps(mts []core.RequestedMetric, prs []core.SubscribedPlugin, cdt *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) []serror.SnapError { if m.failValidatingMetrics { return []serror.SnapError{ serror.New(errors.New("metric validation error")), diff --git a/scheduler/task.go b/scheduler/task.go index 0de072f64..de8e98d5e 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -308,6 +308,9 @@ func (t *task) stream() { t.state = core.TaskStopped t.Unlock() done = true + event := new(scheduler_event.TaskStoppedEvent) + event.TaskID = t.id + defer t.eventEmitter.Emit(event) return case mts, ok := <-metricsChan: if !ok { diff --git a/scheduler/task_test.go b/scheduler/task_test.go index 93f1697b1..2d8e74f59 100644 --- a/scheduler/task_test.go +++ b/scheduler/task_test.go @@ -134,7 +134,7 @@ func TestTask(t *testing.T) { task.Spin() err = task.Enable() So(err, ShouldNotBeNil) - So(task.State(), ShouldEqual, core.TaskSpinning) + So(task.State(), ShouldBeIn, []core.TaskState{core.TaskSpinning, core.TaskFiring}) }) Convey("Enable a disabled task", func() {