
This is a note about improvements and features which may or may not come to Observed. I appreciate any comments and ideas on this note!

--

Observed has two meanings. One is the "observe daemon", and the other is the past tense and past participle of the word "observe". The former means that Observed can be run as a daemon; the latter means that, by using Observed in any form, you can observe events to report.

Similar products

  • Though Observed's use cases are not limited to RSS scraping, Observed and its architecture seem comparable to Plagger.

Design decisions

Plugins should be stateless

Each observer or reporter can be started or stopped at any time. Ideally, they should not rely on libraries or storage which are not thread-safe. If a plugin is stateless, Observed can create many instances of it to distribute load across observations. This especially scales out long-running blocking I/O on MRI, and long-running CPU-intensive computations on other Ruby implementations such as JRuby.
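
As a sketch, a stateless observer keeps all per-observation state in local variables, so any number of instances can run concurrently. The class below is an assumption for illustration, not Observed's actual plugin API:

require 'net/http'
require 'json'

# Hypothetical plugin: every piece of per-observation state lives inside
# `observe`, so many instances can safely run in parallel.
class StatsObserver
  def initialize(url)
    @url = url # configuration only; never mutated after construction
  end

  def observe
    body = Net::HTTP.get(URI(@url)) # local to this call
    JSON.parse(body)                # no shared mutable state
  end
end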

Independence from executors

When we need concurrency, we just configure it and let the executor take care of it. The executor is replaceable via plugins. An executor can be based on preforking, threading, etc.

executor 'thread-pool', size: 10

# Observed won't scale out the observation below
#  ==foo==>
#          ==foo==>
#                  ==foo==>
observe 'foo', via: 'http'

# Observed can scale out this one
#  ==bar==>
#    ==bar==>
#          ==bar==>
observe 'bar', via: 'http'

Plugins

  • mail (observes by reading mails via IMAP, reports it via sendmail)
  • http (observes via GET, POST, etc.; reports via POST, PUT, etc.; see the sketch after this list)
  • github_stars (observes the stargazers count on specific GitHub repositories)
  • ganglia (observes metrics via ganglia-api, reports data by gmetric spoofing)
  • graphite (observes data provided through The URL API, reports data through Graphite's data collection daemon with the plaintext protocol, optionally using the graphite-api gem)
  • *-gexf (observes "links" from a server, i.e. the web servers load-balanced by it or its masters/slaves, and reports the information as a GEXF document)
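
For example, configuring the http plugin might look like the following; the method: option name is an assumption for illustration:

# Poll a JSON endpoint via GET, and forward matching data to a collector via POST.
observe 'foo', via: 'http', with: { method: :get, url: 'http://example.com/stats.json' }
report /^foo$/, via: 'http', with: { method: :post, url: 'http://collector.example.com/metrics' }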

Wrap up

observe('foo', via: 'http', with: { url: 'http://example.com/stats.json?naming=camel_case' })
  .then(translate to: 'exclude_keys', with: { keys: [:status] })

# The above is equivalent to:
#   observe 'foo', via: 'http', url: 'http://example.com/stats.json?naming=camel_case'
#   observe(url: 'http://example.com/stats.json?naming=camel_case', via: 'http') | tag('foo')
match(/^foo$/)
  .then(translate to: 'make_keys_snake_case')
  .then(report to: 'stdout')

every '1 minute' do
  run 'foo'
end

def include_tag(args={})
  translate to: 'include_tag', where: args
end

def include_time(args={})
  translate to: 'include_time', where: args
end

def convert_to_snake_case_keys
  translate via: 'snake_case_keys'
end

endpoints = %w| http://example.com/stats.json |
observations = endpoints.map do |endpoint|
  observe(via: 'http', with: {url: endpoint})
    .then(convert_to_snake_case_keys)
    .then(include_tag)
    .then(include_time)
    .then(
      report(to: 'mongodb', with: { host: 'localhost', db: 'test', collection: 'test' }),
      report(to: 'mail', with: { address: 'mumoshu@example.com' }),
      report(to: 'serf_custom_event'),
      report(to: 'yaml', with: { path: '~/stats.yaml' }),
      report(to: 'stdout')
    )
end

every '1 minute' do
  observations.each(&:now)
end

DSL

Conciseness improvements

Extend the DSL to write one-liners which define both the observer and the reporter at once

#
# AFTER
#
# The `observer => translator => reporter` pattern
# We observe the data tagged 'foo' via HTTP, and then translate it via the sanitizer, and finally report the result to the standard output.
(observe 'foo', via: 'http', with: {}).then(translate it, using: 'sanitizer', with: {}).then(report it, to: 'stdout', with: {})

#
# BEFORE
#
observe 'foo.data', via: 'http', with: {}
translate /foo\.data/, via: 'sanitizer', with: { tag: 'foo' }
report /^foo$/, via: 'stdout', with: {}

Extend the DSL to configure the interval between observations at once

observe 'foo', via: 'http', with: {}

# We might also want to write one liners as follows
(observe 'foo', via: 'http', with: { url: 'http://example.com/foo' }).then(report via: 'stdout', with: {})
(observe 'foo', via: 'http', with: { url: 'http://example.com/foo' }) >> (report via: 'stdout', with: {})

every '1 minute' do
  run 'foo'
end

Eager or lazy evaluation

# Does not observe the data yet
observe('foo', via: 'http').then(report to: 'stdout')

# Observe the data now
run 'foo'

# Immediately observe the data
observe('foo', via: 'http').then(report to: 'stdout').now

# With the DSL:
include Observed
# Observe the data now
observe('foo', via: 'http').then(report to: 'stdout').now
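
How such lazy pipelines could work can be sketched in a few lines of plain Ruby; the Pipeline class below is an assumption for illustration, not Observed's implementation:

# A pipeline is just a recorded chain of steps; nothing runs until `now`.
class Pipeline
  def initialize(steps = [])
    @steps = steps
  end

  def then(step)
    Pipeline.new(@steps + [step]) # building is cheap and side-effect free
  end

  def now(input = nil)
    @steps.reduce(input) { |data, step| step.call(data) }
  end
end

pipeline = Pipeline.new([->(_) { { status: 'ok' } }]) # stands in for `observe via: 'http'`
pipeline.then(->(data) { puts data }).now             # the chain runs only here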

Extend the DSL to pre-configure components

report_it_to_stdout = report to: 'stdout'

# You can now reuse `report_it_to_stdout` as follows:
(observe 'foo', via: 'http', with: {}).then(report_it_to_stdout)

observe_foo_via_http = observe 'foo', via: 'http', with: { url: 'http://example.com/foo' }
# You can now reuse `observe_foo_via_http` as follows:
observe_foo_via_http.then(report_it_to_stdout)

Extend the DSL to define ad-hoc components

observe 'foo', with: { url: 'http://example.com/stats.json' } do |tag, data|
  [tag, JSON.parse(HTTP.get(url).body)]
end

pass_data_tagged(/foo/).to(report via: 'stdout')

run 'foo'

Feature extensions

Multiple piping

# Without `multiple piping`
[tag_foo, tag_bar].map do |tag|
  observe_that.then(tag)
end.map do |observe_then_tag|
  [report_via_stdout, report_via_http].each do |report|
    observe_then_tag.then(report)
  end
end

# With `multiple piping`
observe_that.then(tag_foo, tag_bar).then(report_via_stdout, report_via_http)

# Or even do multiple observations at once
[observe_this, observe_that].map do |o|
  o.then(report_via_stdout, report_via_http)
end

# The above code is equivalent to the below:
[observe_this, observe_that].product([report_via_stdout, report_via_http]).map { |a, b| a.then(b) }
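
One way `then` could fan out to several downstream steps is sketched below; the Step class is an assumption for illustration, not Observed's implementation:

# Fan-out: `then` with several downstreams wraps them in one step that
# forwards this step's result to each of them.
class Step
  def initialize(&block)
    @block = block
  end

  def call(data)
    @block.call(data)
  end

  def then(*downstreams)
    Step.new do |data|
      result = call(data)
      downstreams.each { |d| d.call(result) }
      result
    end
  end
end

source    = Step.new { { load: 0.5 } }
to_stdout = Step.new { |data| puts "stdout: #{data}" }
to_http   = Step.new { |data| puts "http:   #{data}" } # stands in for an HTTP reporter
source.then(to_stdout, to_http).call(nil)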

System messages

["example.com",  "mumoshu.com"].each do |host|
  # (1)
  # Implicitly emit data tagged "system.observe.#{tag}"
  observe "host=\"#{host}\",type=\"http\"", via: 'http', with: { ... }
end

# (2)
# Implicitly emit data tagged "system.report.#{tag}"
report /host=.+/, via: 'http', with: { ... }

# Matches (1)
# Doesn't emit data tagged "system.observe.system.observe.#{tag}"
report /system\.observe\.host="[^"]+",type="[^"]+"/, via: 'http', with: { ... }
# Matches (2)
# Doesn't emit data tagged "system.report.system.report.#{tag}"
report /system\.report\.host="[^"]+",type="[^"]+"/
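
The recursion guard implied by the comments above could be sketched like this; emit and emit_system_event are hypothetical names, not Observed's API:

EVENTS = []

def emit(tag, data)
  EVENTS << [tag, data]
end

# System events are emitted for ordinary tags only, so a reporter matching
# system.* tags never produces "system.report.system.observe.*" events in turn.
def emit_system_event(kind, tag, data)
  return if tag.start_with?('system.')
  emit("system.#{kind}.#{tag}", data)
end

emit_system_event('observe', 'host="example.com",type="http"', status: 200)
emit_system_event('report', 'system.observe.host="example.com",type="http"', status: 200) # no-op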

Replaceable executor backends

executor 'thread-pool-executor', size: 10

# It basically allows you to execute long-running computations in observe/translate/report on top of the thread pool.

observe 'foo' do
  # something that takes 1 minute to complete
end

# The code below should finish in less than 10 minutes, since the observations run concurrently on the thread pool
10.times { run 'foo' }

# Or you can even schedule observations, each taking about 3 minutes, to run every 1 minute.

observe 'foo', every: '1 minute' do
  # something that takes longer than 1 minute to complete
end

translate /^foo$/ do |tag, time, data|
  # something that takes longer than 1 minute to complete
  ['translated', time, new_data]
end

report /^translated$/ do |tag, time, data|
  # something that takes longer than 1 minute to complete
end

start

# You may implement other executors

executor 'prefork', workers: 10
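
A thread-pool executor backend could be sketched on top of Ruby's standard library as follows; the ThreadPoolExecutor class and its execute/shutdown methods are assumptions for illustration, and Observed's actual executor interface may differ:

require 'thread'

# Minimal fixed-size thread pool: workers pull jobs off a shared queue.
class ThreadPoolExecutor
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        while (job = @queue.pop) # a nil job tells the worker to stop
          job.call
        end
      end
    end
  end

  def execute(&job)
    @queue << job
  end

  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

pool = ThreadPoolExecutor.new(10)
10.times { |i| pool.execute { sleep 0.1; puts "observation #{i} done" } }
pool.shutdown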

Integration with Sinatra

require 'sinatra'

# Trigger observations by calling RESTful Web API
post '/trigger/:tag' do
  tag = params[:tag]
  run tag
  "Observation #{tag} started."
end

# Push data to observe via HTTP POST parameters
post '/observe/:tag' do
  tag = params[:tag]
  data = params[:data]
  run tag, data
  "Observation #{tag} started on data: #{data}"
end
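
Triggering an observation from the command line might then look like the following, assuming Sinatra's default port 4567:

$ curl -X POST http://localhost:4567/trigger/foo
Observation foo started.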

More reliability

Retries

observe 'foo', via: 'http', with: {}, retries: 3

# Retries up to 3 times when the observation on 'foo' fails with runtime errors.
run 'foo'
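
A minimal sketch of how such retries could work around an observation; run_with_retries is a hypothetical helper, not Observed's implementation:

# Re-run the block on RuntimeError, up to `retries` extra attempts.
def run_with_retries(retries)
  attempts = 0
  begin
    yield
  rescue RuntimeError => e
    attempts += 1
    retry if attempts <= retries
    raise e
  end
end

run_with_retries(3) { puts 'observing foo' } # stands in for `run 'foo'`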

Faster loading

Lazy loading plugins

It can take a long time to start up Observed when you load many plugins' code on top of your own code by calling require. That is annoying when you only need a few of the plugins. For instance, you may use cron or Mesos to trigger just one of the observations defined in your code, which doesn't need to load all the plugins.

# Register the plugin to be lazy loaded
observer('http') { require 'observed/http' }

# The plugin does not get loaded yet
observe 'foo', via: 'http', with: {}

# The plugin gets loaded
run 'foo'
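
A sketch of such a lazy-loading plugin registry; the observer and load_observer helpers below are illustrative assumptions:

# Plugins register a loader block instead of requiring their code up front.
LOADERS = {}
LOADED  = {}

def observer(name, &loader)
  LOADERS[name] = loader
end

def load_observer(name)
  LOADED[name] ||= LOADERS.fetch(name).call
end

observer('http') { require 'observed/http' } # nothing is loaded yet
load_observer('http')                        # the require runs here, on first use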

Integrations

cron

This would add the reliability of cron's job scheduling to Observed, by generating crontab entries that trigger Observed observations from cron.

observe 'foo', via: 'snmp', with: {}, every: '1 minute'
observe 'bar', via: 'snmp', with: {}, every: '1 minute'
observe 'baz', via: 'snmp', with: {}, every: '2 minutes'

When you process the observed.rb above with the exporter, you get a crontab like the following; presumably, each entry triggers all the observations sharing the same interval, selected via the -t flag:

*/1 * * * *  /path/to/observed-oneshot /path/to/observed.rb -t 1m
*/2 * * * *  /path/to/observed-oneshot /path/to/observed.rb -t 2m
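
The exporter itself could be sketched like this; it simply groups observations by interval and emits one crontab entry per distinct interval (the -t flag format is taken from the output above):

# Hypothetical exporter: tag => interval in minutes, read from observed.rb.
INTERVALS = { 'foo' => 1, 'bar' => 1, 'baz' => 2 }

INTERVALS.values.uniq.sort.each do |minutes|
  puts "*/#{minutes} * * * *  /path/to/observed-oneshot /path/to/observed.rb -t #{minutes}m"
end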

Redis-based work distribution

This would add scalability via multiple processes, and reliability via multiple workers, on top of Observed.

configure redis: { host: '...', port: '...' }

observe 'foo', via: 'snmp', with: {}, every: '1 second'
translate /^foo$/ #...
report /^translated$/ #...

# - When run as the `scheduler`, it `lpush`es the observation 'foo' to Redis every 1 second.
# - When run as a `worker`, it periodically `blpop`s observations from Redis and runs them.
run

You can run exactly one scheduler and one or more workers as follows:

$ observed-redis observed.rb scheduler
$ observed-redis observed.rb worker
$ observed-redis observed.rb worker
...
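
The scheduler/worker split could be sketched on top of the redis gem as follows; the queue name, payload format, and CLI handling are assumptions for illustration:

require 'redis'
require 'json'

redis = Redis.new(host: 'localhost', port: 6379)

case ARGV[0]
when 'scheduler'
  # Exactly one scheduler pushes an observation job per interval tick.
  loop do
    redis.lpush('observed:jobs', { tag: 'foo' }.to_json)
    sleep 1
  end
when 'worker'
  # Any number of workers block until a job is available, then run it.
  loop do
    _list, payload = redis.blpop('observed:jobs')
    job = JSON.parse(payload)
    puts "running observation #{job['tag']}" # stands in for `run job['tag']`
  end
end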