Skip to content

Commit

Permalink
CON-247 Update redis write locking (Divergent State Recovery #1) (#3415)
Browse files Browse the repository at this point in the history
  • Loading branch information
SidSethi authored Jul 12, 2022
1 parent 50914f5 commit fb3beba
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 131 deletions.
40 changes: 20 additions & 20 deletions creator-node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions creator-node/src/dbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ class DBManager {
queryObj.clock = selectCNodeUserClockSubqueryLiteral

// Create new Data table entry with queryObj using new CNodeUser.clock
const file = await sequelizeTableInstance.create(queryObj, { transaction })
const newDataRecord = await sequelizeTableInstance.create(queryObj, {
transaction
})

return file.dataValues
return newDataRecord.dataValues
}

/**
Expand Down
22 changes: 0 additions & 22 deletions creator-node/src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,6 @@ async function authMiddleware(req, res, next) {
next()
}

/** Ensure resource write access */
async function syncLockMiddleware(req, res, next) {
if (req.session && req.session.wallet) {
const redisClient = req.app.get('redisClient')
const redisKey = redisClient.getNodeSyncRedisKey(req.session.wallet)
const lockHeld = await redisClient.lock.getLock(redisKey)
if (lockHeld) {
return sendResponse(
req,
res,
errorResponse(
423,
`Cannot change state of wallet ${req.session.wallet}. Node sync currently in progress.`
)
)
}
}
req.logger.info(`syncLockMiddleware succeeded`)
next()
}

/**
* Blocks writes if node is not the primary for audiusUser associated with wallet
*/
Expand Down Expand Up @@ -852,7 +831,6 @@ module.exports = {
ensureStorageMiddleware,
ensureValidSPMiddleware,
issueAndWaitForSecondarySyncRequests,
syncLockMiddleware,
getOwnEndpoint,
getCreatorNodeEndpoints
}
142 changes: 115 additions & 27 deletions creator-node/src/redis.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,122 @@
/**
* Exports a Singleton Redis client instance, with custom wallet write locking logic
*/

const Redis = require('ioredis')

const config = require('./config.js')
const { logger: genericLogger } = require('./logging')
const Redis = require('ioredis')
const { asyncRetry } = require('./utils')

const redisClient = new Redis(config.get('redisPort'), config.get('redisHost'))

const EXPIRATION = 60 * 60 * 2 // 2 hours in seconds
class RedisLock {
static async setLock(key, expiration = EXPIRATION) {
genericLogger.info(`SETTING LOCK ${key}`)
// set allows you to set an optional expire param
return redisClient.set(key, true, 'EX', expiration)
}
const _getWalletWriteLockKey = function (wallet) {
return `WRITE.WALLET.${wallet}`
}

static async getLock(key) {
genericLogger.info(`GETTING LOCK ${key}`)
return redisClient.get(key)
}
const WalletWriteLock = {
WALLET_WRITE_LOCK_EXPIRATION_SEC: 1800, // 30 min in sec

static async acquireLock(key, expiration = EXPIRATION) {
genericLogger.info(`SETTING LOCK IF NOT EXISTS ${key}`)
const response = await redisClient.set(key, true, 'NX', 'EX', expiration)
return !!response
}
VALID_ACQUIRERS: {
SecondarySyncFromPrimary: 'secondarySyncFromPrimary',
PrimarySyncFromSecondary: 'primarySyncFromSecondary'
},

static async removeLock(key) {
genericLogger.info(`DELETING LOCK ${key}`)
return redisClient.del(key)
}
}
/**
* Return lock holder, if held; else null
*/
getCurrentHolder: async function (wallet) {
const key = _getWalletWriteLockKey(wallet)
const holder = await redisClient.get(key)
return holder
},

/** Returns true if lock is held by sync, else false */
syncIsInProgress: async function (wallet) {
const holder = await this.getCurrentHolder(wallet)

function getNodeSyncRedisKey(wallet) {
return `NODESYNC.${wallet}`
return (
holder === this.VALID_ACQUIRERS.PrimarySyncFromSecondary ||
holder === this.VALID_ACQUIRERS.SecondarySyncFromPrimary
)
},

/**
* Return true if lock is held, else false
*/
isHeld: async function (wallet) {
const key = _getWalletWriteLockKey(wallet)
const holder = await redisClient.get(key)
return !!holder
},

ttl: async function (wallet) {
const key = _getWalletWriteLockKey(wallet)
const ttl = await redisClient.ttl(key)
return ttl
},

/**
* Attempt to acquire write lock for wallet
* Throws error on call failure or acquisition failure
* Does not return any value on success
* @param wallet
* @param acquirer
* @param expirationSec
*/
acquire: async function (
wallet,
acquirer,
expirationSec = this.WALLET_WRITE_LOCK_EXPIRATION_SEC
) {
// Ensure `acquirer` is valid
if (!Object.values(this.VALID_ACQUIRERS).includes(acquirer)) {
throw new Error(`Must provide valid acquirer`)
}

const key = _getWalletWriteLockKey(wallet)

let acquired = false

await asyncRetry({
asyncFn: async function () {
const response = await redisClient.set(
key,
acquirer, // value
'NX', // set if not exists
'EX', // set expiration in seconds
expirationSec
)
acquired = !!response
},
logger: genericLogger,
log: false
})

if (!acquired) {
throw new Error(
`[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.`
)
}
},

/**
* Attempt to release write lock for wallet
* Throws error on call failure
* Does not return any value on success
*/
release: async function (wallet) {
const key = _getWalletWriteLockKey(wallet)

await asyncRetry({
asyncFn: async function () {
// Succeeds if removed or if no lock exists; throws error on call failure
await redisClient.del(key)
},
logger: genericLogger,
log: false
})
}
}

/**
Expand All @@ -39,7 +125,10 @@ function getNodeSyncRedisKey(wallet) {
* @param {string} param.keyPattern the redis key pattern that matches keys to remove
* @param {Object} param.logger the logger instance
*/
function deleteKeyPatternInRedis({ keyPattern, logger = genericLogger }) {
const deleteKeyPatternInRedis = function ({
keyPattern,
logger = genericLogger
}) {
// Create a readable stream (object mode)
const stream = redisClient.scanStream({
match: keyPattern
Expand All @@ -63,6 +152,5 @@ function deleteKeyPatternInRedis({ keyPattern, logger = genericLogger }) {
}

module.exports = redisClient
module.exports.lock = RedisLock
module.exports.getNodeSyncRedisKey = getNodeSyncRedisKey
module.exports.WalletWriteLock = WalletWriteLock
module.exports.deleteKeyPatternInRedis = deleteKeyPatternInRedis
Loading

0 comments on commit fb3beba

Please sign in to comment.