Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support delta compression #282

Merged
merged 3 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@
"events": "^3.3.0",
"protobufjs": "^7.2.5"
}
}
}
12 changes: 12 additions & 0 deletions src/client.proto.json
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@
"keyType": "string",
"type": "string",
"id": 7
},
"delta": {
"type": "bool",
"id": 8
}
},
"reserved": [
Expand Down Expand Up @@ -566,6 +570,10 @@
"join_leave": {
"type": "bool",
"id": 11
},
"delta": {
"type": "string",
"id": 12
}
},
"reserved": [
Expand Down Expand Up @@ -621,6 +629,10 @@
"was_recovering": {
"type": "bool",
"id": 12
},
"delta": {
"type": "bool",
"id": 13
}
},
"reserved": [
Expand Down
192 changes: 192 additions & 0 deletions src/fossil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
Copyright 2014-2024 Dmitry Chestnykh (JavaScript port)
Copyright 2007 D. Richard Hipp (original C version)

Fossil SCM delta compression algorithm, this is only the applyDelta part extracted
from https://github.com/dchest/fossil-delta-js. The code was slightly modified
to strip unnecessary parts. The copyright on top of this file is from the original
repo on Github licensed under Simplified BSD License.
*/

// We accept plain arrays of bytes or Uint8Array.
type ByteArray = number[] | Uint8Array;


const zValue = [
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, -1, -1,
-1, -1, -1, -1, -1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, -1, -1, -1, -1, 36, -1, 37,
38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
57, 58, 59, 60, 61, 62, -1, -1, -1, 63, -1,
];

// Reader reads bytes, chars, ints from array.
class Reader {
public a: ByteArray;
public pos: number;

constructor(array: ByteArray) {
this.a = array; // source array
this.pos = 0; // current position in array
}

haveBytes() {
return this.pos < this.a.length;
}

getByte() {
const b = this.a[this.pos];
this.pos++;
if (this.pos > this.a.length) throw new RangeError("out of bounds");
return b;
}

getChar() {
return String.fromCharCode(this.getByte());
}

// Read base64-encoded unsigned integer.
getInt() {
let v = 0;
let c: number;
while (this.haveBytes() && (c = zValue[0x7f & this.getByte()]) >= 0) {
v = (v << 6) + c;
}
this.pos--;
return v >>> 0;
}
}

// Write writes an array.
class Writer {
private a: number[] = [];

toByteArray<T extends ByteArray>(sourceType: T): T {
if (Array.isArray(sourceType)) {
return this.a as T;
}
return new Uint8Array(this.a) as T;
}

// Copy from array at start to end.
putArray(a: ByteArray, start: number, end: number) {
this.a.push(...a.slice(start, end));
}
}

// Return a 32-bit checksum of the array.
function checksum(arr: ByteArray): number {
let sum0 = 0,
sum1 = 0,
sum2 = 0,
sum3 = 0,
z = 0,
N = arr.length;
//TODO measure if this unrolling is helpful.
while (N >= 16) {
sum0 = (sum0 + arr[z + 0]) | 0;
sum1 = (sum1 + arr[z + 1]) | 0;
sum2 = (sum2 + arr[z + 2]) | 0;
sum3 = (sum3 + arr[z + 3]) | 0;

sum0 = (sum0 + arr[z + 4]) | 0;
sum1 = (sum1 + arr[z + 5]) | 0;
sum2 = (sum2 + arr[z + 6]) | 0;
sum3 = (sum3 + arr[z + 7]) | 0;

sum0 = (sum0 + arr[z + 8]) | 0;
sum1 = (sum1 + arr[z + 9]) | 0;
sum2 = (sum2 + arr[z + 10]) | 0;
sum3 = (sum3 + arr[z + 11]) | 0;

sum0 = (sum0 + arr[z + 12]) | 0;
sum1 = (sum1 + arr[z + 13]) | 0;
sum2 = (sum2 + arr[z + 14]) | 0;
sum3 = (sum3 + arr[z + 15]) | 0;

z += 16;
N -= 16;
}
while (N >= 4) {
sum0 = (sum0 + arr[z + 0]) | 0;
sum1 = (sum1 + arr[z + 1]) | 0;
sum2 = (sum2 + arr[z + 2]) | 0;
sum3 = (sum3 + arr[z + 3]) | 0;
z += 4;
N -= 4;
}
sum3 = (((((sum3 + (sum2 << 8)) | 0) + (sum1 << 16)) | 0) + (sum0 << 24)) | 0;
switch (N) {
//@ts-ignore fallthrough is needed.
case 3:
sum3 = (sum3 + (arr[z + 2] << 8)) | 0; /* falls through */
//@ts-ignore fallthrough is needed.
case 2:
sum3 = (sum3 + (arr[z + 1] << 16)) | 0; /* falls through */
case 1:
sum3 = (sum3 + (arr[z + 0] << 24)) | 0; /* falls through */
}
return sum3 >>> 0;
}

/**
* Apply a delta byte array to a source byte array, returning the target byte array.
*/
export function applyDelta<T extends ByteArray>(
source: T,
delta: T
): T {
let total = 0;
const zDelta = new Reader(delta);
const lenSrc = source.length;
const lenDelta = delta.length;

const limit = zDelta.getInt();
if (zDelta.getChar() !== "\n")
throw new Error("size integer not terminated by '\\n'");
const zOut = new Writer();
while (zDelta.haveBytes()) {
const cnt = zDelta.getInt();
let ofst: number;

switch (zDelta.getChar()) {
case "@":
ofst = zDelta.getInt();
if (zDelta.haveBytes() && zDelta.getChar() !== ",")
throw new Error("copy command not terminated by ','");
total += cnt;
if (total > limit) throw new Error("copy exceeds output file size");
if (ofst + cnt > lenSrc)
throw new Error("copy extends past end of input");
zOut.putArray(source, ofst, ofst + cnt);
break;

case ":":
total += cnt;
if (total > limit)
throw new Error(
"insert command gives an output larger than predicted"
);
if (cnt > lenDelta)
throw new Error("insert count exceeds size of delta");
zOut.putArray(zDelta.a, zDelta.pos, zDelta.pos + cnt);
zDelta.pos += cnt;
break;

case ";":
{
const out = zOut.toByteArray(source);
if (cnt !== checksum(out))
throw new Error("bad checksum");
if (total !== limit)
throw new Error("generated size does not match predicted size");
return out;
}
default:
throw new Error("unknown delta operator");
}
}
throw new Error("unterminated delta");
}
17 changes: 17 additions & 0 deletions src/json.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { applyDelta } from './fossil';

/** @internal */
export class JsonCodec {
name() {
Expand All @@ -11,4 +13,19 @@ export class JsonCodec {
decodeReplies(data: string): any[] {
return data.trim().split('\n').map(r => JSON.parse(r));
}

applyDeltaIfNeeded(pub: any, prevValue: any) {
let newData: any, newPrevValue: any;
if (pub.delta) {
// JSON string delta.
const valueArray = applyDelta(prevValue, new TextEncoder().encode(pub.data));
newData = JSON.parse(new TextDecoder().decode(valueArray))
newPrevValue = valueArray;
} else {
// Full data as JSON string.
newData = JSON.parse(pub.data);
newPrevValue = new TextEncoder().encode(pub.data);
}
return { newData, newPrevValue }
}
}
19 changes: 18 additions & 1 deletion src/protobuf.codec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as protobuf from 'protobufjs/light'
import * as protoJSON from './client.proto.json';
import { applyDelta } from './fossil';

const proto = protobuf.Root.fromJSON(protoJSON);

const Command = proto.lookupType('protocol.Command');
Expand Down Expand Up @@ -52,4 +54,19 @@ export class ProtobufCodec {
ok: false
};
}
}

applyDeltaIfNeeded(pub: any, prevValue: any) {
let newData: any, newPrevValue: any;
if (pub.delta) {
// binary delta.
const valueArray = applyDelta(prevValue, pub.data);
newData = new Uint8Array(valueArray)
newPrevValue = valueArray;
} else {
// full binary data.
newData = pub.data;
newPrevValue = pub.data;
}
return { newData, newPrevValue }
}
}
28 changes: 27 additions & 1 deletion src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _epoch: string | null;
private _resubscribeAttempts: number;
private _promiseId: number;

private _delta: string;
private _delta_negotiated: boolean;
private _token: string;
private _data: any | null;
private _getData: null | ((ctx: SubscriptionDataContext) => Promise<any>);
Expand All @@ -35,6 +36,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
private _joinLeave: boolean;
// @ts-ignore – this is used by a client in centrifuge.ts.
private _inflight: boolean;
private _prevValue: any;

/** Subscription constructor should not be used directly, create subscriptions using Client method. */
constructor(centrifuge: Centrifuge, channel: string, options?: Partial<SubscriptionOptions>) {
Expand All @@ -60,6 +62,9 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._promiseId = 0;
this._inflight = false;
this._refreshTimeout = null;
this._delta = '';
this._delta_negotiated = false;
this._prevValue = null;
this._setOptions(options);
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (this._centrifuge._debugEnabled) {
Expand Down Expand Up @@ -222,6 +227,11 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
this._offset = result.offset || 0;
this._epoch = result.epoch || '';
}
if (result.delta) {
this._delta_negotiated = true;
} else {
this._delta_negotiated = false;
}

this._setState(SubscriptionState.Subscribed);
// @ts-ignore – we are hiding some methods from public API autocompletion.
Expand Down Expand Up @@ -377,6 +387,10 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
}

if (this._delta) {
req.delta = this._delta;
}

const cmd = { subscribe: req };

this._inflight = true;
Expand Down Expand Up @@ -448,6 +462,12 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}

private _handlePublication(pub: any) {
if (this._delta && this._delta_negotiated) {
// @ts-ignore – we are hiding some methods from public API autocompletion.
const {newData, newPrevValue} = this._centrifuge._codec.applyDeltaIfNeeded(pub, this._prevValue)
pub.data = newData;
this._prevValue = newPrevValue;
}
// @ts-ignore – we are hiding some methods from public API autocompletion.
const ctx = this._centrifuge._getPublicationContext(this.channel, pub);
this.emit('publication', ctx);
Expand Down Expand Up @@ -568,6 +588,12 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
if (options.joinLeave === true) {
this._joinLeave = true;
}
if (options.delta) {
if (options.delta !== 'fossil') {
throw new Error('unsupported delta format');
}
this._delta = options.delta;
}
}

private _getOffset() {
Expand Down
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ export interface SubscriptionOptions {
recoverable: boolean;
/** ask server to send join/leave messages. */
joinLeave: boolean;
/** delta format to be used */
delta: 'fossil';
}

/** Stream postion describes position of publication inside a stream. */
Expand Down
Loading