Skip to content

Commit

Permalink
Add retries to transaction handler (#3148)
Browse files Browse the repository at this point in the history
  • Loading branch information
piazzatron authored May 26, 2022
1 parent baf8bfa commit d95cb72
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 31 deletions.
5 changes: 3 additions & 2 deletions identity-service/src/routes/solana.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ solanaRouter.post(
}

// Unpack instructions
let { instructions = [], skipPreflight, feePayerOverride, signatures = [], recentBlockhash } = req.body
let { instructions = [], skipPreflight, feePayerOverride, signatures = [], retry = true, recentBlockhash } = req.body

// Allowed relay checks
const isRelayAllowed = await areRelayAllowedInstructions(instructions)
Expand Down Expand Up @@ -91,7 +91,8 @@ solanaRouter.post(
signatures: (signatures || []).map(s => ({ ...s, signature: Buffer.from(s.signature.data) })),
instructions,
skipPreflight,
feePayerOverride
feePayerOverride,
retry
})

if (error) {
Expand Down
223 changes: 196 additions & 27 deletions libs/src/services/solanaWeb3Manager/transactionHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ const SolanaUtils = require('./utils')
const {
Transaction,
PublicKey,
sendAndConfirmRawTransaction
} = require('@solana/web3.js')

/**
Expand All @@ -28,12 +27,24 @@ class TransactionHandler {
* }
* @memberof TransactionHandler
*/
constructor ({ connection, useRelay, identityService = null, feePayerKeypairs = null, skipPreflight = true }) {
constructor ({
connection,
useRelay,
identityService = null,
feePayerKeypairs = null,
skipPreflight = true,
retryTimeoutMs = 60000,
pollingFrequencyMs = 300,
sendingFrequencyMs = 300
}) {
this.connection = connection
this.useRelay = useRelay
this.identityService = identityService
this.feePayerKeypairs = feePayerKeypairs
this.skipPreflight = skipPreflight
this.retryTimeoutMs = retryTimeoutMs
this.pollingFrequencyMs = pollingFrequencyMs
this.sendingFrequencyMs = sendingFrequencyMs
}

/**
Expand All @@ -55,27 +66,37 @@ class TransactionHandler {
* @returns {Promise<HandleTransactionReturn>}
* @memberof TransactionHandler
*/
async handleTransaction ({ instructions, errorMapping = null, recentBlockhash = null, logger = console, skipPreflight = null, feePayerOverride = null, sendBlockhash = true, signatures = null }) {
async handleTransaction ({ instructions, errorMapping = null, recentBlockhash = null, logger = console, skipPreflight = null, feePayerOverride = null, sendBlockhash = true, signatures = null, retry = true }) {
let result = null
if (this.useRelay) {
result = await this._relayTransaction(instructions, recentBlockhash, skipPreflight, feePayerOverride, sendBlockhash, signatures)
result = await this._relayTransaction(instructions, recentBlockhash, skipPreflight, feePayerOverride, sendBlockhash, signatures, retry)
} else {
result = await this._locallyConfirmTransaction(instructions, recentBlockhash, logger, skipPreflight, feePayerOverride, signatures)
result = await this._locallyConfirmTransaction(instructions, recentBlockhash, logger, skipPreflight, feePayerOverride, signatures, retry)
}
if (result.error && result.errorCode !== null && errorMapping) {
result.errorCode = errorMapping.fromErrorCode(result.errorCode)
}
return result
}

async _relayTransaction (instructions, recentBlockhash, skipPreflight, feePayerOverride = null, sendBlockhash, signatures) {
async _relayTransaction (
instructions,
recentBlockhash,
skipPreflight,
feePayerOverride = null,
sendBlockhash,
signatures,
retry
) {
const relayable = instructions.map(SolanaUtils.prepareInstructionForRelay)

const transactionData = {
signatures,
instructions: relayable,
skipPreflight: skipPreflight === null ? this.skipPreflight : skipPreflight,
feePayerOverride: feePayerOverride ? feePayerOverride.toString() : null
skipPreflight:
skipPreflight === null ? this.skipPreflight : skipPreflight,
feePayerOverride: feePayerOverride ? feePayerOverride.toString() : null,
retry
}

if (sendBlockhash || Array.isArray(signatures)) {
Expand All @@ -86,36 +107,45 @@ class TransactionHandler {
const response = await this.identityService.solanaRelay(transactionData)
return { res: response, error: null, errorCode: null }
} catch (e) {
const error = (e.response && e.response.data && e.response.data.error) || e.message
const error =
(e.response && e.response.data && e.response.data.error) || e.message
const errorCode = this._parseSolanaErrorCode(error)
return { res: null, error, errorCode }
}
}

async _locallyConfirmTransaction (instructions, recentBlockhash, logger, skipPreflight, feePayerOverride = null, signatures = null) {
async _locallyConfirmTransaction (instructions, recentBlockhash, logger, skipPreflight, feePayerOverride = null, signatures = null, retry = true) {
const feePayerKeypairOverride = (() => {
if (feePayerOverride && this.feePayerKeypairs) {
const stringFeePayer = feePayerOverride.toString()
return this.feePayerKeypairs.find(keypair => keypair.publicKey.toString() === stringFeePayer)
return this.feePayerKeypairs.find(
(keypair) => keypair.publicKey.toString() === stringFeePayer
)
}
return null
})()

const feePayerAccount = feePayerKeypairOverride || (this.feePayerKeypairs && this.feePayerKeypairs[0])
if (!feePayerAccount) {
console.error('Local feepayer keys missing for direct confirmation!')
logger.error('transactionHandler: Local feepayer keys missing for direct confirmation!')
return {
res: null,
error: 'Missing keys',
errorCode: null
}
}

recentBlockhash = recentBlockhash || (await this.connection.getLatestBlockhash('confirmed')).blockhash
const tx = new Transaction({ recentBlockhash })
tx.feePayer = feePayerAccount.publicKey
// Get blockhash

instructions.forEach(i => tx.add(i))
recentBlockhash =
recentBlockhash ||
(await this.connection.getLatestBlockhash('confirmed')).blockhash

// Construct the txn

const tx = new Transaction({ recentBlockhash })
instructions.forEach((i) => tx.add(i))
tx.feePayer = feePayerAccount.publicKey
tx.sign(feePayerAccount)

if (Array.isArray(signatures)) {
Expand All @@ -124,24 +154,70 @@ class TransactionHandler {
})
}

const rawTransaction = tx.serialize()

// Send the txn

const sendRawTransaction = async () => {
return this.connection.sendRawTransaction(rawTransaction, {
skipPreflight:
skipPreflight === null ? this.skipPreflight : skipPreflight,
commitment: 'processed',
preflightCommitment: 'processed',
maxRetries: retry ? 0 : undefined
})
}

let txid
try {
const txSerialized = tx.serialize()
const transactionSignature = await sendAndConfirmRawTransaction(
this.connection,
txSerialized,
{
skipPreflight: skipPreflight === null ? this.skipPreflight : skipPreflight,
commitment: 'processed',
preflightCommitment: 'processed'
txid = await sendRawTransaction()
} catch (e) {
// Rarely, this intiial send will fail
logger.warn(`transactionHandler: Initial send failed: ${e}`)
const { message: error } = e
const errorCode = this._parseSolanaErrorCode(error)
return {
res: null,
error,
errorCode
}
}

let done = false

// Start up resubmission loop
let sendCount = 0
const startTime = Date.now()
if (retry) {
;(async () => {
let elapsed = Date.now() - startTime
// eslint-disable-next-line no-unmodified-loop-condition
while (!done && elapsed < this.retryTimeoutMs) {
try {
sendRawTransaction()
} catch (e) {
logger.warn(`transactionHandler: error in send loop: ${e} for txId ${txid}`)
}
sendCount++
await delay(this.sendingFrequencyMs)
elapsed = Date.now() - startTime
}
)
logger.info(`transactionHandler: signature: ${transactionSignature}`)
})()
}

// Await for tx confirmation
try {
await this._awaitTransactionSignatureConfirmation(txid, logger)
done = true
logger.info(`Finished for txid ${txid} with ${sendCount} retries`)
return {
res: transactionSignature,
res: txid,
error: null,
errorCode: null
}
} catch (e) {
logger.warn(`transactionHandler: error in awaitTransactionSignature: ${e}, ${txid}`)
done = true
const { message: error } = e
const errorCode = this._parseSolanaErrorCode(error)
return {
Expand All @@ -152,6 +228,95 @@ class TransactionHandler {
}
}

async _awaitTransactionSignatureConfirmation (txid, logger) {
let done = false

const result = await new Promise((resolve, reject) => {
;(async () => {
// Setup timeout if nothing else finishes
setTimeout(() => {
if (done) {
return
}
done = true
const message = `transactionHandler: Timed out in await, ${txid}`
logger.warn(message)
reject(new Error(message))
}, this.retryTimeoutMs)

// Setup WS listener
try {
this.connection.onSignature(
txid,
(result) => {
if (done) return
done = true
if (result.err) {
logger.warn(`transactionHandler: Error in onSignature ${txid}, ${result.err}`)
reject(result.err)
} else {
resolve(txid)
}
},
'processed'
)
} catch (e) {
done = true
logger.error(`transactionHandler: WS error in setup ${txid}, ${e}`)
}

// Setup polling
while (!done) {
;(async () => {
try {
const signatureStatuses =
await this.connection.getSignatureStatuses([txid])
const result = signatureStatuses?.value[0]

// Early return this iteration if already done, or no result
if (done || !result) return

// End loop if error
if (result.err) {
logger.error(
`transactionHandler: polling saw result error: ${result.err}, tx: ${txid}`
)
done = true
reject(result.err)
return
}

// Early return if response without confirmation
if (
!(
result.confirmations ||
result.confirmationStatus === 'confirmed' ||
result.confirmationStatus === 'finalized'
)
) {
return
}

// Otherwise, we made it
done = true
resolve(txid)
} catch (e) {
if (!done) {
logger.error(
`transactionHandler: REST polling connection error: ${e}, tx: ${txid}`
)
}
}
})()

await delay(this.pollingFrequencyMs)
}
})()
})
done = true
return result
}

/**
* Attempts to parse an error code out of a message of the form:
* "... custom program error: 0x1", where the return in this case would be the number 1.
Expand All @@ -171,4 +336,8 @@ class TransactionHandler {
}
}

async function delay (ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}

module.exports = { TransactionHandler }
13 changes: 13 additions & 0 deletions service-commands/scripts/seed.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ program
process.exit(0)
})

program
.command('tip-identity')
.description('Sends a tip, via identity')
.option('-u, --user-id <number>', 'ID of user to set as active', null)
.option('-a, --amount <number>', 'Amount of audio to send', null)
.option('-r, --recipient-id <number>', 'ID of user to receive tip', null)
.action(async options => {
const { amount, userId, recipientId } = options.opts()
const seed = new SeedSession()
await seed.tipAudioIdentity({ amount, userId, recipientId })
process.exit(0)
})


const addCommandsToCli = (CLI_TO_COMMAND_MAP, program) => {
Object.entries(CLI_TO_COMMAND_MAP).forEach(
Expand Down
23 changes: 22 additions & 1 deletion service-commands/src/commands/seed/SeedSession.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const AudiusLibs = require('@audius/libs')
const UserCache = require('./UserCache')
const LocalStorageWrapper = require('./LocalStorageWrapper')
const fetch = require('node-fetch')
const BN = require('bn.js')

const { RandomUtils, SeedUtils, Constants } = require('../../utils')

Expand Down Expand Up @@ -130,7 +132,26 @@ class SeedSession {
'Encoded-Data-Message': message,
'Encoded-Data-Signature': signature
}

}

tipAudioIdentity = async ({ userId = null, recipientId, amount}) => {
if (!recipientId) {
console.error(`Needs valid recipient ID!`)
throw new Error()
}
// Get the amount in wei
const uiAmount = new BN(amount).mul(new BN('1000000000000000000'))
await this.setUser({ userId })
// const recipientDetails = this.cache.findUser({ userId: recipientId })
// get the spl wallet
const splWallet = await (async() => {
const res = await fetch(`http://localhost:5000/v1/users/${this.libs.Utils.encodeHashId(recipientId)}`)
const { data: { spl_wallet }} = await res.json()
return spl_wallet
})()

const res = await this.libs.solanaWeb3Manager.transferWAudio(splWallet, uiAmount)
console.log(res)
}
}

Expand Down
Loading

0 comments on commit d95cb72

Please sign in to comment.