Skip to content

Commit

Permalink
Merge pull request #1178 from appsignal/update-shoryuken-instrumentation
Browse files Browse the repository at this point in the history
Refactor Shoryuken to not use monitor_transaction
  • Loading branch information
tombruijn authored Jul 10, 2024
2 parents 9e50419 + ad80e3d commit 5b62733
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 234 deletions.
66 changes: 3 additions & 63 deletions lib/appsignal/hooks/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,6 @@
module Appsignal
class Hooks
# @api private
class ShoryukenMiddleware
def call(worker_instance, queue, sqs_msg, body, &block)
batch = sqs_msg.is_a?(Array)
attributes =
if batch
# We can't instrument batched message separately, the `yield` will
# perform all the batched messages.
# To provide somewhat useful metadata, Get first message based on
# SentTimestamp, and use its attributes as metadata for the
# transaction. We can't combine them all because then they would
# overwrite each other and the last message (in an sorted order)
# would be used as the source of the metadata. With the
# oldest/first message at least some useful information is stored
# such as the first received time and the number of retries for the
# first message. The newer message should have lower values and
# timestamps in their metadata.
first_msg = sqs_msg.min do |a, b|
a.attributes["SentTimestamp"].to_i <=> b.attributes["SentTimestamp"].to_i
end
# Add batch => true metadata so people can recognize when a
# transaction is about a batch of messages.
first_msg.attributes.merge(:batch => true)
else
sqs_msg.attributes.merge(:message_id => sqs_msg.message_id)
end
metadata = { :queue => queue }.merge(attributes)
options = {
:class => worker_instance.class.name,
:method => "perform",
:metadata => metadata
}

args =
if batch
bodies = {}
sqs_msg.each_with_index do |msg, index|
# Store all separate bodies on a hash with the key being the
# message_id
bodies[msg.message_id] = body[index]
end
bodies
else
case body
when Hash
body
else
{ :params => body }
end
end
options[:params] = Appsignal::Utils::HashSanitizer.sanitize(
args,
Appsignal.config[:filter_parameters]
)

if attributes.key?("SentTimestamp")
options[:queue_start] = Time.at(attributes["SentTimestamp"].to_i / 1000)
end

Appsignal.monitor_transaction("perform_job.shoryuken", options, &block)
end
end

class ShoryukenHook < Appsignal::Hooks::Hook
register :shoryuken

Expand All @@ -73,9 +11,11 @@ def dependencies_present?
end

def install
require "appsignal/integrations/shoryuken"

::Shoryuken.configure_server do |config|
config.server_middleware do |chain|
chain.add Appsignal::Hooks::ShoryukenMiddleware
chain.add Appsignal::Integrations::ShoryukenMiddleware
end
end
end
Expand Down
79 changes: 79 additions & 0 deletions lib/appsignal/integrations/shoryuken.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

module Appsignal
module Integrations
# @api private
class ShoryukenMiddleware
def call(worker_instance, queue, sqs_msg, body, &block)
transaction = Appsignal::Transaction.create(
SecureRandom.uuid,
Appsignal::Transaction::BACKGROUND_JOB,
Appsignal::Transaction::GenericRequest.new({})
)

Appsignal.instrument("perform_job.shoryuken", &block)
rescue Exception => error # rubocop:disable Lint/RescueException
transaction.set_error(error)
raise
ensure
batch = sqs_msg.is_a?(Array)
attributes = fetch_attributes(batch, sqs_msg)
transaction.set_action_if_nil("#{worker_instance.class.name}#perform")
transaction.set_params_if_nil { fetch_args(batch, sqs_msg, body) }
transaction.set_tags(attributes)
transaction.set_tags("queue" => queue)
transaction.set_tags("batch" => true) if batch

if attributes.key?("SentTimestamp")
transaction.set_queue_start(Time.at(attributes["SentTimestamp"].to_i).to_i)
end

Appsignal::Transaction.complete_current!
end

private

def fetch_attributes(batch, sqs_msg)
if batch
# We can't instrument batched message separately, the `yield` will
# perform all the batched messages.
# To provide somewhat useful metadata, Get first message based on
# SentTimestamp, and use its attributes as metadata for the
# transaction. We can't combine them all because then they would
# overwrite each other and the last message (in an sorted order)
# would be used as the source of the metadata. With the
# oldest/first message at least some useful information is stored
# such as the first received time and the number of retries for the
# first message. The newer message should have lower values and
# timestamps in their metadata.
first_msg =
sqs_msg.min do |a, b|
a.attributes["SentTimestamp"].to_i <=> b.attributes["SentTimestamp"].to_i
end
first_msg.attributes
else
sqs_msg.attributes.merge(:message_id => sqs_msg.message_id)
end
end

def fetch_args(batch, sqs_msg, body)
if batch
bodies = {}
sqs_msg.each_with_index do |msg, index|
# Store all separate bodies on a hash with the key being the
# message_id
bodies[msg.message_id] = body[index]
end
bodies
else
case body
when Hash
body
else
{ :params => body }
end
end
end
end
end
end
171 changes: 0 additions & 171 deletions spec/lib/appsignal/hooks/shoryuken_spec.rb
Original file line number Diff line number Diff line change
@@ -1,174 +1,3 @@
describe Appsignal::Hooks::ShoryukenMiddleware do
class DemoShoryukenWorker
end

let(:time) { "2010-01-01 10:01:00UTC" }
let(:worker_instance) { DemoShoryukenWorker.new }
let(:queue) { "some-funky-queue-name" }
let(:sqs_msg) { double(:message_id => "msg1", :attributes => {}) }
let(:body) { {} }
before(:context) { start_agent }
around { |example| keep_transactions { example.run } }

def perform_shoryuken_job(&block)
block ||= lambda {}
Timecop.freeze(Time.parse(time)) do
Appsignal::Hooks::ShoryukenMiddleware.new.call(
worker_instance,
queue,
sqs_msg,
body,
&block
)
end
end

context "with a performance call" do
let(:sent_timestamp) { Time.parse("1976-11-18 0:00:00UTC").to_i * 1000 }
let(:sqs_msg) do
double(:message_id => "msg1", :attributes => { "SentTimestamp" => sent_timestamp })
end

context "with complex argument" do
let(:body) { { :foo => "Foo", :bar => "Bar" } }

it "wraps the job in a transaction with the correct params" do
allow_any_instance_of(Appsignal::Transaction).to receive(:set_queue_start).and_call_original
expect { perform_shoryuken_job }.to change { created_transactions.length }.by(1)

transaction = last_transaction
expect(transaction).to have_id
expect(transaction).to have_namespace(Appsignal::Transaction::BACKGROUND_JOB)
expect(transaction).to have_action("DemoShoryukenWorker#perform")
expect(transaction).to_not have_error
expect(transaction).to include_event(
"body" => "",
"body_format" => Appsignal::EventFormatter::DEFAULT,
"count" => 1,
"name" => "perform_job.shoryuken",
"title" => ""
)
expect(transaction).to include_params("foo" => "Foo", "bar" => "Bar")
expect(transaction).to include_sample_metadata(
"message_id" => "msg1",
"queue" => queue,
"SentTimestamp" => sent_timestamp
)
expect(transaction).to have_queue_start(sent_timestamp)
expect(transaction).to be_completed
end

context "with parameter filtering" do
before do
Appsignal.config = project_fixture_config("production")
Appsignal.config[:filter_parameters] = ["foo"]
end
after do
Appsignal.config[:filter_parameters] = []
end

it "filters selected arguments" do
perform_shoryuken_job

expect(last_transaction).to include_params("foo" => "[FILTERED]", "bar" => "Bar")
end
end
end

context "with a string as an argument" do
let(:body) { "foo bar" }

it "handles string arguments" do
perform_shoryuken_job

expect(last_transaction).to include_params("params" => body)
end
end

context "with primitive type as argument" do
let(:body) { 1 }

it "handles primitive types as arguments" do
perform_shoryuken_job

expect(last_transaction).to include_params("params" => body)
end
end
end

context "with exception" do
it "sets the exception on the transaction" do
expect do
expect do
perform_shoryuken_job { raise ExampleException, "error message" }
end.to raise_error(ExampleException)
end.to change { created_transactions.length }.by(1)

transaction = last_transaction
expect(transaction).to have_id
expect(transaction).to have_action("DemoShoryukenWorker#perform")
expect(transaction).to have_namespace(Appsignal::Transaction::BACKGROUND_JOB)
expect(transaction).to have_error("ExampleException", "error message")
expect(transaction).to be_completed
end
end

context "with batched jobs" do
let(:sqs_msg) do
[
double(
:message_id => "msg2",
:attributes => {
"SentTimestamp" => (Time.parse("1976-11-18 01:00:00UTC").to_i * 1000).to_s
}
),
double(
:message_id => "msg1",
:attributes => { "SentTimestamp" => sent_timestamp.to_s }
)
]
end
let(:body) do
[
"foo bar",
{ :id => "123", :foo => "Foo", :bar => "Bar" }
]
end
let(:sent_timestamp) { Time.parse("1976-11-18 01:00:00UTC").to_i * 1000 }

it "creates a transaction for the batch" do
allow_any_instance_of(Appsignal::Transaction).to receive(:set_queue_start).and_call_original
expect do
perform_shoryuken_job {} # rubocop:disable Lint/EmptyBlock
end.to change { created_transactions.length }.by(1)

transaction = last_transaction
expect(transaction).to have_id
expect(transaction).to have_action("DemoShoryukenWorker#perform")
expect(transaction).to have_namespace(Appsignal::Transaction::BACKGROUND_JOB)
expect(transaction).to_not have_error
expect(transaction).to include_event(
"body" => "",
"body_format" => Appsignal::EventFormatter::DEFAULT,
"count" => 1,
"name" => "perform_job.shoryuken",
"title" => ""
)
expect(transaction).to include_params(
"msg2" => "foo bar",
"msg1" => { "id" => "123", "foo" => "Foo", "bar" => "Bar" }
)
expect(transaction).to include_sample_metadata(
"batch" => true,
"queue" => "some-funky-queue-name",
"SentTimestamp" => sent_timestamp.to_s # Earliest/oldest timestamp from messages
)
# Queue time based on earliest/oldest timestamp from messages
expect(transaction).to have_queue_start(sent_timestamp)
end
end
end

describe Appsignal::Hooks::ShoryukenHook do
context "with shoryuken" do
before(:context) do
Expand Down
Loading

0 comments on commit 5b62733

Please sign in to comment.