Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make synapse_port_db correctly create indexes #6102

Merged
merged 37 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
820f3a3
First attempt at correctly dealing with background updates when porti…
babolivier Jun 4, 2019
6f03a49
Failed attempt at doing background updates in synapse_port_db
babolivier Jun 5, 2019
0edfad7
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Jun 19, 2019
e916203
Various fixes
babolivier Jun 24, 2019
37b4dda
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Aug 21, 2019
dac8eb0
Make BackgroundUpdateStore not inherit from SQLBaseStore
babolivier Sep 2, 2019
bbf6bde
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Sep 2, 2019
2e33955
Revert "Make BackgroundUpdateStore not inherit from SQLBaseStore"
babolivier Sep 3, 2019
a521b65
Make script use DataStore
babolivier Sep 3, 2019
12ae8c9
Attempt at attaching a config to the mocked homeserver object
babolivier Sep 4, 2019
a4358d5
Define and use the experimental BackgroundUpdatesRunnerStore
babolivier Sep 23, 2019
8ab7ecc
Cleanup
babolivier Sep 23, 2019
b7f56d8
Replace the Store with the new background updates compatible store
babolivier Sep 23, 2019
c1fd831
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Sep 24, 2019
bb4fda2
Update list of stores that use background updates
babolivier Sep 24, 2019
7889e29
Ignore logging context loss warnings
babolivier Sep 24, 2019
f394dc7
Lint
babolivier Sep 24, 2019
3b10ccb
Add \n to errors for niceness
babolivier Sep 24, 2019
7a86b08
Cleanup
babolivier Sep 24, 2019
b17ec9c
Changelog
babolivier Sep 24, 2019
335f467
Copyright
babolivier Sep 25, 2019
e44995f
Use the HomeServer and DataStore classes as bases
babolivier Sep 26, 2019
8b5d88a
Fix logcontext leaks
richvdh Oct 2, 2019
e0df0ee
Complete list of boolean columns
babolivier Oct 3, 2019
c4d6807
Move store creation and background updates to dedicated functions
babolivier Oct 3, 2019
cd81907
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Oct 3, 2019
bb05e4b
Use new stores instead of the DataStore
babolivier Oct 3, 2019
65885b0
Add missing stores to the synapse_port_db store
babolivier Oct 3, 2019
ce339e6
Don't inherit from HomeServer
babolivier Oct 4, 2019
53f7d03
Lint
babolivier Oct 3, 2019
1eb0600
Merge branch 'babolivier/factor_out_bg_updates' into babolivier/port_…
babolivier Oct 7, 2019
5874465
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Oct 9, 2019
a603f4f
Merge branch 'develop' into babolivier/port_db_background_updates
babolivier Oct 22, 2019
7c5ab28
Incorporate review
babolivier Oct 22, 2019
797073a
Fixes
babolivier Oct 22, 2019
bb6e395
Convert bytes to bytearrays
babolivier Oct 22, 2019
d03a555
Incorporate review
babolivier Oct 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6102.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
201 changes: 146 additions & 55 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,9 +30,23 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor

from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
from synapse.storage.devices import DeviceBackgroundUpdateStore
from synapse.storage.engines import create_engine
from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
from synapse.storage.prepare_database import prepare_database
from synapse.storage.registration import RegistrationBackgroundUpdateStore
from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.search import SearchBackgroundUpdateStore
from synapse.storage.state import StateBackgroundUpdateStore
from synapse.storage.stats import StatsStore
from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
from synapse.util import Clock

logger = logging.getLogger("synapse_port_db")

Expand All @@ -55,6 +70,8 @@ BOOLEAN_COLUMNS = {
"local_group_membership": ["is_publicised", "is_admin"],
"e2e_room_keys": ["is_verified"],
"account_validity": ["email_sent"],
"room_stats_state": ["is_federatable"],
"redactions": ["have_censored"],
}


Expand Down Expand Up @@ -96,33 +113,24 @@ APPEND_ONLY_TABLES = [
end_error_exec_info = None


class Store(object):
"""This object is used to pull out some of the convenience API from the
Storage layer.

*All* database interactions should go through this object.
"""

def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine

_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
_simple_insert = SQLBaseStore.__dict__["_simple_insert"]

_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
_simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
_simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__[
"_simple_select_one_onecol_txn"
]

_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
class Store(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
ClientIpBackgroundUpdateStore,
DeviceInboxBackgroundUpdateStore,
DeviceBackgroundUpdateStore,
EventsBackgroundUpdatesStore,
MediaRepositoryBackgroundUpdateStore,
RegistrationBackgroundUpdateStore,
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
def __init__(self, db_conn, hs):
super(Store, self).__init__(db_conn, hs)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self.db_pool = hs.get_db_pool()

@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
Expand All @@ -148,7 +156,8 @@ class Store(object):
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise

return self.db_pool.runWithConnection(r)
with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))

def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
Expand All @@ -174,6 +183,25 @@ class Store(object):
raise


class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name

def get_db_conn(self):
return self.db_conn

def get_db_pool(self):
return self.db_pool

def get_clock(self):
return self.clock


class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
Expand Down Expand Up @@ -445,31 +473,86 @@ class Porter(object):

db_conn.commit()

return db_conn

def get_sqlite_store(self):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
sqlite_db_pool = adbapi.ConnectionPool(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self.sqlite_config["name"], **self.sqlite_config["args"]
)
sqlite_engine = create_engine(self.sqlite_config)

self.progress.set_state("Preparing SQLite3")
sqlite_conn = self.setup_db(sqlite_config, sqlite_engine)

hs = MockHomeserver(
self.hs_config,
sqlite_engine,
sqlite_conn,
sqlite_db_pool,
)

return Store(sqlite_conn, hs)

@defer.inlineCallbacks
def run(self):
try:
sqlite_db_pool = adbapi.ConnectionPool(
self.sqlite_config["name"], **self.sqlite_config["args"]
)
def get_postgres_store(self):
postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"], **self.postgres_config["args"]
)
postgres_engine = create_engine(self.postgres_config)

postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"], **self.postgres_config["args"]
)
self.progress.set_state("Preparing PostgreSQL")
postgres_conn = self.setup_db(postgres_config, postgres_engine)

hs = MockHomeserver(
self.hs_config,
postgres_engine,
postgres_conn,
postgres_db_pool,
)

postgres_store = Store(postgres_conn, hs)

yield postgres_store.runInteraction(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"postgres_engine.check_database",
postgres_engine.check_database,
)

sqlite_engine = create_engine(sqlite_config)
postgres_engine = create_engine(postgres_config)
defer.returnValue(postgres_store)

self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = yield self.postgres_store.has_completed_background_updates()

if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")

while not postgres_ready:
yield self.postgres_store.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.has_completed_background_updates()
)

yield self.postgres_store.execute(postgres_engine.check_database)
@defer.inlineCallbacks
def run(self):
try:
self.sqlite_store = self.get_sqlite_store()

# Check if all background updates are done, abort if not.
updates_complete = yield self.sqlite_store.has_completed_background_updates()
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)

# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
self.setup_db(sqlite_config, sqlite_engine)
self.postgres_store = yield self.get_postgres_store()

self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
yield self.run_background_updates_on_postgres()

self.progress.set_state("Creating port tables")

Expand Down Expand Up @@ -924,17 +1007,24 @@ if __name__ == "__main__":
},
}

postgres_config = yaml.safe_load(args.postgres_config)
hs_config = yaml.safe_load(args.postgres_config)

config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")

if "database" in postgres_config:
postgres_config = postgres_config["database"]
if "database" in hs_config:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
postgres_config = hs_config["database"]

if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
sys.stderr.write("Database must use 'psycopg2' connector.")
sys.exit(3)
if "name" not in postgres_config:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
sys.stderr.write("Database must use 'psycopg2' connector.\n")
sys.exit(3)

else:
sys.stderr.write("The configuration file must have a 'database' section.\n")
sys.exit(4)

def start(stdscr=None):
if stdscr:
Expand All @@ -947,6 +1037,7 @@ if __name__ == "__main__":
postgres_config=postgres_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
babolivier marked this conversation as resolved.
Show resolved Hide resolved
)

reactor.callWhenRunning(porter.run)
Expand Down