Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementation of useAbortSignal option for grpc-web #777

Merged
merged 7 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions integration/grpc-web-no-streaming-observable/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ export const Empty = {
* but with the streaming method removed.
*/
export interface DashState {
UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState>;
UserSettings(
request: DeepPartial<Empty>,
metadata?: grpc.Metadata,
abortSignal?: AbortSignal,
): Observable<DashUserSettingsState>;
}

export class DashStateClientImpl implements DashState {
Expand All @@ -332,8 +336,12 @@ export class DashStateClientImpl implements DashState {
this.UserSettings = this.UserSettings.bind(this);
}

UserSettings(request: DeepPartial<Empty>, metadata?: grpc.Metadata): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), metadata);
UserSettings(
request: DeepPartial<Empty>,
metadata?: grpc.Metadata,
abortSignal?: AbortSignal,
): Observable<DashUserSettingsState> {
return this.rpc.unary(DashStateUserSettingsDesc, Empty.fromPartial(request), abortSignal, metadata);
}
}

Expand Down Expand Up @@ -374,6 +382,7 @@ interface Rpc {
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
abortSignal?: AbortSignal,
): Observable<any>;
}

Expand Down Expand Up @@ -405,13 +414,14 @@ export class GrpcWebImpl {
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined,
abortSignal?: AbortSignal,
): Observable<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata = metadata && this.options.metadata
? new BrowserHeaders({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable((observer) => {
grpc.unary(methodDesc, {
const client = grpc.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -427,6 +437,14 @@ export class GrpcWebImpl {
}
},
});

const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) {
abortSignal.addEventListener("abort", abortHandler);
}
}).pipe(take(1));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
outputClientImpl=grpc-web,returnObservable=true
outputClientImpl=grpc-web,returnObservable=true,useAbortSignal=true
6 changes: 5 additions & 1 deletion integration/grpc-web/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,11 @@ export class GrpcWebImpl {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) {
return client.close();
}
});
});
upStream();
}).pipe(share());
Expand Down
70 changes: 64 additions & 6 deletions src/generate-grpc-web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export function generateGrpcClientImpl(
/** Creates the RPC methods that client code actually calls. */
function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, methodDesc: MethodDescriptorProto) {
assertInstanceOf(methodDesc, FormattedMethodDescriptor);
const { options } = ctx;
const { useAbortSignal } = options;
const requestMessage = rawRequestType(ctx, methodDesc);
const inputType = requestType(ctx, methodDesc, true);
const returns = responsePromiseOrObservable(ctx, methodDesc);
Expand All @@ -57,6 +59,7 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
${methodDesc.formattedName}(
request: ${inputType},
metadata?: grpc.Metadata,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${returns} {
throw new Error('ts-proto does not yet support client streaming!');
}
Expand All @@ -68,10 +71,12 @@ function generateRpcMethod(ctx: Context, serviceDesc: ServiceDescriptorProto, me
${methodDesc.formattedName}(
request: ${inputType},
metadata?: grpc.Metadata,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${returns} {
return this.rpc.${method}(
${methodDescName(serviceDesc, methodDesc)},
${requestMessage}.fromPartial(request),
${useAbortSignal ? "abortSignal," : ""}
metadata,
);
}
Expand Down Expand Up @@ -165,6 +170,8 @@ export function addGrpcWebMisc(ctx: Context, hasStreamingMethods: boolean): Code
/** Makes an `Rpc` interface to decouple from the low-level grpc-web `grpc.invoke and grpc.unary`/etc. methods. */
function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStreamingMethods: boolean): Code {
const chunks: Code[] = [];
const { options } = ctx;
const { useAbortSignal } = options;

chunks.push(code`interface Rpc {`);

Expand All @@ -174,6 +181,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${wrapper}<any>;
`);

Expand All @@ -183,6 +191,7 @@ function generateGrpcWebRpcType(ctx: Context, returnObservable: boolean, hasStre
methodDesc: T,
request: any,
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any>;
`);
}
Expand Down Expand Up @@ -230,19 +239,33 @@ function generateGrpcWebImpl(ctx: Context, returnObservable: boolean, hasStreami
}

function createPromiseUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
client.close();
reject(new Error("Aborted"));
}

if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): Promise<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata =
metadata && this.options.metadata
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Promise((resolve, reject) => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -257,25 +280,39 @@ function createPromiseUnaryMethod(ctx: Context): Code {
}
},
});

${maybeAbortSignal}
});
}
`;
}

function createObservableUnaryMethod(ctx: Context): Code {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";
return code`
unary<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any> {
const request = { ..._request, ...methodDesc.requestType };
const maybeCombinedMetadata =
metadata && this.options.metadata
? new ${BrowserHeaders}({ ...this.options?.metadata.headersMap, ...metadata?.headersMap })
: metadata || this.options.metadata;
return new Observable(observer => {
${grpc}.unary(methodDesc, {
${useAbortSignal ? `const client =` : ""} ${grpc}.unary(methodDesc, {
request,
host: this.host,
metadata: maybeCombinedMetadata,
Expand All @@ -291,17 +328,34 @@ function createObservableUnaryMethod(ctx: Context): Code {
}
},
});


${maybeAbortSignal}

}).pipe(${take}(1));
}
`;
}

function createInvokeMethod(ctx: Context) {
const { options } = ctx;
const { useAbortSignal } = options;

const maybeAbortSignal = useAbortSignal
? `
const abortHandler = () => {
observer.error("Aborted");
client.close();
};
if (abortSignal) abortSignal.addEventListener("abort", abortHandler);`
: "";

return code`
invoke<T extends UnaryMethodDefinitionish>(
methodDesc: T,
_request: any,
metadata: grpc.Metadata | undefined
metadata: grpc.Metadata | undefined,
${useAbortSignal ? "abortSignal?: AbortSignal," : ""}
): ${observableType(ctx)}<any> {
const upStreamCodes = this.options.upStreamRetryCodes || [];
const DEFAULT_TIMEOUT_TIME: number = 3_000;
Expand Down Expand Up @@ -332,7 +386,11 @@ function createInvokeMethod(ctx: Context) {
}
},
});
observer.add(() => client.close());
observer.add(() => {
if (!observer.closed) return client.close()
});

${maybeAbortSignal}
});
upStream();
}).pipe(${share}());
Expand Down
6 changes: 3 additions & 3 deletions src/generate-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ export function generateService(
const partialInput = options.outputClientImpl === "grpc-web";
const inputType = requestType(ctx, methodDesc, partialInput);
params.push(code`request: ${inputType}`);
if (options.useAbortSignal) {
params.push(code`abortSignal?: AbortSignal`);
}

// Use metadata as last argument for interface only configuration
if (options.outputClientImpl === "grpc-web") {
Expand All @@ -78,6 +75,9 @@ export function generateService(
const Metadata = imp(options.metadataType);
params.push(code`metadata?: ${Metadata}`);
}
if (options.useAbortSignal) {
params.push(code`abortSignal?: AbortSignal`);
}
if (options.addNestjsRestParameter) {
params.push(code`...rest: any`);
}
Expand Down