-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CON-247 Update redis write locking (Divergent State Recovery #1) #3415
Changes from all commits
5c4c6d7
99f28bb
1694d6b
5d3108c
816d307
2e005db
fd3a213
5a61327
4aa353a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,27 +89,6 @@ async function authMiddleware(req, res, next) { | |
next() | ||
} | ||
|
||
/** Ensure resource write access */ | ||
async function syncLockMiddleware(req, res, next) { | ||
if (req.session && req.session.wallet) { | ||
const redisClient = req.app.get('redisClient') | ||
const redisKey = redisClient.getNodeSyncRedisKey(req.session.wallet) | ||
const lockHeld = await redisClient.lock.getLock(redisKey) | ||
if (lockHeld) { | ||
return sendResponse( | ||
req, | ||
res, | ||
errorResponse( | ||
423, | ||
`Cannot change state of wallet ${req.session.wallet}. Node sync currently in progress.` | ||
) | ||
) | ||
} | ||
} | ||
req.logger.info(`syncLockMiddleware succeeded`) | ||
next() | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removing this since it made no sense to have - primaries would never have syncs happening at the same time |
||
/** | ||
* Blocks writes if node is not the primary for audiusUser associated with wallet | ||
*/ | ||
|
@@ -852,7 +831,6 @@ module.exports = { | |
ensureStorageMiddleware, | ||
ensureValidSPMiddleware, | ||
issueAndWaitForSecondarySyncRequests, | ||
syncLockMiddleware, | ||
getOwnEndpoint, | ||
getCreatorNodeEndpoints | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,36 +1,122 @@ | ||
/** | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. diff is confusing - better way to view this might be to see what's currently on master for RedisLock: https://github.com/AudiusProject/audius-protocol/blob/master/creator-node/src/redis.js Changes:
|
||
* Exports a Singleton Redis client instance, with custom wallet write locking logic | ||
*/ | ||
|
||
const Redis = require('ioredis') | ||
|
||
const config = require('./config.js') | ||
const { logger: genericLogger } = require('./logging') | ||
const Redis = require('ioredis') | ||
const { asyncRetry } = require('./utils') | ||
|
||
const redisClient = new Redis(config.get('redisPort'), config.get('redisHost')) | ||
|
||
const EXPIRATION = 60 * 60 * 2 // 2 hours in seconds | ||
class RedisLock { | ||
static async setLock(key, expiration = EXPIRATION) { | ||
genericLogger.info(`SETTING LOCK ${key}`) | ||
// set allows you to set an optional expire param | ||
return redisClient.set(key, true, 'EX', expiration) | ||
} | ||
const _getWalletWriteLockKey = function (wallet) { | ||
return `WRITE.WALLET.${wallet}` | ||
} | ||
|
||
static async getLock(key) { | ||
genericLogger.info(`GETTING LOCK ${key}`) | ||
return redisClient.get(key) | ||
} | ||
const WalletWriteLock = { | ||
WALLET_WRITE_LOCK_EXPIRATION_SEC: 1800, // 30 min in sec | ||
|
||
static async acquireLock(key, expiration = EXPIRATION) { | ||
genericLogger.info(`SETTING LOCK IF NOT EXISTS ${key}`) | ||
const response = await redisClient.set(key, true, 'NX', 'EX', expiration) | ||
return !!response | ||
} | ||
VALID_ACQUIRERS: { | ||
SecondarySyncFromPrimary: 'secondarySyncFromPrimary', | ||
PrimarySyncFromSecondary: 'primarySyncFromSecondary' | ||
}, | ||
|
||
static async removeLock(key) { | ||
genericLogger.info(`DELETING LOCK ${key}`) | ||
return redisClient.del(key) | ||
} | ||
} | ||
/** | ||
* Return lock holder, if held; else null | ||
*/ | ||
getCurrentHolder: async function (wallet) { | ||
const key = _getWalletWriteLockKey(wallet) | ||
const holder = await redisClient.get(key) | ||
return holder | ||
}, | ||
|
||
/** Returns true if lock is held by sync, else false */ | ||
syncIsInProgress: async function (wallet) { | ||
const holder = await this.getCurrentHolder(wallet) | ||
|
||
function getNodeSyncRedisKey(wallet) { | ||
return `NODESYNC.${wallet}` | ||
return ( | ||
holder === this.VALID_ACQUIRERS.PrimarySyncFromSecondary || | ||
holder === this.VALID_ACQUIRERS.SecondarySyncFromPrimary | ||
) | ||
}, | ||
|
||
/** | ||
* Return true if lock is held, else false | ||
*/ | ||
isHeld: async function (wallet) { | ||
const key = _getWalletWriteLockKey(wallet) | ||
const holder = await redisClient.get(key) | ||
return !!holder | ||
}, | ||
|
||
ttl: async function (wallet) { | ||
const key = _getWalletWriteLockKey(wallet) | ||
const ttl = await redisClient.ttl(key) | ||
return ttl | ||
}, | ||
|
||
/** | ||
* Attempt to acquire write lock for wallet | ||
* Throws error on call failure or acquisition failure | ||
* Does not return any value on success | ||
* @param wallet | ||
* @param acquirer | ||
* @param expirationSec | ||
*/ | ||
acquire: async function ( | ||
wallet, | ||
acquirer, | ||
expirationSec = this.WALLET_WRITE_LOCK_EXPIRATION_SEC | ||
) { | ||
// Ensure `acquirer` is valid | ||
if (!Object.values(this.VALID_ACQUIRERS).includes(acquirer)) { | ||
throw new Error(`Must provide valid acquirer`) | ||
} | ||
|
||
const key = _getWalletWriteLockKey(wallet) | ||
|
||
let acquired = false | ||
|
||
await asyncRetry({ | ||
asyncFn: async function () { | ||
const response = await redisClient.set( | ||
key, | ||
acquirer, // value | ||
'NX', // set if not exists | ||
'EX', // set expiration in seconds | ||
expirationSec | ||
) | ||
acquired = !!response | ||
}, | ||
logger: genericLogger, | ||
log: false | ||
}) | ||
|
||
if (!acquired) { | ||
throw new Error( | ||
`[acquireWriteLockForWallet][Wallet: ${wallet}] Error: Failed to acquire lock - already held.` | ||
) | ||
} | ||
}, | ||
|
||
/** | ||
* Attempt to release write lock for wallet | ||
* Throws error on call failure | ||
* Does not return any value on success | ||
*/ | ||
release: async function (wallet) { | ||
const key = _getWalletWriteLockKey(wallet) | ||
|
||
await asyncRetry({ | ||
asyncFn: async function () { | ||
// Succeeds if removed or if no lock exists; throws error on call failure | ||
await redisClient.del(key) | ||
}, | ||
logger: genericLogger, | ||
log: false | ||
}) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -39,7 +125,10 @@ function getNodeSyncRedisKey(wallet) { | |
* @param {string} param.keyPattern the redis key pattern that matches keys to remove | ||
* @param {Object} param.logger the logger instance | ||
*/ | ||
function deleteKeyPatternInRedis({ keyPattern, logger = genericLogger }) { | ||
const deleteKeyPatternInRedis = function ({ | ||
keyPattern, | ||
logger = genericLogger | ||
}) { | ||
// Create a readable stream (object mode) | ||
const stream = redisClient.scanStream({ | ||
match: keyPattern | ||
|
@@ -63,6 +152,5 @@ function deleteKeyPatternInRedis({ keyPattern, logger = genericLogger }) { | |
} | ||
|
||
module.exports = redisClient | ||
module.exports.lock = RedisLock | ||
module.exports.getNodeSyncRedisKey = getNodeSyncRedisKey | ||
module.exports.WalletWriteLock = WalletWriteLock | ||
module.exports.deleteKeyPatternInRedis = deleteKeyPatternInRedis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a naming nit for accuracy