Skip to content

Commit

Permalink
update queue sharding so that it can be rebalanced using a seed (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy authored Sep 25, 2024
1 parent 158d3da commit 042f051
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
25 changes: 12 additions & 13 deletions app/reports/page-content.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ToolsOzoneModerationDefs,
ToolsOzoneModerationEmitEvent,
ToolsOzoneModerationQueryStatuses,
ComAtprotoAdminDefs,
} from '@atproto/api'
import { SectionHeader } from '../../components/SectionHeader'
import { ModActionIcon } from '@/common/ModActionIcon'
Expand All @@ -23,16 +24,14 @@ import { SubjectTable } from 'components/subject/table'
import { useTitle } from 'react-use'
import { LanguagePicker } from '@/common/LanguagePicker'
import { QueueSelector, QUEUE_NAMES } from '@/reports/QueueSelector'
import { unique } from '@/lib/util'
import { simpleHash, unique } from '@/lib/util'
import { useEmitEvent } from '@/mod-event/helpers/emitEvent'
import { useFluentReportSearchParams } from '@/reports/useFluentReportSearch'
import { useLabelerAgent } from '@/shell/ConfigurationContext'
import { WorkspacePanel } from 'components/workspace/Panel'
import { useWorkspaceOpener } from '@/common/useWorkspaceOpener'
import {
EmbedTypePicker,
EmbedTypePickerForModerationQueue,
} from '@/common/EmbedTypePicker'
import { EmbedTypePickerForModerationQueue } from '@/common/EmbedTypePicker'
import { QUEUE_SEED } from '@/lib/constants'

const TABS = [
{
Expand Down Expand Up @@ -389,17 +388,13 @@ const getQueueItems = async (
...queryParams,
})

const queueDivider = QUEUE_NAMES.length
const queueIndex = QUEUE_NAMES.indexOf(queueName ?? '')
const statusesInQueue = queueName
? data.subjectStatuses.filter((status) => {
const subjectDid =
status.subject.$type === 'com.atproto.admin.defs#repoRef'
? status.subject.did
: new AtUri(`${status.subject.uri}`).host
const queueDeciderCharCode =
`${subjectDid}`.split(':').pop()?.charCodeAt(0) || 0
return queueDeciderCharCode % queueDivider === queueIndex
const subjectDid = ComAtprotoAdminDefs.isRepoRef(status.subject)
? status.subject.did
: new AtUri(`${status.subject.uri}`).host
return getQueueIndex(subjectDid) === queueIndex
})
: data.subjectStatuses

Expand All @@ -420,3 +415,7 @@ const getQueueItems = async (

return { cursor: data.cursor, subjectStatuses: statusesInQueue }
}

function getQueueIndex(did: string) {
return simpleHash(did + QUEUE_SEED) % QUEUE_NAMES.length
}
2 changes: 2 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export const PLC_DIRECTORY_URL =

export const QUEUE_CONFIG = process.env.NEXT_PUBLIC_QUEUE_CONFIG || '{}'

export const QUEUE_SEED = process.env.NEXT_PUBLIC_QUEUE_SEED || ''

export const SOCIAL_APP_URL =
process.env.NEXT_PUBLIC_SOCIAL_APP_URL ||
(process.env.NODE_ENV === 'development'
Expand Down
10 changes: 10 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,13 @@ export function chunkArray<T>(arr: T[], chunkSize: number) {
}
return chunks
}

// @NOTE hash function is insecure, though suitable for basic purposes such as bucketing.
export function simpleHash(str: string) {
let hash = 0
for (let i = 0; i < str.length; ++i) {
const chr = str.charCodeAt(i)
hash = ((hash << 5) - hash + chr) | 0
}
return hash
}

0 comments on commit 042f051

Please sign in to comment.