Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): cancel cannot work #3382

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions packages/connection/__test__/common/frame-decoder.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
/* eslint-disable no-console */
import { BinaryReader } from '@furyjs/fury/dist/lib/reader';
import { BinaryWriter } from '@furyjs/fury/dist/lib/writer';

import { LengthFieldBasedFrameDecoder, indicator } from '../../src/common/connection/drivers/frame-decoder';

const writer = BinaryWriter({});

export function prependLengthField(content: Uint8Array) {
writer.reset();
writer.buffer(indicator);
writer.uint32(content.byteLength);
writer.buffer(content);
return writer.dump();
}

function round(x: number, count: number) {
return Math.round(x * 10 ** count) / 10 ** count;
}
Expand Down Expand Up @@ -46,7 +35,7 @@ console.timeEnd('createPayload');
// 1m
const pressure = 1024 * 1024;

const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [prependLengthField(v), v] as const);
const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [LengthFieldBasedFrameDecoder.construct(v), v] as const);

const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);

Expand All @@ -59,7 +48,7 @@ purePackets.forEach((v) => {
});

const mixedPackets = [p1m, p5m].map((v) => {
const sumiPacket = prependLengthField(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);
const newPacket = createPayload(1024 + sumiPacket.byteLength);
newPacket.set(sumiPacket, 1024);
return [newPacket, v] as const;
Expand All @@ -70,7 +59,7 @@ const packets = [...purePackets, ...mixedPackets];
describe('frame decoder', () => {
it('can create frame', () => {
const content = new Uint8Array([1, 2, 3]);
const packet = prependLengthField(content);
const packet = LengthFieldBasedFrameDecoder.construct(content);
const reader = BinaryReader({});

reader.reset(packet);
Expand Down Expand Up @@ -127,7 +116,7 @@ describe('frame decoder', () => {

it('can decode a stream it has no valid length info', (done) => {
const v = createPayload(1024);
const sumiPacket = prependLengthField(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);

const decoder = new LengthFieldBasedFrameDecoder();
decoder.onData((data) => {
Expand Down
22 changes: 21 additions & 1 deletion packages/connection/__test__/common/rpc/sumi-rpc.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Type } from '@furyjs/fury';

import { METHOD_NOT_REGISTERED } from '@opensumi/ide-connection/lib/common/constants';
import { Deferred } from '@opensumi/ide-core-common';
import { CancellationToken, CancellationTokenSource, Deferred, sleep } from '@opensumi/ide-core-common';
import { IReadableStream, listenReadable } from '@opensumi/ide-utils/lib/stream';

import { test } from './common-tester';
Expand Down Expand Up @@ -96,4 +96,24 @@ describe('sumi rpc only', () => {
await deferred.promise;
expect(msg).toBe(message);
});

it('can cancel a call', (done) => {
const method = 'cancelTest';
pair.connection2.onRequest(method, async (t: CancellationToken) => {
while (!t.isCancellationRequested) {
// regular cancellation requires async for it to work
await sleep(0);
}

done();
});

pair.connection1.listen();
pair.connection2.listen();

const source = new CancellationTokenSource();

pair.connection1.sendRequest(method, source.token);
source.cancel();
});
});
1 change: 0 additions & 1 deletion packages/connection/src/common/buffers/index.ts

This file was deleted.

65 changes: 0 additions & 65 deletions packages/connection/src/common/buffers/pool.ts

This file was deleted.

12 changes: 12 additions & 0 deletions packages/connection/src/common/connection/drivers/frame-decoder.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { BinaryWriter } from '@furyjs/fury/dist/lib/writer';

import { Emitter, readUInt32LE } from '@opensumi/ide-core-common';

import { Buffers } from '../../buffers/buffers';
Expand Down Expand Up @@ -155,4 +157,14 @@ export class LengthFieldBasedFrameDecoder {
this.dataEmitter.dispose();
this.buffers.dispose();
}

static writer = BinaryWriter({});

static construct(content: Uint8Array) {
LengthFieldBasedFrameDecoder.writer.reset();
LengthFieldBasedFrameDecoder.writer.buffer(indicator);
LengthFieldBasedFrameDecoder.writer.uint32(content.byteLength);
LengthFieldBasedFrameDecoder.writer.buffer(content);
return LengthFieldBasedFrameDecoder.writer.dump();
}
}
15 changes: 5 additions & 10 deletions packages/connection/src/common/connection/drivers/stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { IDisposable, writeUInt32LE } from '@opensumi/ide-core-common';

import { pool } from '../../buffers';
import { IDisposable } from '@opensumi/ide-core-common';

import { BaseConnection } from './base';
import { LengthFieldBasedFrameDecoder, indicator } from './frame-decoder';
import { LengthFieldBasedFrameDecoder } from './frame-decoder';

import type { Readable, Writable } from 'stream';

Expand All @@ -23,13 +21,10 @@ export class StreamConnection extends BaseConnection<Uint8Array> {
}

send(data: Uint8Array): void {
this.writable.write(indicator);
const buf = pool.alloc(4);
writeUInt32LE(buf, data.byteLength, 0);
this.writable.write(buf, () => {
pool.free(buf);
const result = LengthFieldBasedFrameDecoder.construct(data);
this.writable.write(result, () => {
// TODO: logger error
});
this.writable.write(data);
}

onMessage(cb: (data: Uint8Array) => void): IDisposable {
Expand Down
7 changes: 7 additions & 0 deletions packages/connection/src/common/fury-extends/any.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export class AnySerializer {
write(data: any) {
const { writer } = this;
const type = typeof data;
writer.reserve(1);

switch (type) {
case 'undefined':
writer.uint8(ProtocolType.Undefined);
Expand All @@ -32,10 +34,12 @@ export class AnySerializer {
writer.stringOfVarUInt32(data);
break;
case 'boolean':
writer.reserve(1);
writer.uint8(ProtocolType.Boolean);
writer.uint8(data ? 1 : 0);
break;
case 'number':
writer.reserve(8);
if ((data | 0) === data) {
writer.uint8(ProtocolType.Int32);
writer.int32(data);
Expand All @@ -45,19 +49,22 @@ export class AnySerializer {
}
break;
case 'bigint':
writer.reserve(8);
writer.uint8(ProtocolType.BigInt);
writer.int64(data);
break;
case 'object':
if (data === null) {
writer.uint8(ProtocolType.Null);
} else if (Array.isArray(data)) {
writer.reserve(4);
writer.uint8(ProtocolType.Array);
writer.varUInt32(data.length);
for (const element of data) {
this.write(element);
}
} else if (isUint8Array(data)) {
writer.reserve(4);
writer.uint8(ProtocolType.Buffer);
writer.varUInt32(data.byteLength);
writer.buffer(data);
Expand Down
5 changes: 3 additions & 2 deletions packages/connection/src/common/rpc/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ export class SumiConnection implements IDisposable {
// skip version, currently only have version 1
reader.skip(1);

const opType = reader.uint8();
const opType = reader.uint8() as OperationType;
const requestId = reader.uint32();
const method = reader.stringOfVarUInt32();

if (this._timeoutHandles.has(requestId)) {
// Ignore some jest test scenarios where clearTimeout is not defined.
Expand All @@ -190,6 +189,7 @@ export class SumiConnection implements IDisposable {

switch (opType) {
case OperationType.Response: {
const method = reader.stringOfVarUInt32();
const status = reader.uint16();

const runCallback = (headers: IResponseHeaders, error?: any, result?: any) => {
Expand Down Expand Up @@ -246,6 +246,7 @@ export class SumiConnection implements IDisposable {
case OperationType.Notification:
// fall through
case OperationType.Request: {
const method = reader.stringOfVarUInt32();
const headers = this.io.requestHeadersSerializer.read() as IRequestHeaders;
const args = this.io.getProcessor(method).readRequest();

Expand Down
Loading