Skip to content

Commit

Permalink
change(core): removed internal TypedSubscription and associated types…
Browse files Browse the repository at this point in the history
…, as these were used by JetStream legacy APIs. Simplified internals of QueuedIteratorImpl since lots of the extra cruft was due to TypedSubscription support.

Signed-off-by: Alberto Ricart <alberto@synadia.com>
  • Loading branch information
aricart committed Oct 18, 2024
1 parent 552fb20 commit 4778f44
Show file tree
Hide file tree
Showing 10 changed files with 16 additions and 951 deletions.
10 changes: 0 additions & 10 deletions core/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
export { NatsConnectionImpl } from "./nats.ts";
export { Nuid, nuid } from "./nuid.ts";

export type { TypedSubscriptionOptions } from "./types.ts";

export { MsgImpl } from "./msg.ts";
export { getResolveFn, setTransportFactory } from "./transport.ts";
export type { Transport, TransportFactory } from "./transport.ts";
Expand Down Expand Up @@ -69,12 +67,6 @@ export {
export type { Codec } from "./codec.ts";
export { JSONCodec, StringCodec } from "./codec.ts";
export * from "./nkeys.ts";
export type {
DispatchedFn,
IngestionFilterFn,
IngestionFilterFnResult,
ProtocolFilterFn,
} from "./queued_iterator.ts";
export { QueuedIteratorImpl } from "./queued_iterator.ts";
export type { MsgArg, ParserEvent } from "./parser.ts";
export { Kind, Parser, State } from "./parser.ts";
Expand All @@ -83,8 +75,6 @@ export { Bench, Metric } from "./bench.ts";
export type { BenchOpts } from "./bench.ts";
export { TD, TE } from "./encoders.ts";
export { ipV4, isIP, parseIP } from "./ipparser.ts";
export { checkFn, TypedSubscription } from "./typedsub.ts";
export type { MsgAdapter, TypedCallback } from "./typedsub.ts";

export type { SemVer } from "./semver.ts";
export { compare, Feature, Features, parseSemVer } from "./semver.ts";
Expand Down
7 changes: 0 additions & 7 deletions core/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ export type {
ConnectionOptions,
Deferred,
Delay,
DispatchedFn,
IngestionFilterFn,
IngestionFilterFnResult,
JwtAuth,
Msg,
MsgAdapter,
MsgCallback,
MsgHdrs,
Nanos,
Expand All @@ -72,7 +68,6 @@ export type {
NoAuth,
Payload,
Perf,
ProtocolFilterFn,
Publisher,
PublishOptions,
QueuedIterator,
Expand All @@ -91,7 +86,5 @@ export type {
Timeout,
TlsOptions,
TokenAuth,
TypedCallback,
TypedSubscriptionOptions,
UserPass,
} from "./internal_mod.ts";
41 changes: 0 additions & 41 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import type { Deferred, Timeout } from "./util.ts";
import { DataBuffer } from "./databuffer.ts";
import { Servers } from "./servers.ts";
import type { ServerImpl } from "./servers.ts";
import type {
DispatchedFn,
IngestionFilterFn,
IngestionFilterFnResult,
ProtocolFilterFn,
} from "./queued_iterator.ts";
import { QueuedIteratorImpl } from "./queued_iterator.ts";
import type { MsgHdrsImpl } from "./headers.ts";
import { MuxSubscription } from "./muxsubscription.ts";
Expand Down Expand Up @@ -161,41 +155,6 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setPrePostHandlers(
opts: {
ingestionFilterFn?: IngestionFilterFn<Msg>;
protocolFilterFn?: ProtocolFilterFn<Msg>;
dispatchedFn?: DispatchedFn<Msg>;
},
) {
if (this.noIterator) {
const uc = this.callback;

const ingestion = opts.ingestionFilterFn
? opts.ingestionFilterFn
: (): IngestionFilterFnResult => {
return { ingest: true, protocol: false };
};
const filter = opts.protocolFilterFn ? opts.protocolFilterFn : () => {
return true;
};
const dispatched = opts.dispatchedFn ? opts.dispatchedFn : () => {};
this.callback = (err: NatsError | null, msg: Msg) => {
const { ingest } = ingestion(msg);
if (!ingest) {
return;
}
if (filter(msg)) {
uc(err, msg);
dispatched(msg);
}
};
} else {
this.protocolFilterFn = opts.protocolFilterFn;
this.dispatchedFn = opts.dispatchedFn;
}
}

callback(err: NatsError | null, msg: Msg) {
this.cancelTimeout();
err ? this.stop(err) : this.push(msg);
Expand Down
78 changes: 11 additions & 67 deletions core/src/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,10 @@ import type { QueuedIterator } from "./core.ts";
import { ErrorCode, NatsError } from "./core.ts";
import type { CallbackFn, Dispatcher } from "./core.ts";

export type IngestionFilterFnResult = { ingest: boolean; protocol: boolean };

/**
* IngestionFilterFn prevents a value from being ingested by the
* iterator. It is executed on `push`. If ingest is false the value
* shouldn't be pushed. If protcol is true, the value is a protcol
* value
*
* @param: data is the value
* @src: is the source of the data if set.
*/
export type IngestionFilterFn<T = unknown> = (
data: T | null,
src?: unknown,
) => IngestionFilterFnResult;
/**
* ProtocolFilterFn allows filtering of values that shouldn't be presented
* to the iterator. ProtocolFilterFn is executed when a value is about to be presented
*
* @param data: the value
* @returns boolean: true if the value should presented to the iterator
*/
export type ProtocolFilterFn<T = unknown> = (data: T | null) => boolean;
/**
* DispatcherFn allows for values to be processed after being presented
* to the iterator. Note that if the ProtocolFilter rejected the value
* it will _not_ be presented to the DispatchedFn. Any processing should
* instead have been handled by the ProtocolFilterFn.
* @param data: the value
*/
export type DispatchedFn<T = unknown> = (data: T | null) => void;

export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
inflight: number;
processed: number;
// FIXME: this is updated by the protocol
// this is updated by the protocol
received: number;
noIterator: boolean;
iterClosed: Deferred<void | Error>;
Expand All @@ -62,13 +30,11 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
yields: (T | CallbackFn)[];
filtered: number;
pendingFiltered: number;
ingestionFilterFn?: IngestionFilterFn<T>;
protocolFilterFn?: ProtocolFilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
ctx?: unknown;
_data?: unknown; //data is for use by extenders in any way they like
err?: Error;
time: number;
profile: boolean;
yielding: boolean;
didBreak: boolean;

Expand All @@ -86,6 +52,7 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
this.time = 0;
this.yielding = false;
this.didBreak = false;
this.profile = false;
}

[Symbol.asyncIterator](): AsyncIterator<T> {
Expand All @@ -110,22 +77,8 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
}
return;
}
if (typeof v === "function") {
this.yields.push(v);
this.signal.resolve();
return;
}
const { ingest, protocol } = this.ingestionFilterFn
? this.ingestionFilterFn(v, this.ctx || this)
: { ingest: true, protocol: false };
if (ingest) {
if (protocol) {
this.filtered++;
this.pendingFiltered++;
}
this.yields.push(v);
this.signal.resolve();
}
this.yields.push(v);
this.signal.resolve();
}

async *iterate(): AsyncIterableIterator<T> {
Expand Down Expand Up @@ -163,21 +116,12 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
}
continue;
}
// only pass messages that pass the filter
const ok = this.protocolFilterFn
? this.protocolFilterFn(yields[i] as T)
: true;
if (ok) {
this.processed++;
const start = Date.now();
yield yields[i] as T;
this.time = Date.now() - start;
if (this.dispatchedFn && yields[i]) {
this.dispatchedFn(yields[i] as T);
}
} else {
this.pendingFiltered--;
}

this.processed++;
const start = this.profile ? Date.now() : 0;
yield yields[i] as T;
this.time = this.profile ? Date.now() - start : 0;

this.inflight--;
}
// yielding could have paused and microtask
Expand Down
Loading

0 comments on commit 4778f44

Please sign in to comment.