Skip to content

Commit

Permalink
base64 encode Cloud Task payload
Browse files Browse the repository at this point in the history
  • Loading branch information
Arnaud Lachaume committed Jan 23, 2020
1 parent 0d2e7d8 commit 311fa8f
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 17 deletions.
24 changes: 21 additions & 3 deletions app/controllers/cloudtasker/worker_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ class WorkerController < ApplicationController
# Run a worker from a Cloud Task payload
#
def run
# Build payload
payload = JSON.parse(request.body.read).merge(job_retries: job_retries)

# Process payload
WorkerHandler.execute_from_payload!(payload)
head :no_content
Expand All @@ -37,6 +34,27 @@ def run

private

#
# Parse the request body and return the actual job
# payload.
#
# @return [Hash] The job payload
#
def payload
@payload ||= begin
# Get raw body
content = request.body.read

# Decode content if the body is Base64 encoded
if request.headers[Cloudtasker::Config::ENCODING_HEADER].to_s.downcase == 'base64'
content = Base64.decode64(content)
end

# Return content parsed as JSON and add job retries count
JSON.parse(content).merge(job_retries: job_retries)
end
end

#
# Extract the number of times this task failed at runtime.
#
Expand Down
9 changes: 6 additions & 3 deletions examples/sinatra/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
# Authenticate request
Cloudtasker::Authenticator.verify!(request.env['HTTP_AUTHORIZATION'].to_s.split(' ').last)

# Build payload
payload = JSON.parse(request.body.read, symbolize_names: true)
.slice(:worker, :job_id, :job_args, :job_meta, :job_queue)
# Capture content and decode content
content = request.body.read
content = Base64.decode64(content) if request.env['HTTP_CONTENT_TRANSFER_ENCODING'].to_s.downcase == 'base64'

# Format job payload
payload = JSON.parse(content)
.merge(job_retries: request.env['HTTP_X_CLOUDTASKS_TASKEXECUTIONCOUNT'].to_i)

# Process payload
Expand Down
28 changes: 24 additions & 4 deletions lib/cloudtasker/backend/google_cloud_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ def self.format_schedule_time(schedule_time)
Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i }
end

#
# Format the job payload sent to Cloud Tasks.
#
# @param [Hash] hash The worker payload.
#
# @return [Hash] The Cloud Task payloadd.
#
def self.format_task_payload(payload)
payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

# Format schedule time to Google Protobuf timestamp
payload[:schedule_time] = format_schedule_time(payload[:schedule_time])

# Encode job content to support UTF-8. Google Cloud Task
# expect content to be ASCII-8BIT compatible (binary)
payload[:http_request][:headers] ||= {}
payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body])

payload
end

#
# Find a task by id.
#
Expand All @@ -104,10 +127,7 @@ def self.find(id)
# @return [Cloudtasker::Backend::GoogleCloudTask, nil] The created task.
#
def self.create(payload)
# Format payload
payload = payload.merge(
schedule_time: format_schedule_time(payload[:schedule_time])
).compact
payload = format_task_payload(payload)

# Extract relative queue name
relative_queue = payload.delete(:queue)
Expand Down
9 changes: 9 additions & 0 deletions lib/cloudtasker/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ class Config
# Retry header in Cloud Task responses
RETRY_HEADER = 'X-CloudTasks-TaskExecutionCount'

# Content-Transfer-Encoding header in Cloud Task responses
ENCODING_HEADER = 'Content-Transfer-Encoding'

# Content Type
CONTENT_TYPE_HEADER = 'Content-Type'

# Authorization header
AUTHORIZATION_HEADER = 'Authorization'

# Default values
DEFAULT_LOCATION_ID = 'us-east1'
DEFAULT_PROCESSOR_PATH = '/cloudtasker/run'
Expand Down
4 changes: 2 additions & 2 deletions lib/cloudtasker/worker_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def task_payload
http_method: 'POST',
url: Cloudtasker.config.processor_url,
headers: {
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{Authenticator.verification_token}"
Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
Cloudtasker::Config::AUTHORIZATION_HEADER => "Bearer #{Authenticator.verification_token}"
},
body: worker_payload.to_json
},
Expand Down
26 changes: 22 additions & 4 deletions spec/cloudtasker/backend/google_cloud_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@
end
end

describe '.format_task_payload' do
subject { described_class.format_task_payload(job_payload) }

let(:expected_payload) do
payload = JSON.parse(job_payload.to_json, symbolize_names: true)
payload[:schedule_time] = described_class.format_schedule_time(job_payload[:schedule_time])
payload[:http_request][:headers]['Content-Type'] = 'text/json'
payload[:http_request][:headers]['Content-Transfer-Encoding'] = 'Base64'
payload[:http_request][:body] = Base64.encode64(job_payload[:http_request][:body])
payload
end

it { is_expected.to eq(expected_payload) }
end

describe '.find' do
subject { described_class.find(id) }

Expand Down Expand Up @@ -170,10 +185,13 @@
let(:resp) { instance_double('Google::Cloud::Tasks::V2beta3::Task') }
let(:task) { instance_double(described_class.to_s) }
let(:expected_payload) do
job_payload.merge(
schedule_time: described_class.format_schedule_time(job_payload[:schedule_time]),
queue: nil
).compact
payload = JSON.parse(job_payload.to_json, symbolize_names: true)
payload.delete(:queue)
payload[:schedule_time] = described_class.format_schedule_time(job_payload[:schedule_time])
payload[:http_request][:headers]['Content-Type'] = 'text/json'
payload[:http_request][:headers]['Content-Transfer-Encoding'] = 'Base64'
payload[:http_request][:body] = Base64.encode64(job_payload[:http_request][:body])
payload
end

before { allow(described_class).to receive(:queue_path).with(job_payload[:queue]).and_return(queue) }
Expand Down
17 changes: 16 additions & 1 deletion spec/cloudtasker/worker_controller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
routes { Cloudtasker::Engine.routes }

describe 'POST #run' do
subject { post :run, body: payload.to_json, as: :json }
subject { post :run, body: request_body, as: :json }

let(:payload) do
{
Expand All @@ -16,6 +16,7 @@
'other' => 'foo'
}
end
let(:request_body) { payload.to_json }
let(:expected_payload) { payload.merge(job_retries: retries) }
let(:id) { '111' }
let(:worker_class_name) { 'TestWorker' }
Expand All @@ -38,6 +39,20 @@
it { is_expected.to be_successful }
end

context 'with base64 encoded body' do
let(:request_body) { Base64.encode64(payload.to_json) }

before do
request.env['HTTP_AUTHORIZATION'] = "Bearer #{auth_token}"
request.env['HTTP_CONTENT_TRANSFER_ENCODING'] = 'BASE64'
allow(Cloudtasker::WorkerHandler).to receive(:execute_from_payload!)
.with(expected_payload)
.and_return(true)
end
after { expect(Cloudtasker::WorkerHandler).to have_received(:execute_from_payload!) }
it { is_expected.to be_successful }
end

context 'with execution errors' do
before do
request.env['HTTP_AUTHORIZATION'] = "Bearer #{auth_token}"
Expand Down

0 comments on commit 311fa8f

Please sign in to comment.