Skip to content

Commit

Permalink
[CON-213] - Paginate delist adding and small refactor (#3287)
Browse files Browse the repository at this point in the history
* logging memory used

* batch 500 + logs hm..

* quick exit for easy testing

* remove unused fn

* dont kill app after blacklist init

* remove test fns

* update duration log

* move init flag

* no good bye msg lol

* add async retry

* remove early exit

* extra new line

* fix log

* remove deleting non-existent keys

* update logs

* removing timing responsibiltiy from bl

* add completion log

* update var name + set enabled to true

* Revert "update var name + set enabled to true"

This reverts commit f30efb8.

* remove skips

* remove async retry

* remove unused var
  • Loading branch information
vicky-g authored and raymondjacobson committed Jun 25, 2022
1 parent cdfd16d commit 4a342cd
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 128 deletions.
218 changes: 98 additions & 120 deletions creator-node/src/blacklistManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ const CID_WHITELIST = new Set(config.get('cidWhitelist').split(','))
const REDIS_SET_BLACKLIST_TRACKID_KEY = 'BM.SET.BLACKLIST.TRACKID'
const REDIS_SET_BLACKLIST_USERID_KEY = 'BM.SET.BLACKLIST.USERID'
const REDIS_SET_BLACKLIST_SEGMENTCID_KEY = 'BM.SET.BLACKLIST.SEGMENTCID'
const REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY =
'BM.MAP.BLACKLIST.SEGMENTCID.TRACKID'
const REDIS_MAP_TRACKID_TO_SEGMENTCIDS_KEY = 'BM.MAP.TRACKID.SEGMENTCIDS'
const REDIS_SET_INVALID_TRACKIDS_KEY = 'BM.SET.INVALID.TRACKIDS'

Expand All @@ -18,6 +16,8 @@ const SEGMENTCID_TO_TRACKID_EXPIRATION_SECONDS =
const INVALID_TRACKID_EXPIRATION_SECONDS =
1 /* hour */ * 60 /* minutes */ * 60 /* seconds */

const PROCESS_TRACKS_BATCH_SIZE = 200

const types = models.ContentBlacklist.Types

class BlacklistManager {
Expand All @@ -27,14 +27,19 @@ class BlacklistManager {

static async init() {
try {
this.log('Initializing BlacklistManager...')

const { trackIdsToBlacklist, userIdsToBlacklist, segmentsToBlacklist } =
await this.getDataToBlacklist()
await this.fetchCIDsAndAddToRedis({
trackIdsToBlacklist,
userIdsToBlacklist,
segmentsToBlacklist
})

this.initialized = true

this.log('Initialized BlacklistManager')
} catch (e) {
throw new Error(`Could not init BlacklistManager: ${e.message}`)
}
Expand Down Expand Up @@ -95,19 +100,6 @@ class BlacklistManager {
// Dedupe trackIds
const allTrackIdsToBlacklistSet = new Set(allTrackIdsToBlacklist)

// Retrieves CIDs from deduped trackIds
const { segmentCIDs: segmentsFromTrackIds, trackIdToSegments } =
await this.getCIDsToBlacklist({
allTrackIds: [...allTrackIdsToBlacklistSet],
explicitTrackIds: new Set(trackIdsToBlacklist)
})
let segmentCIDsToBlacklist =
segmentsFromTrackIds.concat(segmentsToBlacklist)
const segmentCIDsToBlacklistSet = new Set(segmentCIDsToBlacklist)
// Filter out whitelisted CID's from the segments to remove
;[...CID_WHITELIST].forEach((cid) => segmentCIDsToBlacklistSet.delete(cid))

segmentCIDsToBlacklist = [...segmentCIDsToBlacklistSet]
try {
await this.addToRedis(
REDIS_SET_BLACKLIST_TRACKID_KEY,
Expand All @@ -116,19 +108,59 @@ class BlacklistManager {
await this.addToRedis(REDIS_SET_BLACKLIST_USERID_KEY, userIdsToBlacklist)
await this.addToRedis(
REDIS_SET_BLACKLIST_SEGMENTCID_KEY,
segmentCIDsToBlacklist
)

// Creates a mapping of CID to blacklisted trackIds in redis. This should not include the trackIds from users.
await this.addToRedis(
REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY,
trackIdToSegments
segmentsToBlacklist
)
} catch (e) {
throw new Error(
`[fetchCIDsAndAddToRedis] - Failed to add to blacklist: ${e}`
`[fetchCIDsAndAddToRedis] - Failed to add track ids, user ids, or explicitly blacklisted segments to blacklist: ${e.message}`
)
}

await BlacklistManager.addAggregateCIDsToRedis([
...allTrackIdsToBlacklistSet
])
}

/**
* Helper method to batch adding CIDs from tracks and users to the blacklist
* @param {number[]} allTrackIdsToBlacklist aggregate list of track ids to blacklist from explicit track id blacklist and tracks from blacklisted users
*/
static async addAggregateCIDsToRedis(allTrackIdsToBlacklist) {
let i
for (
i = 0;
i < allTrackIdsToBlacklist.length;
i = i + PROCESS_TRACKS_BATCH_SIZE
) {
try {
const tracksSlice = allTrackIdsToBlacklist.slice(
i,
i + PROCESS_TRACKS_BATCH_SIZE
)

this.logDebug(
`[addAggregateCIDsToRedis] - tracks slice size: ${tracksSlice.length}`
)

const segmentsFromTrackIdsToBlacklist =
await BlacklistManager.getCIDsToBlacklist(tracksSlice)

this.logDebug(
`[addAggregateCIDsToRedis] - number of segments: ${segmentsFromTrackIdsToBlacklist.length}`
)

await BlacklistManager.addToRedis(
REDIS_SET_BLACKLIST_SEGMENTCID_KEY,
segmentsFromTrackIdsToBlacklist
)
} catch (e) {
throw new Error(
`[addAggregateCIDsToRedis] - Could not add tracks slice ${i} to ${
i + PROCESS_TRACKS_BATCH_SIZE
}: ${e.message}`
)
}
}
}

/**
Expand All @@ -150,11 +182,9 @@ class BlacklistManager {
const allTrackIdsToBlacklistSet = new Set(allTrackIdsToBlacklist)

// Retrieves CIDs from deduped trackIds
const { segmentCIDs: segmentsFromTrackIds, trackIdToSegments } =
await this.getCIDsToBlacklist({
allTrackIds: [...allTrackIdsToBlacklistSet],
explicitTrackIds: new Set(trackIdsToRemove)
})
const segmentsFromTrackIds = await this.getCIDsToBlacklist([
...allTrackIdsToBlacklistSet
])

let segmentCIDsToRemove = segmentsFromTrackIds.concat(segmentsToRemove)
const segmentCIDsToRemoveSet = new Set(segmentCIDsToRemove)
Expand All @@ -173,10 +203,6 @@ class BlacklistManager {
REDIS_SET_BLACKLIST_SEGMENTCID_KEY,
segmentCIDsToRemove
)
await this.removeFromRedis(
REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY,
trackIdToSegments
)
} catch (e) {
throw new Error(`Failed to remove from blacklist: ${e}`)
}
Expand Down Expand Up @@ -213,22 +239,44 @@ class BlacklistManager {

/**
* Retrieves all the deduped CIDs from the params and builds a mapping to <trackId: segments> for explicit trackIds (i.e. trackIds from table, not tracks belonging to users).
* @param {Object} param
* @param {number[]} param.allTrackIds all the trackIds to find CIDs for
* @param {Set<number>} param.explicitTrackIds explicit trackIds from table. Used to create mapping of <trackId: segments> in redis
* @returns
* {
* segmentCIDs: <string[]> all CIDs that are BL'd
* trackIdToSegments: {Object<number, string[]>} trackId : CIDs
* }
* @param {number[]} allTrackIds all the trackIds to find CIDs for (explictly blacklisted tracks and tracks from blacklisted users)
* @returns {string[]} all CIDs that are blacklisted from input track ids
*/
static async getCIDsToBlacklist({ allTrackIds, explicitTrackIds }) {
const tracks = await this.getAllCIDsFromTrackIdsInDb(allTrackIds)
static async getCIDsToBlacklist(inputTrackIds) {
const tracks = await this.getAllCIDsFromTrackIdsInDb(inputTrackIds)

const segmentCIDs = new Set()
const trackIdToSegments = {}

return { segmentCIDs: [...segmentCIDs], trackIdToSegments }
// Retrieve CIDs from the track metadata and build mapping of <trackId: segments>
for (const track of tracks) {
if (!track.metadataJSON || !track.metadataJSON.track_segments) continue

for (const segment of track.metadataJSON.track_segments) {
if (!segment.multihash || CID_WHITELIST.has(segment.multihash)) continue

segmentCIDs.add(segment.multihash)
}
}

// also retrieves the CID's directly from the files table so we get copy320
if (inputTrackIds.length > 0) {
const files = await models.File.findAll({
where: {
trackBlockchainId: inputTrackIds
}
})
for (const file of files) {
if (
file.type === 'track' ||
file.type === 'copy320' ||
!CID_WHITELIST.has(file.multihash)
) {
segmentCIDs.add(file.multihash)
}
}
}

return [...segmentCIDs]
}

static async add({ values, type }) {
Expand Down Expand Up @@ -337,14 +385,16 @@ class BlacklistManager {
static async _addToRedisChunkHelper(redisKey, data) {
const redisAddMaxItemsSize = 100000
try {
logger.info(
this.logDebug(
`About to call _addToRedisChunkHelper for ${redisKey} with data of length ${data.length}`
)
for (let i = 0; i < data.length; i += redisAddMaxItemsSize) {
await redis.sadd(redisKey, data.slice(i, i + redisAddMaxItemsSize))
}
} catch (e) {
logger.error(`Unable to call _addToRedisChunkHelper for ${redisKey}`)
this.logError(
`Unable to call _addToRedisChunkHelper for ${redisKey}: ${e.message}`
)
}
}

Expand Down Expand Up @@ -379,37 +429,15 @@ class BlacklistManager {
}
break
}
case REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY: {
// Add "MAP.BLACKLIST.SEGMENTCID.TRACKID:::<cid>" to set of trackIds to redis
const errors = []
for (let [trackId, cids] of Object.entries(data)) {
trackId = parseInt(trackId)
for (const cid of cids) {
const redisCIDKey = this.getRedisBlacklistSegmentToTrackIdKey(cid)
try {
await this._addToRedisChunkHelper(redisCIDKey, [trackId])
} catch (e) {
errors.push(
`Unable to add ${redisCIDKey}:${trackId}: ${e.toString()}`
)
}
}
}

if (errors.length > 0) {
throw new Error(errors.toString())
}
break
}
case REDIS_SET_INVALID_TRACKIDS_KEY:
case REDIS_SET_BLACKLIST_SEGMENTCID_KEY:
case REDIS_SET_BLACKLIST_TRACKID_KEY:
case REDIS_SET_BLACKLIST_USERID_KEY:
default: {
if (!data || data.length === 0) return
try {
const resp = await this._addToRedisChunkHelper(redisKey, data)
this.logDebug(`redis set add ${redisKey} response ${resp}`)
await this._addToRedisChunkHelper(redisKey, data)
this.logDebug(`redis set add ${redisKey} successful`)
} catch (e) {
throw new Error(`Unable to add ${redisKey}:${data}: ${e.toString()}`)
}
Expand All @@ -425,27 +453,6 @@ class BlacklistManager {
*/
static async removeFromRedis(redisKey, data) {
switch (redisKey) {
case REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY: {
const errors = []
for (let [trackId, cids] of Object.entries(data)) {
trackId = parseInt(trackId)
for (const cid of cids) {
const redisCIDKey = this.getRedisBlacklistSegmentToTrackIdKey(cid)
try {
await redis.srem(redisCIDKey, trackId)
} catch (e) {
errors.push(
`Unable to remove ${redisCIDKey}:${trackId}: ${e.toString()}`
)
}
}
}

if (errors.length > 0) {
throw new Error(errors.toString())
}
break
}
case REDIS_SET_BLACKLIST_SEGMENTCID_KEY:
case REDIS_SET_BLACKLIST_TRACKID_KEY:
case REDIS_SET_BLACKLIST_USERID_KEY:
Expand Down Expand Up @@ -489,14 +496,6 @@ class BlacklistManager {

trackId = parseInt(trackId)

// If the CID belongs to an explicitly blacklisted track and the input trackId is found in this mapping,
// do not serve
const foundCIDFromExplicitBlacklistedTrack =
await this.CIDIsInExplicitlyBlacklistedTrack(trackId, cid)
if (foundCIDFromExplicitBlacklistedTrack) return false

// If reached here, explicitly blacklisted track CID mapping to input trackId is not found.

// Check to see if CID belongs to input trackId from redis.
let cidsOfInputTrackId = await this.getAllCIDsFromTrackIdInRedis(trackId)

Expand Down Expand Up @@ -573,10 +572,6 @@ class BlacklistManager {
return REDIS_SET_BLACKLIST_SEGMENTCID_KEY
}

static getRedisBlacklistSegmentToTrackIdKey(cid) {
return `${REDIS_MAP_BLACKLIST_SEGMENTCID_TO_TRACKID_KEY}:::${cid}`
}

static getRedisTrackIdToCIDsKey(trackId) {
return `${REDIS_MAP_TRACKID_TO_SEGMENTCIDS_KEY}:::${trackId}`
}
Expand All @@ -600,12 +595,6 @@ class BlacklistManager {
return redis.sismember(REDIS_SET_BLACKLIST_SEGMENTCID_KEY, cid)
}

// Check if the input CID is blacklisted from TRACK type
static async CIDIsInExplicitlyBlacklistedTrack(trackId, cid) {
const redisCIDKey = this.getRedisBlacklistSegmentToTrackIdKey(cid)
return redis.sismember(redisCIDKey, trackId)
}

// Check if the input CID belongs to the track with the input trackId in redis.
static async CIDIsInTrackRedis(trackId, cid) {
const redisKey = this.getRedisTrackIdToCIDsKey(trackId)
Expand Down Expand Up @@ -636,17 +625,6 @@ class BlacklistManager {
return redis.smembers(REDIS_SET_INVALID_TRACKIDS_KEY)
}

/**
* Retrieves all the blacklisted trackIds associated with the cid. This mapping is created by
* fetching the segments from explicitly blacklisted tracks.
* @param {string} cid the observed cid
* @returns {string[]} the trackIds associated with the cid
*/
static async getBlacklistedTrackIdsFromCID(cid) {
const redisCIDKey = this.getRedisBlacklistSegmentToTrackIdKey(cid)
return redis.smembers(redisCIDKey)
}

/**
* Retrieve all the relevant CIDs from the input trackId in redis.
* @param {number} trackId
Expand Down
8 changes: 4 additions & 4 deletions creator-node/test/contentBlacklist.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ describe('test ContentBlacklist', function () {
// TODO: add remove and test that the segments are unblacklisted
})

it.skip('should throw an error when adding a track id to the blacklist, and streaming /ipfs/:CID without the trackId query string', async () => {
it('should throw an error when adding a track id to the blacklist, and streaming /ipfs/:CID without the trackId query string', async () => {
// Create user and upload track
const data = await createUserAndUploadTrack()
const trackId = data.track.blockchainId
Expand Down Expand Up @@ -773,7 +773,7 @@ describe('test ContentBlacklist', function () {
)
})

it.skip('should throw an error when adding a track id to the blacklist, and streaming /ipfs/:CID?trackId=<trackIdThatDoesntContainCID>', async () => {
it('should throw an error when adding a track id to the blacklist, and streaming /ipfs/:CID?trackId=<trackIdThatDoesntContainCID>', async () => {
// Create user and upload track
const data = await createUserAndUploadTrack()
const trackId = data.track.blockchainId
Expand Down Expand Up @@ -985,7 +985,7 @@ describe('test ContentBlacklist', function () {
.expect(400)
})

it.skip('should add the relevant CIDs to redis when adding a type TRACK to redis', async () => {
it('should add the relevant CIDs to redis when adding a type TRACK to redis', async () => {
// Create user and upload track
const data = await createUserAndUploadTrack()
const trackId = data.track.blockchainId
Expand Down Expand Up @@ -1062,7 +1062,7 @@ describe('test ContentBlacklist', function () {
.send(associateRequest)

// Upload a track
let trackUploadResponse = await uploadTrack(
const trackUploadResponse = await uploadTrack(
testAudioFilePath,
cnodeUserUUID,
mockServiceRegistry.blacklistManager
Expand Down
4 changes: 0 additions & 4 deletions creator-node/test/lib/blacklistManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ async function restartBlacklistManager(redis) {
keyPattern: BlacklistManager.getRedisTrackIdToCIDsKey('*')
})

deleteKeyPatternInRedis({
keyPattern: BlacklistManager.getRedisBlacklistSegmentToTrackIdKey('*')
})

BlacklistManager.initialized = false
}

Expand Down

0 comments on commit 4a342cd

Please sign in to comment.