Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CON-238 - Divergent state PR#4 - Primary Sync from Secondary #3399

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions creator-node/scripts/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ run_unit_tests () {

SidSethi marked this conversation as resolved.
Show resolved Hide resolved
run_integration_tests () {
echo Running integration tests...
./node_modules/mocha/bin/mocha --require ts-node/register test/*.test.js --timeout "${INTEGRATION_TIMEOUT}" --exit
./node_modules/mocha/bin/mocha --require ts-node/register test/issueSyncRequest.jobProcessor.test.js --timeout "${INTEGRATION_TIMEOUT}" --exit
}

ARG1=${@:$OPTIND:1}
Expand Down Expand Up @@ -124,7 +124,7 @@ export minimumRollingSyncCount=10
export minimumSuccessfulSyncCountPercentage=50

# tests
run_unit_tests
# run_unit_tests
run_integration_tests

rm -r $storagePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const {
} = require('./URSMRegistrationComponentService')
const { ensureStorageMiddleware } = require('../../middlewares')
const { enqueueSync } = require('./syncQueueComponentService')
const processSync = require('../../services/sync/processSync')
const processSync = require('../../services/sync/secondarySyncFromPrimary')

const router = express.Router()

Expand Down
6 changes: 4 additions & 2 deletions creator-node/src/dbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ class DBManager {
queryObj.clock = selectCNodeUserClockSubqueryLiteral

// Create new Data table entry with queryObj using new CNodeUser.clock
const file = await sequelizeTableInstance.create(queryObj, { transaction })
const newDataRecord = await sequelizeTableInstance.create(queryObj, {
transaction
})

return file.dataValues
return newDataRecord.dataValues
}

/**
Expand Down
5 changes: 5 additions & 0 deletions creator-node/src/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ function getNodeSyncRedisKey(wallet) {
return `NODESYNC.${wallet}`
}

function getRedisKeyForWallet(wallet) {
return `WALLET.${wallet}`
}

SidSethi marked this conversation as resolved.
Show resolved Hide resolved
/**
* Deletes keys of a pattern: https://stackoverflow.com/a/36006360
* @param {Object} param
Expand Down Expand Up @@ -66,3 +70,4 @@ module.exports = redisClient
module.exports.lock = RedisLock
module.exports.getNodeSyncRedisKey = getNodeSyncRedisKey
module.exports.deleteKeyPatternInRedis = deleteKeyPatternInRedis
module.exports.getRedisKeyForWallet = getRedisKeyForWallet
5 changes: 4 additions & 1 deletion creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class ServiceRegistry {
* Configure all services
*/
async initServices() {
// init libs
this.libs = await this._initAudiusLibs()

// Transcode handoff requires libs. Set libs in AsyncProcessingQueue after libs init is complete
Expand All @@ -83,6 +82,10 @@ class ServiceRegistry {
this.servicesInitialized = true
}

async initLibs() {
this.libs = this.libs || (await this._initAudiusLibs())
}

SidSethi marked this conversation as resolved.
Show resolved Hide resolved
/**
* These services do not need to be awaited and do not require the server.
*/
Expand Down
3 changes: 2 additions & 1 deletion creator-node/src/services/stateMachineManager/processJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ module.exports = async function (
const jobLogger = createChildLogger(parentLogger, { jobName, jobId })
jobLogger.info(`New job: ${JSON.stringify(job)}`)

let result
const jobDurationSecondsHistogram = prometheusRegistry.getMetric(
prometheusRegistry.metricNames[
`STATE_MACHINE_${jobName}_JOB_DURATION_SECONDS_HISTOGRAM`
]
)
const metricEndTimerFn = jobDurationSecondsHistogram.startTimer()

let result
try {
await redis.set(`latestJobStart_${jobName}`, Date.now())
result = await jobProcessor({ logger: jobLogger, ...jobData })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,32 +224,29 @@ const _findSyncsForUser = async (
continue
}

if (syncMode === SYNC_MODES.SyncSecondaryFromPrimary) {
try {
const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({
userWallet: wallet,
primaryEndpoint: thisContentNodeEndpoint,
secondaryEndpoint: secondary,
syncType: SyncType.Recurring
})
try {
const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({
userWallet: wallet,
primaryEndpoint: thisContentNodeEndpoint,
secondaryEndpoint: secondary,
syncType: SyncType.Recurring,
syncMode
})

if (!_.isEmpty(syncReqToEnqueue)) {
syncReqsToEnqueue.push(syncReqToEnqueue)
} else if (!_.isEmpty(duplicateSyncReq)) {
duplicateSyncReqs.push(duplicateSyncReq)
}
} catch (e) {
errors.push(
`Error getting new or existing sync request for user ${wallet} and secondary ${secondary} - ${e.message}`
)
continue
if (!_.isEmpty(syncReqToEnqueue)) {
syncReqsToEnqueue.push(syncReqToEnqueue)
} else if (!_.isEmpty(duplicateSyncReq)) {
duplicateSyncReqs.push(duplicateSyncReq)
}
} else if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) {
/**
* TODO - currently just logs as placeholder
* 1. Primary will sync all content from secondary
* 2. Primary will force secondary to wipe its local state and resync all content
*/
} catch (e) {
errors.push(
`Error getting new or existing sync request for syncMode ${syncMode}, user ${wallet} and secondary ${secondary} - ${e.message}`
)
continue
}

// TODOSID
if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) {
logger.info(
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
`[findSyncRequests][_findSyncsForUser][MergePrimaryAndSecondary = true][SyncType = ${SyncType.Recurring}] wallet ${wallet} secondary ${secondary} Clocks: [${primaryClock},${secondaryClock}] Files hashes: [${primaryFilesHash},${secondaryFilesHash}]`
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator')
const SecondarySyncHealthTracker = require('./SecondarySyncHealthTracker')
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
const {
SYNC_MONITORING_RETRY_DELAY_MS,
QUEUE_NAMES
QUEUE_NAMES,
SYNC_MODES
} = require('../stateMachineConstants')
const primarySyncFromSecondary = require('../../sync/primarySyncFromSecondary')

const thisContentNodeEndpoint = config.get('creatorNodeEndpoint')
const secondaryUserSyncDailyFailureCountThreshold = config.get(
Expand All @@ -37,8 +39,14 @@ const maxSyncMonitoringDurationInMs = config.get(
* @param {string} param.syncType the type of sync (manual or recurring)
* @param {Object} param.syncRequestParameters axios params to make the sync request. Shape: { baseURL, url, method, data }
*/
module.exports = async function ({ logger, syncType, syncRequestParameters }) {
module.exports = async function ({
logger,
syncType,
syncMode,
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
syncRequestParameters
}) {
_validateJobData(logger, syncType, syncRequestParameters)

const isValidSyncJobData =
'baseURL' in syncRequestParameters &&
'url' in syncRequestParameters &&
Expand Down Expand Up @@ -69,7 +77,7 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
const userWallet = syncRequestParameters.data.wallet[0]
const secondaryEndpoint = syncRequestParameters.baseURL

const logMsgString = `(${syncType}) User ${userWallet} | Secondary: ${secondaryEndpoint}`
const logMsgString = `(${syncType})(${syncMode}) User ${userWallet} | Secondary: ${secondaryEndpoint}`

/**
* Remove sync from SyncRequestDeDuplicator once it moves to Active status, before processing.
Expand Down Expand Up @@ -99,6 +107,16 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
}
}

if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) {
// TODO asyncretry?
const error = await primarySyncFromSecondary(secondaryEndpoint, userWallet)

// TODO logging

if (error) {
}
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
}

// primaryClockValue is used in additionalSyncIsRequired() call below
const primaryClockValue = (await _getUserPrimaryClockValues([userWallet]))[
userWallet
Expand All @@ -108,9 +126,13 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
`------------------Process SYNC | ${logMsgString} | Primary clock value ${primaryClockValue}------------------`
)

// Issue sync request to secondary
// Issue sync request to secondary with forceResync = true
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
try {
await axios(syncRequestParameters)
const syncRequestParametersForceResync = {
...syncRequestParameters,
data: { ...syncRequestParameters.data, forceResync: true }
}
await axios(syncRequestParametersForceResync)
} catch (e) {
// Axios request will throw on non-200 response -> swallow error to ensure below logic is executed
logger.error(`${logMsgString} || Error issuing sync request: ${e.message}`)
Expand All @@ -126,6 +148,7 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
primaryClockValue,
secondaryEndpoint,
syncType,
syncMode,
logger
)
metricsToRecord.push(
Expand All @@ -147,7 +170,8 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
userWallet,
secondaryEndpoint,
primaryEndpoint: thisContentNodeEndpoint,
syncType
syncType,
syncMode: SYNC_MODES.SyncSecondaryFromPrimary
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
})
if (duplicateSyncReq && !_.isEmpty(duplicateSyncReq)) {
error = {
Expand Down Expand Up @@ -235,6 +259,7 @@ const _additionalSyncIsRequired = async (
primaryClockValue = -1,
secondaryUrl,
syncType,
syncMode,
logger
) => {
const logMsgString = `additionalSyncIsRequired() (${syncType}): wallet ${userWallet} secondary ${secondaryUrl} primaryClock ${primaryClockValue}`
Expand All @@ -259,7 +284,9 @@ const _additionalSyncIsRequired = async (
logger.info(`${logMsgString} secondaryClock ${secondaryClockValue}`)

// Record starting and current clock values for secondary to determine future action
if (initialSecondaryClock === null) {
if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) {
initialSecondaryClock = 0
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
} else if (initialSecondaryClock === null) {
initialSecondaryClock = secondaryClockValue
}
finalSecondaryClock = secondaryClockValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const getNewOrExistingSyncReq = ({
primaryEndpoint,
secondaryEndpoint,
syncType,
syncMode,
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
immediate = false
}) => {
// If duplicate sync already exists, do not add and instead return existing sync job info
Expand Down Expand Up @@ -55,6 +56,7 @@ const getNewOrExistingSyncReq = ({
: JOB_NAMES.ISSUE_RECURRING_SYNC_REQUEST
const jobData = {
syncType,
syncMode,
syncRequestParameters
}
const syncReqToEnqueue = {
Expand Down
Loading