Skip to content

Commit

Permalink
Fix processing of remote Delete activities (mastodon#16084)
Browse files Browse the repository at this point in the history
* Add tests

* Ensure deleted statuses are marked as such

* Save some redis memory by not storing URIs in delete_upon_arrival values

* Avoid possible race condition when processing incoming Deletes

* Avoid potential duplicate Delete forwards

* Lower lock durations to reduce issues in case of hard crash of the Rails process

* Check for `lock.aquired?` and improve comment

* Refactor RedisLock usage in app/lib/activitypub

* Fix using incorrect or non-existent sender for relaying Deletes
  • Loading branch information
ClearlyClaire committed Jan 28, 2022
1 parent 6386421 commit 13d1111
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 79 deletions.
14 changes: 12 additions & 2 deletions app/lib/activitypub/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def delete_arrived_first?(uri)
end

def delete_later!(uri)
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true)
end

def status_from_object
Expand Down Expand Up @@ -210,12 +210,22 @@ def fetch_remote_original_status
end
end

def lock_or_return(key, expire_after = 7.days.seconds)
def lock_or_return(key, expire_after = 2.hours.seconds)
yield if redis.set(key, true, nx: true, ex: expire_after)
ensure
redis.del(key)
end

def lock_or_fail(key)
RedisLock.acquire({ redis: Redis.current, key: key }) do |lock|
if lock.acquired?
yield
else
raise Mastodon::RaceConditionError
end
end
end

def fetch?
!@options[:delivery]
end
Expand Down
36 changes: 14 additions & 22 deletions app/lib/activitypub/activity/announce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?

RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
original_status = status_from_object
lock_or_fail("announce:#{@object['id']}") do
original_status = status_from_object

return reject_payload! if original_status.nil? || !announceable?(original_status)
return reject_payload! if original_status.nil? || !announceable?(original_status)

@status = Status.find_by(account: @account, reblog: original_status)
@status = Status.find_by(account: @account, reblog: original_status)

return @status unless @status.nil?
return @status unless @status.nil?

@status = Status.create!(
account: @account,
reblog: original_status,
uri: @json['id'],
created_at: @json['published'],
override_timestamps: @options[:override_timestamps],
visibility: visibility_from_audience
)
@status = Status.create!(
account: @account,
reblog: original_status,
uri: @json['id'],
created_at: @json['published'],
override_timestamps: @options[:override_timestamps],
visibility: visibility_from_audience
)

distribute(@status)
else
raise Mastodon::RaceConditionError
end
distribute(@status)
end

@status
Expand Down Expand Up @@ -69,8 +65,4 @@ def requested_through_relay?
def reblog_of_local_status?
status_from_uri(object_uri)&.account&.local?
end

def lock_options
{ redis: Redis.current, key: "announce:#{@object['id']}" }
end
end
36 changes: 10 additions & 26 deletions app/lib/activitypub/activity/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,15 @@ def message_franking
def create_status
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?

RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
lock_or_fail("create:#{object_uri}") do
return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator

@status = find_existing_status
@status = find_existing_status

if @status.nil?
process_status
elsif @options[:delivered_to_account_id].present?
postprocess_audience_and_deliver
end
else
raise Mastodon::RaceConditionError
if @status.nil?
process_status
elsif @options[:delivered_to_account_id].present?
postprocess_audience_and_deliver
end
end

Expand Down Expand Up @@ -314,13 +310,9 @@ def poll_vote!
poll = replied_to_status.preloadable_poll
already_voted = true

RedisLock.acquire(poll_lock_options) do |lock|
if lock.acquired?
already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
else
raise Mastodon::RaceConditionError
end
lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
already_voted = poll.votes.where(account: @account).exists?
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
end

increment_voters_count! unless already_voted
Expand Down Expand Up @@ -513,12 +505,4 @@ def increment_voters_count!
poll.reload
retry
end

def lock_options
{ redis: Redis.current, key: "create:#{object_uri}" }
end

def poll_lock_options
{ redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" }
end
end
62 changes: 33 additions & 29 deletions app/lib/activitypub/activity/delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,35 @@ def delete_person
def delete_note
return if object_uri.nil?

unless invalid_origin?(object_uri)
RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) }
Tombstone.find_or_create_by(uri: object_uri, account: @account)
end
lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
unless invalid_origin?(object_uri)
# This lock ensures a concurrent `ActivityPub::Activity::Create` either
# does not create a status at all, or has finished saving it to the
# database before we try to load it.
# Without the lock, `delete_later!` could be called after `delete_arrived_first?`
# and `Status.find` before `Status.create!`
lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }

@status = Status.find_by(uri: object_uri, account: @account)
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
Tombstone.find_or_create_by(uri: object_uri, account: @account)
end

return if @status.nil?
@status = Status.find_by(uri: object_uri, account: @account)
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?

if @status.distributable?
forward_for_reply
forward_for_reblogs
end
return if @status.nil?

delete_now!
forward! if @json['signature'].present? && @status.distributable?
delete_now!
end
end

def forward_for_reblogs
return if @json['signature'].blank?

rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
inboxes = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url]
def rebloggers_ids
return @rebloggers_ids if defined?(@rebloggers_ids)
@rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
end

ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, rebloggers_ids.first, inbox_url]
end
def inboxes_for_reblogs
Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes
end

def replied_to_status
Expand All @@ -58,13 +60,19 @@ def reply_to_local?
!replied_to_status.nil? && replied_to_status.account.local?
end

def forward_for_reply
return unless @json['signature'].present? && reply_to_local?
def inboxes_for_reply
replied_to_status.account.followers.inboxes
end

def forward!
inboxes = inboxes_for_reblogs
inboxes += inboxes_for_reply if reply_to_local?
inboxes -= [@account.preferred_inbox_url]

inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url]
sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first

ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, replied_to_status.account_id, inbox_url]
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url|
[payload, sender_id, inbox_url]
end
end

Expand All @@ -75,8 +83,4 @@ def delete_now!
def payload
@payload ||= Oj.dump(@json)
end

def lock_options
{ redis: Redis.current, key: "create:#{object_uri}" }
end
end
2 changes: 2 additions & 0 deletions app/services/remove_status_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def call(status, **options)
@account = status.account
@options = options

@status.discard

RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
remove_from_self if @account.local?
Expand Down
20 changes: 20 additions & 0 deletions spec/lib/activitypub/activity/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,24 @@
end
end
end

context 'when the status has been reported' do
describe '#perform' do
subject { described_class.new(json, sender) }
let!(:reporter) { Fabricate(:account) }

before do
reporter.reports.create!(target_account: status.account, status_ids: [status.id], forwarded: false)
subject.perform
end

it 'marks the status as deleted' do
expect(Status.find_by(id: status.id)).to be_nil
end

it 'actually keeps a copy for inspection' do
expect(Status.with_discarded.find_by(id: status.id)).to_not be_nil
end
end
end
end

0 comments on commit 13d1111

Please sign in to comment.