Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include newly added sequences in the port DB script #9449

Merged
merged 5 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9449.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`.
65 changes: 41 additions & 24 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
from typing import Dict, Optional, Set
from typing import Dict, Iterable, Optional, Set

import yaml

Expand Down Expand Up @@ -629,7 +629,13 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
await self._setup_device_inbox_seq()
await self._setup_sequence(
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
Comment on lines +632 to +634
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the other sequences don't quite follow the same pattern so have custom functions. It could probably be abstract to handle them, but...not sure it is worth it.

await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
await self._setup_auth_chain_sequence()

# Step 3. Get tables.
self.progress.set_state("Fetching tables")
Expand Down Expand Up @@ -854,7 +860,7 @@ class Porter(object):

return done, remaining + done

async def _setup_state_group_id_seq(self):
async def _setup_state_group_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
Expand All @@ -868,7 +874,7 @@ class Porter(object):

await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)

async def _setup_user_id_seq(self):
async def _setup_user_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)
Expand All @@ -877,9 +883,9 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))

return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)

async def _setup_events_stream_seqs(self):
async def _setup_events_stream_seqs(self) -> None:
"""Set the event stream sequences to the correct values.
"""

Expand Down Expand Up @@ -908,35 +914,46 @@ class Porter(object):
(curr_backward_id + 1,),
)

return await self.postgres_store.db_pool.runInteraction(
await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)

async def _setup_device_inbox_seq(self):
"""Set the device inbox sequence to the correct value.
async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
"""Set a sequence to the correct value.
"""
curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_inbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids = []
for stream_id_table in stream_id_tables:
max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table=stream_id_table,
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids.append(max_stream_id)

curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_federation_outbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
next_id = max(current_stream_ids) + 1

def r(txn):
sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
txn.execute(sql + " %s", (next_id, ))

next_id = max(curr_local_id, curr_federation_id) + 1
await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)

async def _setup_auth_chain_sequence(self) -> None:
curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
)

def r(txn):
txn.execute(
"ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
"ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
(curr_chain_id,),
)

return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id", r,
)



##############################################
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, main_store_class, hs):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
persist_events = PersistEventsStore(hs, database, main, db_conn)

if "state" in database_config.databases:
logger.info(
Expand Down
13 changes: 12 additions & 1 deletion synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
Expand Down Expand Up @@ -90,7 +91,11 @@ class PersistEventsStore:
"""

def __init__(
self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
self,
hs: "HomeServer",
db: DatabasePool,
main_data_store: "DataStore",
db_conn: Connection,
):
self.hs = hs
self.db_pool = db
Expand All @@ -109,6 +114,12 @@ def __init__(
) # type: MultiWriterIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator

# The consistency of this cannot be checked when the ID generator is
# created since the database might not yet be up-to-date.
self.db_pool.event_chain_id_gen.check_consistency(
db_conn, "event_auth_chains", "chain_id" # type: ignore
)

# This should only exist on instances that are configured to write
assert (
hs.get_instance_name() in hs.config.worker.writers.events
Expand Down