From ebe8eb9bd0db452c6b306cd6e5c9f88b1c10e94f Mon Sep 17 00:00:00 2001 From: Tomasz Palys Date: Wed, 18 Sep 2024 14:14:07 +0200 Subject: [PATCH] [lib] Take operations from the queue when conditions become met Summary: Check the conditions, take from the queues, process and dispatch actions. https://linear.app/comm/issue/ENG-9189/introduce-additional-operation-queues Depends on D13366 Test Plan: Tested the whole stack at once: 1. Created an operation with a reaction that was received before an operation creating the message 2. Created an operation with an edit that was received before an operation creating the message 3. Created an operation with an entry edit that was received before an operation creating the entry 4. Created an operation with thread subscription change that was received before the user became a member In each of the cases verified that the result was correct and that the operation was removed from the queue. Reviewers: kamil, will Reviewed By: kamil Subscribers: ashoat Differential Revision: https://phab.comm.dev/D13375 --- .../dm-ops/dm-ops-queue-handler.react.js | 169 ++++++++++++++++-- 1 file changed, 155 insertions(+), 14 deletions(-) diff --git a/lib/shared/dm-ops/dm-ops-queue-handler.react.js b/lib/shared/dm-ops/dm-ops-queue-handler.react.js index af70546aeb..4d8b20d835 100644 --- a/lib/shared/dm-ops/dm-ops-queue-handler.react.js +++ b/lib/shared/dm-ops/dm-ops-queue-handler.react.js @@ -4,11 +4,20 @@ import * as React from 'react'; import { dmOperationSpecificationTypes } from './dm-op-utils.js'; import { useProcessDMOperation } from './process-dm-ops.js'; -import { threadInfoSelector } from '../../selectors/thread-selectors.js'; +import { messageInfoSelector } from '../../selectors/chat-selectors.js'; import { + entryInfoSelector, + threadInfoSelector, +} from '../../selectors/thread-selectors.js'; +import { + clearQueuedEntryDMOpsActionType, + clearQueuedMembershipDMOpsActionType, + clearQueuedMessageDMOpsActionType, clearQueuedThreadDMOpsActionType, pruneDMOpsQueueActionType, } from '../../types/dm-ops.js'; +import type { OperationsQueue } from '../../types/dm-ops.js'; +import { threadTypeIsThick } from '../../types/thread-types-enum.js'; import { useDispatch, useSelector } from '../../utils/redux-utils.js'; const PRUNING_FREQUENCY = 60 * 60 * 1000; @@ -40,26 +49,24 @@ function DMOpsQueueHandler(): React.Node { const threadInfos = useSelector(threadInfoSelector); const threadIDs = React.useMemo( - () => new Set(Object.keys(threadInfos)), + () => + new Set( + Object.entries(threadInfos) + .filter(([, threadInfo]) => threadTypeIsThick(threadInfo.type)) + .map(([id]) => id), + ), [threadInfos], ); const prevThreadIDsRef = React.useRef<$ReadOnlySet>(new Set()); - const queuedOperations = useSelector( + const queuedThreadOperations = useSelector( state => state.queuedDMOperations.threadQueue, ); const processDMOperation = useProcessDMOperation(); - - React.useEffect(() => { - const prevThreadIDs = prevThreadIDsRef.current; - prevThreadIDsRef.current = threadIDs; - - for (const threadID in queuedOperations) { - if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) { - continue; - } - for (const dmOp of queuedOperations[threadID]) { + const processOperationsQueue = React.useCallback( + (queue: OperationsQueue) => { + for (const dmOp of queue) { void processDMOperation({ // This is `INBOUND` because we assume that when generating // `dmOperationSpecificationTypes.OUBOUND` it should be possible @@ -71,7 +78,19 @@ function DMOpsQueueHandler(): React.Node { metadata: null, }); } + }, + [processDMOperation], + ); + React.useEffect(() => { + const prevThreadIDs = prevThreadIDsRef.current; + prevThreadIDsRef.current = threadIDs; + + for (const threadID in queuedThreadOperations) { + if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) { + continue; + } + processOperationsQueue(queuedThreadOperations[threadID]); dispatch({ type: clearQueuedThreadDMOpsActionType, payload: { @@ -79,7 +98,129 @@ function DMOpsQueueHandler(): React.Node { }, }); } - }, [dispatch, processDMOperation, queuedOperations, threadIDs]); + }, [dispatch, processOperationsQueue, queuedThreadOperations, threadIDs]); + + const messageInfos = useSelector(messageInfoSelector); + const messageIDs = React.useMemo( + () => + new Set( + Object.entries(messageInfos) + .filter( + ([, messageInfo]) => + messageInfo && + threadTypeIsThick(threadInfos[messageInfo.threadID]?.type), + ) + .map(([id]) => id), + ), + [messageInfos, threadInfos], + ); + const prevMessageIDsRef = React.useRef<$ReadOnlySet>(new Set()); + + const queuedMessageOperations = useSelector( + state => state.queuedDMOperations.messageQueue, + ); + + React.useEffect(() => { + const prevMessageIDs = prevMessageIDsRef.current; + prevMessageIDsRef.current = messageIDs; + + for (const messageID in queuedMessageOperations) { + if (!messageIDs.has(messageID) || prevMessageIDs.has(messageID)) { + continue; + } + processOperationsQueue(queuedMessageOperations[messageID]); + dispatch({ + type: clearQueuedMessageDMOpsActionType, + payload: { + messageID, + }, + }); + } + }, [dispatch, messageIDs, processOperationsQueue, queuedMessageOperations]); + + const entryInfos = useSelector(entryInfoSelector); + const entryIDs = React.useMemo( + () => + new Set( + Object.entries(entryInfos) + .filter(([, entryInfo]) => + threadTypeIsThick(threadInfos[entryInfo.threadID]?.type), + ) + .map(([id]) => id), + ), + [entryInfos, threadInfos], + ); + const prevEntryIDsRef = React.useRef<$ReadOnlySet>(new Set()); + + const queuedEntryOperations = useSelector( + state => state.queuedDMOperations.entryQueue, + ); + + React.useEffect(() => { + const prevEntryIDs = prevEntryIDsRef.current; + prevEntryIDsRef.current = entryIDs; + + for (const entryID in queuedEntryOperations) { + if (!entryIDs.has(entryID) || prevEntryIDs.has(entryID)) { + continue; + } + processOperationsQueue(queuedEntryOperations[entryID]); + dispatch({ + type: clearQueuedEntryDMOpsActionType, + payload: { + entryID, + }, + }); + } + }, [dispatch, entryIDs, processOperationsQueue, queuedEntryOperations]); + + const queuedMembershipOperations = useSelector( + state => state.queuedDMOperations.membershipQueue, + ); + + const runningMembershipOperations = React.useRef>>( + new Map(), + ); + React.useEffect(() => { + for (const threadID in queuedMembershipOperations) { + if (!threadInfos[threadID]) { + continue; + } + + const queuedMemberIDs = new Set( + Object.keys(queuedMembershipOperations[threadID]), + ); + if (!runningMembershipOperations.current.has(threadID)) { + runningMembershipOperations.current.set(threadID, new Set()); + } + for (const member of threadInfos[threadID].members) { + if ( + !queuedMemberIDs.has(member.id) || + runningMembershipOperations.current.get(threadID)?.has(member.id) + ) { + continue; + } + + runningMembershipOperations.current.get(threadID)?.add(member.id); + + processOperationsQueue(queuedMembershipOperations[threadID][member.id]); + + dispatch({ + type: clearQueuedMembershipDMOpsActionType, + payload: { + threadID, + userID: member.id, + }, + }); + runningMembershipOperations.current.get(threadID)?.delete(member.id); + } + } + }, [ + dispatch, + processOperationsQueue, + queuedMembershipOperations, + threadInfos, + ]); return null; }