Skip to content

Commit

Permalink
Send content from storage peers in one message instead of streaming it
Browse files Browse the repository at this point in the history
  • Loading branch information
aeplay committed Dec 14, 2024
1 parent 507a00f commit b4a28c8
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/gorgeous-fans-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"cojson-storage": patch
---

Stop the use of incremental streaming of large CoValue content from local storage peers that triggers sync protocol bug leading to redundant syncing from server peers.
7 changes: 0 additions & 7 deletions packages/cojson-storage/src/syncManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ export class SyncManager {

const firstNewTxIdx = peerKnownState.sessions[sessionRow.sessionID] || 0;

const signaturesAndIdxs = await this.dbClient.getSignatures(
sessionRow.rowID,
firstNewTxIdx,
);

const newTxsInSession = await this.dbClient.getNewTransactionInSession(
sessionRow.rowID,
firstNewTxIdx,
Expand All @@ -74,8 +69,6 @@ export class SyncManager {
newTxsInSession,
newContentMessages,
sessionRow,
signaturesAndIdxs,
peerKnownState,
firstNewTxIdx,
});
}
Expand Down
23 changes: 2 additions & 21 deletions packages/cojson-storage/src/syncUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,21 @@ export function collectNewTxs({
newTxsInSession,
newContentMessages,
sessionRow,
signaturesAndIdxs,
peerKnownState,
firstNewTxIdx,
}: {
newTxsInSession: TransactionRow[];
newContentMessages: CojsonInternalTypes.NewContentMessage[];
sessionRow: StoredSessionRow;
signaturesAndIdxs: SignatureAfterRow[];
peerKnownState: CojsonInternalTypes.CoValueKnownState;
firstNewTxIdx: number;
}) {
let idx = firstNewTxIdx;

for (const tx of newTxsInSession) {
let sessionEntry =
newContentMessages[newContentMessages.length - 1]!.new[
sessionRow.sessionID
];
if (!sessionEntry) {
sessionEntry = {
after: idx,
after: firstNewTxIdx,
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature,
newTransactions: [],
};
Expand All @@ -46,20 +40,7 @@ export function collectNewTxs({
}

sessionEntry.newTransactions.push(tx.tx);

if (signaturesAndIdxs[0] && idx === signaturesAndIdxs[0].idx) {
sessionEntry.lastSignature = signaturesAndIdxs[0].signature;
signaturesAndIdxs.shift();
newContentMessages.push({
action: "content",
id: peerKnownState.id,
new: {},
priority: cojsonInternals.getPriorityFromHeader(undefined),
});
} else if (idx === firstNewTxIdx + newTxsInSession.length - 1) {
sessionEntry.lastSignature = sessionRow.lastSignature;
}
idx += 1;
sessionEntry.lastSignature = sessionRow.lastSignature;
}
}

Expand Down

0 comments on commit b4a28c8

Please sign in to comment.