Skip to content

Commit

Permalink
fix: wrap mempool tx inserts in sql transactions, along with a few ot…
Browse files Browse the repository at this point in the history
…her queries
  • Loading branch information
zone117x committed Nov 18, 2020
1 parent 0a9d690 commit a6cf1f1
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 139 deletions.
2 changes: 1 addition & 1 deletion src/datastore/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ export interface DataStore extends DataStoreEventEmitter {
}): Promise<FoundOrNot<DbSmartContractEvent[]>>;

update(data: DataStoreUpdateData): Promise<void>;
updateMempoolTx(args: { mempoolTx: DbMempoolTx }): Promise<void>;
updateMempoolTxs(args: { mempoolTxs: DbMempoolTx[] }): Promise<void>;

updateBurnchainRewards(args: {
burnchainBlockHash: string;
Expand Down
8 changes: 5 additions & 3 deletions src/datastore/memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ export class MemoryDataStore extends (EventEmitter as { new (): DataStoreEventEm
return Promise.resolve();
}

updateMempoolTx({ mempoolTx: tx }: { mempoolTx: DbMempoolTx }): Promise<void> {
this.txMempool.set(tx.tx_id, tx);
this.emit('txUpdate', tx);
updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise<void> {
txs.forEach(tx => {
this.txMempool.set(tx.tx_id, tx);
this.emit('txUpdate', tx);
});
return Promise.resolve();
}

Expand Down
291 changes: 169 additions & 122 deletions src/datastore/postgres-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1007,25 +1007,34 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
}

async getBlocks({ limit, offset }: { limit: number; offset: number }) {
const totalQuery = this.pool.query<{ count: number }>(`
SELECT COUNT(*)::integer
FROM blocks
WHERE canonical = true
`);
const resultQuery = this.pool.query<BlockQueryResult>(
`
SELECT ${BLOCK_COLUMNS}
FROM blocks
WHERE canonical = true
ORDER BY block_height DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);
const [total, results] = await Promise.all([totalQuery, resultQuery]);
const parsed = results.rows.map(r => this.parseBlockQueryResult(r));
return { results: parsed, total: total.rows[0].count } as const;
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const total = await client.query<{ count: number }>(`
SELECT COUNT(*)::integer
FROM blocks
WHERE canonical = true
`);
const results = await client.query<BlockQueryResult>(
`
SELECT ${BLOCK_COLUMNS}
FROM blocks
WHERE canonical = true
ORDER BY block_height DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);
await client.query('COMMIT');
const parsed = results.rows.map(r => this.parseBlockQueryResult(r));
return { results: parsed, total: total.rows[0].count } as const;
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
}

async getBlockTxs(indexBlockHash: string) {
Expand Down Expand Up @@ -1160,6 +1169,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
};
return parsed;
});
await client.query('COMMIT');
return results;
} catch (e) {
await client.query('ROLLBACK');
Expand All @@ -1185,6 +1195,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
`,
[burnchainRecipient]
);
await client.query('COMMIT');
const resultAmount = BigInt(queryResults.rows[0]?.amount ?? 0);
return { reward_recipient: burnchainRecipient, reward_amount: resultAmount };
} catch (e) {
Expand Down Expand Up @@ -1238,45 +1249,62 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
return result.rowCount;
}

async updateMempoolTx({ mempoolTx: tx }: { mempoolTx: DbMempoolTx }): Promise<void> {
const result = await this.pool.query(
`
INSERT INTO mempool_txs(
${MEMPOOL_TX_COLUMNS}
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
ON CONFLICT ON CONSTRAINT unique_tx_id
DO NOTHING
`,
[
tx.pruned,
hexToBuffer(tx.tx_id),
tx.raw_tx,
tx.type_id,
tx.status,
tx.receipt_time,
tx.post_conditions,
tx.fee_rate,
tx.sponsored,
tx.sponsor_address,
tx.sender_address,
tx.origin_hash_mode,
tx.token_transfer_recipient_address,
tx.token_transfer_amount,
tx.token_transfer_memo,
tx.smart_contract_contract_id,
tx.smart_contract_source_code,
tx.contract_call_contract_id,
tx.contract_call_function_name,
tx.contract_call_function_args,
tx.poison_microblock_header_1,
tx.poison_microblock_header_2,
tx.coinbase_payload,
]
);
if (result.rowCount !== 1) {
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
logger.error(errMsg);
} else {
async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise<void> {
const client = await this.pool.connect();
const updatedTxs: DbMempoolTx[] = [];
try {
await client.query('BEGIN');
for (const tx of txs) {
const result = await client.query(
`
INSERT INTO mempool_txs(
${MEMPOOL_TX_COLUMNS}
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
ON CONFLICT ON CONSTRAINT unique_tx_id
DO NOTHING
`,
[
tx.pruned,
hexToBuffer(tx.tx_id),
tx.raw_tx,
tx.type_id,
tx.status,
tx.receipt_time,
tx.post_conditions,
tx.fee_rate,
tx.sponsored,
tx.sponsor_address,
tx.sender_address,
tx.origin_hash_mode,
tx.token_transfer_recipient_address,
tx.token_transfer_amount,
tx.token_transfer_memo,
tx.smart_contract_contract_id,
tx.smart_contract_source_code,
tx.contract_call_contract_id,
tx.contract_call_function_name,
tx.contract_call_function_args,
tx.poison_microblock_header_1,
tx.poison_microblock_header_2,
tx.coinbase_payload,
]
);
if (result.rowCount !== 1) {
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
logger.error(errMsg);
} else {
updatedTxs.push(tx);
}
}
await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}

for (const tx of updatedTxs) {
this.emit('txUpdate', tx);
}
}
Expand Down Expand Up @@ -1402,28 +1430,36 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
limit: number;
offset: number;
}): Promise<{ results: DbMempoolTx[]; total: number }> {
// TODO: wrap the following queries in a transaction
const totalQuery = await this.pool.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM mempool_txs
WHERE pruned = false
`
);
const resultQuery = await this.pool.query<MempoolTxQueryResult>(
`
SELECT ${MEMPOOL_TX_COLUMNS}
FROM mempool_txs
WHERE pruned = false
ORDER BY receipt_time DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);

const parsed = resultQuery.rows.map(r => this.parseMempoolTxQueryResult(r));
return { results: parsed, total: totalQuery.rows[0].count };
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const totalQuery = await client.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM mempool_txs
WHERE pruned = false
`
);
const resultQuery = await client.query<MempoolTxQueryResult>(
`
SELECT ${MEMPOOL_TX_COLUMNS}
FROM mempool_txs
WHERE pruned = false
ORDER BY receipt_time DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);
await client.query('COMMIT');
const parsed = resultQuery.rows.map(r => this.parseMempoolTxQueryResult(r));
return { results: parsed, total: totalQuery.rows[0].count };
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
}

async getMempoolTxIdList(): Promise<{ results: DbMempoolTxId[] }> {
Expand Down Expand Up @@ -1473,49 +1509,59 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
}) {
let totalQuery: QueryResult<{ count: number }>;
let resultQuery: QueryResult<TxQueryResult>;
if (txTypeFilter.length === 0) {
totalQuery = await this.pool.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM txs
WHERE canonical = true
`
);
resultQuery = await this.pool.query<TxQueryResult>(
`
SELECT ${TX_COLUMNS}
FROM txs
WHERE canonical = true
ORDER BY block_height DESC, tx_index DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);
} else {
const txTypeIds = txTypeFilter.map<number>(t => getTxTypeId(t));
totalQuery = await this.pool.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM txs
WHERE canonical = true AND type_id = ANY($1)
`,
[txTypeIds]
);
resultQuery = await this.pool.query<TxQueryResult>(
`
SELECT ${TX_COLUMNS}
FROM txs
WHERE canonical = true AND type_id = ANY($1)
ORDER BY block_height DESC, tx_index DESC
LIMIT $2
OFFSET $3
`,
[txTypeIds, limit, offset]
);
const client = await this.pool.connect();
try {
await client.query('BEGIN');
if (txTypeFilter.length === 0) {
totalQuery = await client.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM txs
WHERE canonical = true
`
);
resultQuery = await client.query<TxQueryResult>(
`
SELECT ${TX_COLUMNS}
FROM txs
WHERE canonical = true
ORDER BY block_height DESC, tx_index DESC
LIMIT $1
OFFSET $2
`,
[limit, offset]
);
} else {
const txTypeIds = txTypeFilter.map<number>(t => getTxTypeId(t));
totalQuery = await client.query<{ count: number }>(
`
SELECT COUNT(*)::integer
FROM txs
WHERE canonical = true AND type_id = ANY($1)
`,
[txTypeIds]
);
resultQuery = await client.query<TxQueryResult>(
`
SELECT ${TX_COLUMNS}
FROM txs
WHERE canonical = true AND type_id = ANY($1)
ORDER BY block_height DESC, tx_index DESC
LIMIT $2
OFFSET $3
`,
[txTypeIds, limit, offset]
);
}
await client.query('COMMIT');
const parsed = resultQuery.rows.map(r => this.parseTxQueryResult(r));
return { results: parsed, total: totalQuery.rows[0].count };
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
const parsed = resultQuery.rows.map(r => this.parseTxQueryResult(r));
return { results: parsed, total: totalQuery.rows[0].count };
}

async getTxEvents(txId: string, indexBlockHash: string) {
Expand Down Expand Up @@ -1703,6 +1749,7 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
events[rowIndex++] = event;
}
events.sort((a, b) => a.event_index - b.event_index);
await client.query('COMMIT');
return { results: events };
} catch (e) {
await client.query('ROLLBACK');
Expand Down
7 changes: 4 additions & 3 deletions src/event-stream/event-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async function handleMempoolTxsMessage(rawTxs: string[], db: DataStore): Promise
rawTx: buffer,
};
});
for (const tx of decodedTxs) {
const dbMempoolTxs = decodedTxs.map(tx => {
logger.verbose(`Received mempool tx: ${tx.txId}`);
const dbMempoolTx = createDbMempoolTxFromCoreMsg({
txId: tx.txId,
Expand All @@ -91,8 +91,9 @@ async function handleMempoolTxsMessage(rawTxs: string[], db: DataStore): Promise
rawTx: tx.rawTx,
receiptDate: receiptDate,
});
await db.updateMempoolTx({ mempoolTx: dbMempoolTx });
}
return dbMempoolTx;
});
await db.updateMempoolTxs({ mempoolTxs: dbMempoolTxs });
}

async function handleClientMessage(msg: CoreNodeBlockMessage, db: DataStore): Promise<void> {
Expand Down
Loading

0 comments on commit a6cf1f1

Please sign in to comment.