diff --git a/changelog.d/5776.misc b/changelog.d/5776.misc new file mode 100644 index 000000000000..1fb1b9c15295 --- /dev/null +++ b/changelog.d/5776.misc @@ -0,0 +1 @@ +Update opentracing docs to use the unified `trace` method. diff --git a/changelog.d/5885.bugfix b/changelog.d/5885.bugfix new file mode 100644 index 000000000000..411d925fd442 --- /dev/null +++ b/changelog.d/5885.bugfix @@ -0,0 +1 @@ +Fix stack overflow when recovering an appservice which had an outage. diff --git a/changelog.d/5886.misc b/changelog.d/5886.misc new file mode 100644 index 000000000000..22adba3d8503 --- /dev/null +++ b/changelog.d/5886.misc @@ -0,0 +1 @@ +Refactor the Appservice scheduler code. diff --git a/changelog.d/5893.misc b/changelog.d/5893.misc new file mode 100644 index 000000000000..07ee4888dc21 --- /dev/null +++ b/changelog.d/5893.misc @@ -0,0 +1 @@ +Drop some unused tables. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 42a350bff8b5..9998f822f1db 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -70,35 +70,37 @@ def __init__(self, hs): self.store = hs.get_datastore() self.as_api = hs.get_application_service_api() - def create_recoverer(service, callback): - return _Recoverer(self.clock, self.store, self.as_api, service, callback) - - self.txn_ctrl = _TransactionController( - self.clock, self.store, self.as_api, create_recoverer - ) + self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. - recoverers = yield _Recoverer.start( - self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN ) - self.txn_ctrl.add_recoverers(recoverers) + + for service in services: + self.txn_ctrl.start_recoverer(service) def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) class _ServiceQueuer(object): - """Queues events for the same application service together, sending - transactions as soon as possible. Once a transaction is sent successfully, - this schedules any other events in the queue to run. + """Queue of events waiting to be sent to appservices. + + Groups events into transactions per-appservice, and sends them on to the + TransactionController. Makes sure that we only have one transaction in flight per + appservice at a given time. """ def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + + # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock @@ -136,13 +138,29 @@ def _send_request(self, service): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): + """Transaction manager. + + Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer + if a transaction fails. + + (Note we have only have one of these in the homeserver.) + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + """ + + def __init__(self, clock, store, as_api): self.clock = clock self.store = store self.as_api = as_api - self.recoverer_fn = recoverer_fn - # keep track of how many recoverers there are - self.recoverers = [] + + # map from service id to recoverer instance + self.recoverers = {} + + # for UTs + self.RECOVERER_CLASS = _Recoverer @defer.inlineCallbacks def send(self, service, events): @@ -154,42 +172,45 @@ def send(self, service, events): if sent: yield txn.complete(self.store) else: - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) except Exception: logger.exception("Error creating appservice transaction") - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) @defer.inlineCallbacks def on_recovered(self, recoverer): - self.recoverers.remove(recoverer) logger.info( "Successfully recovered application service AS ID %s", recoverer.service.id ) + self.recoverers.pop(recoverer.service.id) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - def add_recoverers(self, recoverers): - for r in recoverers: - self.recoverers.append(r) - if len(recoverers) > 0: - logger.info("New active recoverers: %s", len(self.recoverers)) - @defer.inlineCallbacks - def _start_recoverer(self, service): + def _on_txn_fail(self, service): try: yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id, - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + self.start_recoverer(service) except Exception: logger.exception("Error starting AS recoverer") + def start_recoverer(self, service): + """Start a Recoverer for the given service + + Args: + service (synapse.appservice.ApplicationService): + """ + logger.info("Starting recoverer for AS ID %s", service.id) + assert service.id not in self.recoverers + recoverer = self.RECOVERER_CLASS( + self.clock, self.store, self.as_api, service, self.on_recovered + ) + self.recoverers[service.id] = recoverer + recoverer.recover() + logger.info("Now %i active recoverers", len(self.recoverers)) + @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) @@ -197,18 +218,17 @@ def _is_service_up(self, service): class _Recoverer(object): - @staticmethod - @defer.inlineCallbacks - def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) - recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] - for r in recoverers: - logger.info( - "Starting recoverer for AS ID %s which was marked as " "DOWN", - r.service.id, - ) - r.recover() - return recoverers + """Manages retries and backoff for a DOWN appservice. + + We have one of these for each appservice which is currently considered DOWN. + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + service (synapse.appservice.ApplicationService): the service we are managing + callback (callable[_Recoverer]): called once the service recovers. + """ def __init__(self, clock, store, as_api, service, callback): self.clock = clock @@ -224,7 +244,9 @@ def _retry(): "as-recoverer-%s" % (self.service.id,), self.retry ) - self.clock.call_later((2 ** self.backoff_counter), _retry) + delay = 2 ** self.backoff_counter + logger.info("Scheduling retries on %s in %fs", self.service.id, delay) + self.clock.call_later(delay, _retry) def _backoff(self): # cap the backoff to be around 8.5min => (2^9) = 512 secs @@ -234,25 +256,30 @@ def _backoff(self): @defer.inlineCallbacks def retry(self): + logger.info("Starting retries on %s", self.service.id) try: - txn = yield self.store.get_oldest_unsent_txn(self.service) - if txn: + while True: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if not txn: + # nothing left: we're done! + self.callback(self) + return + logger.info( "Retrying transaction %s for AS ID %s", txn.id, txn.service.id ) sent = yield txn.send(self.as_api) - if sent: - yield txn.complete(self.store) - # reset the backoff counter and retry immediately - self.backoff_counter = 1 - yield self.retry() - else: - self._backoff() - else: - self._set_service_recovered() - except Exception as e: - logger.exception(e) - self._backoff() - - def _set_service_recovered(self): - self.callback(self) + if not sent: + break + + yield txn.complete(self.store) + + # reset the backoff counter and then process the next transaction + self.backoff_counter = 1 + + except Exception: + logger.exception("Unexpected error running retries") + + # we didn't manage to send all of the transactions before we got an error of + # some flavour: reschedule the next retry. + self._backoff() diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index d2c209c471fa..6b706e189282 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -43,6 +43,9 @@ an optional dependency. This does however limit the number of modifiable spans at any point in the code to one. From here out references to `opentracing` in the code snippets refer to the Synapses module. +Most methods provided in the module have a direct correlation to those provided +by opentracing. Refer to docs there for a more in-depth documentation on some of +the args and methods. Tracing ------- @@ -68,52 +71,62 @@ Tracing functions ----------------- -Functions can be easily traced using decorators. There is a decorator for -'normal' function and for functions which are actually deferreds. The name of +Functions can be easily traced using decorators. The name of the function becomes the operation name for the span. .. code-block:: python - from synapse.logging.opentracing import trace, trace_deferred + from synapse.logging.opentracing import trace - # Start a span using 'normal_function' as the operation name + # Start a span using 'interesting_function' as the operation name @trace - def normal_function(*args, **kwargs): + def interesting_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful - # Start a span using 'deferred_function' as the operation name - @trace_deferred - @defer.inlineCallbacks - def deferred_function(*args, **kwargs): - # We start - yield we_wait - # we finish - return something_usual_and_useful Operation names can be explicitly set for functions by using -``trace_using_operation_name`` and -``trace_deferred_using_operation_name`` +``trace_using_operation_name`` .. code-block:: python - from synapse.logging.opentracing import ( - trace_using_operation_name, - trace_deferred_using_operation_name - ) + from synapse.logging.opentracing import trace_using_operation_name @trace_using_operation_name("A *much* better operation name") - def normal_function(*args, **kwargs): + def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful - @trace_deferred_using_operation_name("Another exciting operation name!") - @defer.inlineCallbacks - def deferred_function(*args, **kwargs): - # We start - yield we_wait - # we finish - return something_usual_and_useful +Setting Tags +------------ + +To set a tag on the active span do + +.. code-block:: python + + from synapse.logging.opentracing import set_tag + + set_tag(tag_name, tag_value) + +There's a convenient decorator to tag all the args of the method. It uses +inspection in order to use the formal parameter names prefixed with 'ARG_' as +tag names. It uses kwarg names as tag names without the prefix. + +.. code-block:: python + + from synapse.logging.opentracing import tag_args + + @tag_args + def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): + pass + + set_fates("the story", "the end", "the act") + # This will have the following tags + # - ARG_clotho: "the story" + # - ARG_lachesis: "the end" + # - ARG_atropos: "the act" + # - father: "Zues" + # - mother: "Themis" Contexts and carriers --------------------- diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ac876287fc10..6fcfa4d7894d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1302,15 +1302,11 @@ def _delete_existing_rows_txn(cls, txn, events_and_contexts): "event_reference_hashes", "event_search", "event_to_state_groups", - "guest_access", - "history_visibility", "local_invites", - "room_names", "state_events", "rejections", "redactions", "room_memberships", - "topics", ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -1454,10 +1450,10 @@ def _update_metadata_tables_txn( for event, _ in events_and_contexts: if event.type == EventTypes.Name: - # Insert into the room_names and event_search tables. + # Insert into the event_search table. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - # Insert into the topics table and event_search table. + # Insert into the event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: # Insert into the event_search table. @@ -1465,12 +1461,6 @@ def _update_metadata_tables_txn( elif event.type == EventTypes.Redaction: # Insert into the redactions table. self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - # Insert into the event_search table. - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - # Insert into the event_search table. - self._store_guest_access_txn(txn, event) self._handle_event_relations(txn, event) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index bc606292b82f..08e13f3a3bb0 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -386,32 +386,12 @@ def f(txn): def _store_room_topic_txn(self, txn, event): if hasattr(event, "content") and "topic" in event.content: - self._simple_insert_txn( - txn, - "topics", - { - "event_id": event.event_id, - "room_id": event.room_id, - "topic": event.content["topic"], - }, - ) - self.store_event_search_txn( txn, event, "content.topic", event.content["topic"] ) def _store_room_name_txn(self, txn, event): if hasattr(event, "content") and "name" in event.content: - self._simple_insert_txn( - txn, - "room_names", - { - "event_id": event.event_id, - "room_id": event.room_id, - "name": event.content["name"], - }, - ) - self.store_event_search_txn( txn, event, "content.name", event.content["name"] ) @@ -422,21 +402,6 @@ def _store_room_message_txn(self, txn, event): txn, event, "content.body", event.content["body"] ) - def _store_history_visibility_txn(self, txn, event): - self._store_content_index_txn(txn, event, "history_visibility") - - def _store_guest_access_txn(self, txn, event): - self._store_content_index_txn(txn, event, "guest_access") - - def _store_content_index_txn(self, txn, event, key): - if hasattr(event, "content") and key in event.content: - sql = ( - "INSERT INTO %(key)s" - " (event_id, room_id, %(key)s)" - " VALUES (?, ?, ?)" % {"key": key} - ) - txn.execute(sql, (event.event_id, event.room_id, event.content[key])) - def add_event_report( self, room_id, event_id, user_id, reason, content, received_ts ): diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql new file mode 100644 index 000000000000..9f09922c677d --- /dev/null +++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- these tables are never used. +DROP TABLE IF EXISTS room_names; +DROP TABLE IF EXISTS topics; +DROP TABLE IF EXISTS history_visibility; +DROP TABLE IF EXISTS guest_access; diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 04b8c2c07c62..52f89d3f834e 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -37,11 +37,9 @@ def setUp(self): self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( - clock=self.clock, - store=self.store, - as_api=self.as_api, - recoverer_fn=self.recoverer_fn, + clock=self.clock, store=self.store, as_api=self.as_api ) + self.txnctrl.RECOVERER_CLASS = self.recoverer_fn def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent.