Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrations post stateless version #515

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ require (
golang.org/x/sync v0.8.0
)

require gopkg.in/yaml.v3 v3.0.1 // indirect

require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jackc/pgxlisten v0.0.0-20241005155529-9d952acd6a6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// stateless version (+1 regarding directory name, as migrations start from 1 in the lib)
const MinimalSchemaVersion = 12
const MinimalSchemaVersion = 15

type Bucket struct {
name string
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/bucket/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ var MigrationsFS embed.FS

func GetMigrator(db *bun.DB, name string) *migrations.Migrator {
migrator := migrations.NewMigrator(db, migrations.WithSchema(name))
migrations, err := migrations.CollectMigrations(MigrationsFS, name)
_migrations, err := migrations.CollectMigrations(MigrationsFS, name)
if err != nil {
panic(err)
}
migrator.RegisterMigrations(migrations...)
migrator.RegisterMigrations(_migrations...)

return migrator
}
Expand All @@ -27,4 +27,4 @@ func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string)
defer span.End()

return GetMigrator(db, name).Up(ctx)
}
}
2 changes: 1 addition & 1 deletion internal/storage/bucket/migrations/1-fix-trigger/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ begin
posting ->> 'destination', posting ->> 'asset', (posting ->> 'amount')::numeric, false,
_destination_exists);
end;
$$ set search_path from current;
$$ set search_path from current;
3 changes: 0 additions & 3 deletions internal/storage/bucket/migrations/11-make-stateless/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ execute procedure set_compat_on_transactions_metadata();

alter table transactions
add column post_commit_volumes jsonb,
-- todo: set `default transaction_date()` in a subsequent migration,
-- otherwise the function is called for every existing line
add column inserted_at timestamp without time zone,
alter column timestamp set default transaction_date()
-- todo: we should change the type of this column, but currently that causes a full lock of the table
Expand Down Expand Up @@ -363,7 +361,6 @@ from (select row_number() over () as number, v.value
select null) v) data
$$ set search_path from current;

-- todo(next-minor): remove that on future version when the table will have this default value (need to fill nulls before)
create or replace function set_transaction_inserted_at() returns trigger
security definer
language plpgsql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transaction ids of table moves
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Backfills moves.transactions_id from transactions.id, one committed batch at a time,
-- then makes the column mandatory.
do $$
    declare
        _batch_size integer := 100;
        _max integer;
    begin
        set search_path = '{{.Schema}}';

        -- `if not exists`: the first commit in the loop below persists this index, so an
        -- interrupted run would otherwise fail here when the migration is retried.
        create index if not exists moves_transactions_id on moves(transactions_id);

        -- number of rows still to fill, published once so listeners can size the work
        select count(seq)
        from moves
        where transactions_id is null
        into _max;

        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _max);

        loop
            -- pick at most _batch_size rows that still lack a transactions_id;
            -- only the key is needed to drive the update
            with _outdated_moves as (
                select seq
                from moves
                where transactions_id is null
                limit _batch_size
            )
            update moves
            set transactions_id = (
                select transactions.id
                from transactions
                where transactions.seq = moves.transactions_seq
            )
            from _outdated_moves
            where moves.seq = _outdated_moves.seq;
            -- NOTE(review): a move whose transactions_seq matches no transaction keeps a null
            -- transactions_id and is selected again on the next pass — confirm this cannot happen.

            -- stop once a pass updates nothing
            exit when not found;

            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

            -- commit per batch to keep each transaction short
            commit;
        end loop;

        alter table moves
            alter column transactions_id set not null;
    end
$$
language plpgsql;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill inserted_at column of transactions table
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Backfills transactions.inserted_at from the dates of the matching logs, in committed
-- batches, then installs the column default and drops the set_transaction_inserted_at
-- trigger and function.
do $$
    declare
        _chunk_size integer := 100;
        _stateless_applied_at timestamp without time zone;
        _total integer := 0;
    begin
        --todo: take explicit advisory lock to avoid concurrent migrations when the service is killed
        set search_path = '{{.Schema}}';

        -- timestamp recorded for version 12, i.e. when the "11-make-stateless" migration was applied
        select tstamp
        from _system.goose_db_version
        where version_id = 12
        into _stateless_applied_at;

        -- snapshot of the logs written up to that timestamp, with the transaction id
        -- extracted from the log payload
        create temporary table logs_transactions as
        select
            id,
            ledger,
            date,
            (data->'transaction'->>'id')::bigint as transaction_id
        from logs
        where date <= _stateless_applied_at;

        create index on logs_transactions (ledger, transaction_id) include (id, date);

        _total := (select count(*) from logs_transactions);

        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _total);

        -- walk the snapshot window by window
        for _offset in 0.._total by _chunk_size loop
            -- disable triggers for the duration of the update
            set session_replication_role = replica;

            with _window as (
                select *
                from logs_transactions
                order by ledger, transaction_id
                offset _offset
                limit _chunk_size
            )
            update transactions
            set inserted_at = _window.date
            from _window
            where transactions.ledger = _window.ledger
              and transactions.id = _window.transaction_id;

            -- re-enable triggers before committing the batch
            set session_replication_role = default;

            commit;

            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _chunk_size);
        end loop;

        drop table logs_transactions;

        -- new rows now get their value from the column default, so the trigger is removed
        alter table transactions
            alter column inserted_at set default transaction_date();

        drop trigger set_transaction_inserted_at on transactions;
        drop function set_transaction_inserted_at;
    end
$$;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill post_commit_volumes column of transactions table
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-- Backfills transactions.post_commit_volumes from the moves of each transaction,
-- one committed batch at a time, then makes the column mandatory.
do $$
    declare
        _batch_size integer := 100;
        _count integer;
    begin
        set search_path = '{{.Schema}}';

        -- number of transactions still to fill, published once for progress tracking
        select count(id)
        from transactions
        where post_commit_volumes is null
        into _count;

        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

        loop
            -- disable triggers
            set session_replication_role = replica;

            -- The batch is keyed on (ledger, id): the other statements of these migrations
            -- match transactions on both columns, so joining on id alone could pull in
            -- same-numbered transactions of other ledgers and exceed _batch_size.
            with _outdated_transactions as (
                select ledger, id
                from transactions
                where post_commit_volumes is null
                limit _batch_size
            )
            update transactions
            set post_commit_volumes = (
                -- merge the per-account objects into a single jsonb document
                select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes
                from (
                    -- wrap each asset object under its account address
                    select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes
                    from (
                        -- wrap each volume under its asset
                        select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes
                        from (
                            -- latest (highest seq) move of this transaction per account and asset
                            select distinct on (accounts_address, asset)
                                accounts_address,
                                asset,
                                first_value(post_commit_volumes) over (
                                    partition by accounts_address, asset
                                    order by seq desc
                                ) as post_commit_volumes
                            from moves
                            where transactions_id = transactions.id and ledger = transactions.ledger
                        ) moves
                    ) values
                ) values
            )
            from _outdated_transactions
            where transactions.ledger = _outdated_transactions.ledger
              and transactions.id = _outdated_transactions.id;
            -- NOTE(review): a transaction without any move would presumably get null again and
            -- be re-selected on every pass — confirm every transaction has at least one move.

            -- enable triggers (done before the exit test so the role is always restored)
            set session_replication_role = default;

            -- FOUND is expected to still reflect the UPDATE here, as in the original ordering
            exit when not found;

            commit;

            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
        end loop;

        alter table transactions
            alter column post_commit_volumes set not null;
    end
$$;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Populate accounts_volumes table with historic data
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-- Seeds accounts_volumes with the latest post-commit volumes found in moves for every
-- (ledger, account, asset) that has no accounts_volumes row yet, in committed batches.
do $$
    declare
        _count integer;
        _batch_size integer := 100;
    begin
        set search_path = '{{.Schema}}';

        -- one row per (ledger, accounts_address, asset): the volumes of the move with the
        -- highest seq, restricted to keys missing from accounts_volumes
        create temporary table tmp_volumes as
        select distinct on (ledger, accounts_address, asset)
            ledger,
            accounts_address,
            asset,
            first_value(post_commit_volumes) over (
                partition by ledger, accounts_address, asset
                order by seq desc
            ) as post_commit_volumes
        from moves
        where not exists(
            select
            from accounts_volumes
            where ledger = moves.ledger
                and asset = moves.asset
                and accounts_address = moves.accounts_address
        );

        select count(*)
        from tmp_volumes
        into _count;

        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count);

        raise info '_count: %', _count;

        for i in 0.._count by _batch_size loop
            -- offset/limit windows only partition the table when the row order is fixed;
            -- (ledger, accounts_address, asset) is unique here thanks to the distinct on above
            with _rows as (
                select ledger, accounts_address, asset, post_commit_volumes
                from tmp_volumes
                order by ledger, accounts_address, asset
                offset i
                limit _batch_size
            )
            insert into accounts_volumes (ledger, accounts_address, asset, input, output)
            select ledger, accounts_address, asset, (post_commit_volumes).inputs, (post_commit_volumes).outputs
            from _rows
            on conflict do nothing; -- can be inserted by a concurrent transaction

            commit;

            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

        end loop;

        drop table tmp_volumes;
    end
$$;
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ with all_assets as (select v.v as asset
) m on true)
select moves.asset, moves.post_commit_volumes
from moves
$$ set search_path from current;

$$ set search_path from current;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill transactions_id column of transactions_metadata table
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

-- Backfills transactions_metadata.transactions_id from transactions.id, one committed
-- batch at a time, then makes the column mandatory.
do $$
    declare
        _chunk_size integer := 100;
        _remaining integer;
    begin
        set search_path = '{{.Schema}}';

        -- rows still lacking a transactions_id, published once for progress tracking
        _remaining := (
            select count(seq)
            from transactions_metadata
            where transactions_id is null
        );

        perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _remaining);

        loop
            -- take the next slice of unfilled rows and resolve their transaction id by seq
            with _pending as (
                select seq
                from transactions_metadata
                where transactions_id is null
                limit _chunk_size
            )
            update transactions_metadata
            set transactions_id = (
                select transactions.id
                from transactions
                where transactions.seq = transactions_metadata.transactions_seq
            )
            from _pending
            where transactions_metadata.seq = _pending.seq;

            -- a pass that touched nothing means the backfill is complete
            exit when not found;

            commit;

            perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _chunk_size);
        end loop;

        alter table transactions_metadata
            alter column transactions_id set not null;
    end
$$;

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Fill accounts_address column of accounts_metadata table
Loading
Loading