This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Make Relay a Collector Plugin #3

Merged: 1 commit, Jul 11, 2017


Contributor

@kjlyon kjlyon commented May 5, 2017

[Image: demo]
The docker-compose file and task used in this demo can be found here.

Note: this PR requires the latest commits from intelsdi-x/snap-plugin-lib-go#85

README.md Outdated

Terminal 2:
1. `echo "test.first 10 `date +%s`"|nc -c localhost <number from TCP listener>`
Contributor

Regarding how this is rendered:
current: echo "test.first 10 date +%s"|nc -c localhost <number from TCP listener>
suggested: echo "test.first 10 `date +%s`"|nc -c localhost <number from TCP listener>

README.md Outdated
## How to test streaming plugin without Snap:
Terminal 1:
1. `cd relay-plugin`
Contributor

Do you mean snap-relay?

client/main.go Outdated
log.Fields{
"metric": metric,
},
).Debug("recieved metric")
Contributor

Typo -> received

main.go Outdated
/*
http://www.apache.org/licenses/LICENSE-2.0.txt

Copyright 2015 Intel Corporation
Contributor

It's a new file, so the copyright year should be 2017.

relay/relay.go Outdated
@@ -0,0 +1,122 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2016 Intel Corporation
Contributor

It's a new file, so the copyright year should be 2017.

relay/relay.go Outdated
"github.com/intelsdi-x/snap-relay/graphite"
)

//TODO rename :)
Contributor

Is this "Todo" still valid?

relay/relay.go Outdated
log.Fields{
"len(metrics)": len(metrics),
},
).Debug("recieved metrics")
Contributor

Typo -> received

@IzabellaRaulin
Contributor

IzabellaRaulin commented May 10, 2017

Missing glide files

For others' information: the glide files will not be added until the related PR (in snap-plugin-lib-go) is merged to master.

@@ -144,6 +168,7 @@ func (g *graphite) stop() {
}

func parse(data string) *plugin.Metric {
data = strings.Trim(data, "\r")
Contributor

👍
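
For readers unfamiliar with the input format, here is a minimal sketch of parsing one line such as `test.first 10 <unix seconds>`, including the carriage-return trim shown in the diff above. It is not the plugin's actual `parse` function: the `sample` type and `parseLine` name are hypothetical stand-ins (the real code returns a `*plugin.Metric`), and the three-field layout is assumed from the `echo` example in the README.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// sample is a hypothetical stand-in for plugin.Metric.
type sample struct {
	Name      string
	Value     float64
	Timestamp time.Time
}

// parseLine parses one "<name> <value> <unix seconds>" line.
func parseLine(data string) (*sample, error) {
	// Clients such as nc may terminate lines with "\r\n"; drop the leftovers.
	// (strings.Fields would also skip them; the trim mirrors the diff.)
	data = strings.Trim(data, "\r\n")

	fields := strings.Fields(data)
	if len(fields) != 3 {
		return nil, fmt.Errorf("expected 3 fields, got %d", len(fields))
	}
	value, err := strconv.ParseFloat(fields[1], 64)
	if err != nil {
		return nil, fmt.Errorf("bad value %q: %v", fields[1], err)
	}
	secs, err := strconv.ParseInt(fields[2], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("bad timestamp %q: %v", fields[2], err)
	}
	return &sample{
		Name:      fields[0],
		Value:     value,
		Timestamp: time.Unix(secs, 0),
	}, nil
}

func main() {
	s, err := parseLine("test.first 10 1494000000\r")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(s.Name, s.Value, s.Timestamp.Unix())
}
```

Returning an error instead of a nil metric is a choice made for this sketch only; it makes malformed lines easy to log.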

@kjlyon
Contributor Author

kjlyon commented Jun 7, 2017

Streaming tasks need to be able to fail. Because they currently can't really fail, they never stop. Even when all the necessary plugins are unloaded, the task appears to be running just fine. Note that the task stops getting Hits, but there is no other sign of failure.

[Screenshot: 2017-06-07, 3:52:50 pm]

At this point, with all plugins unloaded and the task still appearing to run just fine, if I stop the task and try to start it again, I get the following error:

[Screenshot: 2017-06-07, 3:41:48 pm]

Question: Why does version for influxdb publisher plugin say -1?

@IzabellaRaulin
Contributor

IzabellaRaulin commented Jun 8, 2017

@kjlyon, good catch! Regarding your question:

Question: Why does version for influxdb publisher plugin say -1?

"-1" means "take the latest version of the plugin". In this case, the error message means that no influxdb publisher is loaded.
In the other case, when a specific version is declared in the task manifest (e.g. version 22), the plugin is looked up in exactly that version when the task starts. Even if the plugin is available in version 23, you will receive a similar error pointing to version(22).
If you have any further questions about this, please reach out to me.

@kjlyon
Contributor Author

kjlyon commented Jun 8, 2017

As an interesting follow up to my previous comment, that behavior occurs when I:

 Open container (task and plugins already loaded)
 unload influxdb
 task list -> Task status is: stopped
 stop task
 start task -> (Error as shown above)

I get a different error when I do this:

 Open container (tasks and plugins already loaded)
 unload snap-relay
 task list -> Task status is: stopping
 stop task
 start task -> Error:

[Screenshot: 2017-06-08, 4:26:59 pm]

The difference between when stopped and when stopping show up is interesting.

@IzabellaRaulin what could be causing the different errors from when I unload relay and then stop the task vs when I unload influxdb then stop the task?

@IzabellaRaulin
Contributor

IzabellaRaulin commented Jun 12, 2017

Hello @kjlyon, I am quite confused about why a running task does not change its status to disabled after a plugin in use is unloaded. Besides that, could you tell me how you unload the snap-relay plugin? For me, snaptel plugin unload <plugin_type> <plugin_name> <plugin_version> does not work as expected for the stream collector plugin type. OK, I see my mistake: I did not put the type in quotes (""). Is there any thought of changing "stream collector" to something without a space, e.g. "stream-collector"? I think it would be more user-friendly. I will put an appropriate comment on the snap repo here

It's interesting the difference I'm seeing between when stopped and stopping show up.

The defined flow for a regular (i.e. non-streaming) plugin looks like this:
a) execute "stop task"
b) change task status to "stopping" (see here)
c) close killChan
d) change task status to "stopped" and emit TaskStoppedEvent (see here)
e) in HandleGomitEvent, process UnsubscribePlugins() (see here)

what could be causing the different errors from when I unload relay and then stop the task vs when I unload influxdb then stop the task

I reproduced your case with the snap-relay plugin. First, I confirmed that unsubscription works fine in the straightforward case: stop a running task and then start it again. So what needs to be checked is what happens when a streaming collector is unloaded. I will look into that and let you know. FYI, I opened an issue addressing the inconsistent behavior across different plugin types: intelsdi-x/snap#1659

)

type tcpListener struct {
port *int
Contributor

@jcooklin, why is the port a pointer to int?

Contributor Author

@IzabellaRaulin, this has to remain a pointer to an int. The problem traces back to lines 37-38 of main.go. These two lines are evaluated before the CLI has a chance to evaluate the flags, so they run with the default values, and any value passed in through flags is not set in time. Making port a pointer to an int solves this, because the value being pointed to can be changed once the flags are evaluated.
From what I have found, it is not possible to have port be an int. Please let me know if you find another way, or if it is okay to leave it this way.
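
The ordering problem described here can be reproduced with the standard library alone. The sketch below uses Go's `flag` package rather than the CLI library the plugin actually uses, and the `listener`, `snapshot`, and `demo` names are hypothetical; it only illustrates why a copied int misses a value that a pointer sees.

```go
package main

import (
	"flag"
	"fmt"
)

// listener keeps a pointer, like tcpListener in this PR.
type listener struct {
	port *int
}

// snapshot copies the int at construction time.
type snapshot struct {
	port int
}

// demo builds both structs before parsing, then reports what each one sees.
func demo(args []string) (viaPointer, viaCopy int, err error) {
	fs := flag.NewFlagSet("relay", flag.ContinueOnError)
	port := fs.Int("port", 0, "listen port (0 lets the OS pick one)")

	// Both values are constructed before the flags are parsed.
	byPointer := listener{port: port}
	byValue := snapshot{port: *port} // copies the default

	if err := fs.Parse(args); err != nil {
		return 0, 0, err
	}

	// Parsing wrote through the pointer; the copy was never updated.
	return *byPointer.port, byValue.port, nil
}

func main() {
	p, c, err := demo([]string{"-port", "6124"})
	if err != nil {
		fmt.Println("flag error:", err)
		return
	}
	fmt.Printf("pointer sees %d, copy sees %d\n", p, c)
}
```

An alternative that would allow a plain int is to construct the listeners only after the flags have been parsed, though whether the plugin library permits that ordering is not something this sketch can confirm.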

tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0")
addr := fmt.Sprintf("%v:0", plugin.ListenAddr)
if t.port != nil {
addr = fmt.Sprintf("%v:%v", plugin.ListenAddr, *t.port)
Contributor

Consider declaring t.port as an int, so that these lines (81-82) could be collapsed into a single one:
addr = fmt.Sprintf("%v:%v", plugin.ListenAddr, t.port)

)

type udpListener struct {
port *int
Contributor

same as above

"udp",
fmt.Sprintf("%v:%v",
plugin.ListenAddr,
*u.port,
Contributor

As it stands, u.port might be nil, and there is no check here (unlike the similar TCP implementation); please look into that. Also, consider declaring u.port as int instead of *int.

Contributor

A nil pointer dereference is possible here.
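
One way to guard the dereference, mirroring the nil check already present on the TCP side, is sketched below. The `listenAddr` helper is hypothetical and not part of the PR; it falls back to port 0 when no port was supplied, on the assumption that the UDP listener should then let the OS choose a free port as the TCP listener does.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// listenAddr builds a "host:port" string without dereferencing a nil port.
// A nil port falls back to 0, which asks the OS to pick a free port.
func listenAddr(host string, port *int) string {
	p := 0
	if port != nil {
		p = *port
	}
	return net.JoinHostPort(host, strconv.Itoa(p))
}

func main() {
	configured := 8125

	fmt.Println(listenAddr("127.0.0.1", &configured))
	fmt.Println(listenAddr("127.0.0.1", nil))
}
```

The resulting string can be passed to `net.ResolveUDPAddr("udp", ...)` in place of the inline `fmt.Sprintf` call.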

relay/relay.go Outdated
for _, val := range vals {
metric := plugin.Metric{
Namespace: plugin.NewNamespace("relay", val),
Version: 1,
Contributor

I recommend using pluginVersion here to make this easier to manage in the future

Contributor Author

👍

relay/relay.go Outdated

policy.AddNewStringRule([]string{"relay", "collectd"},
"graphite",
false)
Contributor

@jcooklin, what can be provided as the value of the "graphite" config option? It's not required, yet there is no default value, which is not clear to me. It would be great if you could explain it. Thank you.

Contributor Author

The graphite config option can be provided as a flag when starting the relay. There are default values for both TCP and UDP here.

relay/relay.go Outdated

policy.AddNewStringRule([]string{"relay", "statsd"},
"statsd",
false)
Contributor

As above - it's not clear what can be passed via this config option.

cfg.AddItem("MaxMetricsBuffer", ctypes.ConfigValueInt{Value: 2})
requested_metrics := []core.Metric{
plugin.MetricType{
Namespace_: core.NewNamespace("relay", "collectd"),
Contributor

As far as I can see, we don't have a relay for collectd; only the graphite relay is present here. Shouldn't the namespace use graphite instead of collectd?

limitations under the License.
*/

package main
Contributor

Is it OK to have two main packages (and two main functions) in the same repo?

Contributor Author

I don't know for sure, but I kept it as a main package because that's how I run the client for testing purposes.

Collaborator

It's OK AFAIK.

).Debug("received metrics")

//assign port values if any passed in
if metric.Namespace[len(metric.Namespace)-1].Value == "collectd" {
Contributor

graphite?

Collaborator

It was a subjective decision, IMO. Since the primary intent was to communicate as clearly as possible the ability to integrate snap into environments with collectd, I chose to stick with collectd in the namespace, knowing that the write_graphite plugin in collectd would be used. That said, I can see an argument for changing the namespace and all references to collectd in this relay to graphite.

func (r *relay) GetConfigPolicy() (plugin.ConfigPolicy, error) {
policy := plugin.NewConfigPolicy()

policy.AddNewStringRule([]string{"relay", "collectd"},
Contributor

graphite?

Contributor Author

If it should be graphite here instead of collectd, should it also be graphite on line 139 in place of statsd?

Contributor

It depends. The statsd relay was written to support the statsd utility in repeater mode, where statsd passes through messages sent by the user. If we configure statsd to run with graphite, it will behave the same as the collectd relay.

@IzabellaRaulin
Contributor

@kjlyon, please squash commits before merging it

- Added core functions to relay.go
- Create Client for testing purposes
- Adds graphite UDP and TCP flags
- Add capability for the relay to set MaxCollectDuration and MaxMetricsBuffer
- Support for simultaneous streams
- Renamed relay package to protocol for clarity
@kjlyon kjlyon merged commit 5f727c2 into intelsdi-x:master Jul 11, 2017
4 participants