diff --git a/docs/modules/Demux.ts.md b/docs/modules/Demux.ts.md index 48e2b090..8686838a 100644 --- a/docs/modules/Demux.ts.md +++ b/docs/modules/Demux.ts.md @@ -71,17 +71,19 @@ the correct sink. **Signature** ```ts -export declare const demuxMultiplexedSocket: (( +export declare const demuxMultiplexedSocket: (( source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink -) => (socket: MultiplexedStreamSocket) => Effect.Effect) & - (( + sink1: Sink.Sink, + sink2: Sink.Sink +) => ( + socket: MultiplexedStreamSocket +) => Effect.Effect) & + (( socket: MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ) => Effect.Effect) + sink1: Sink.Sink, + sink2: Sink.Sink + ) => Effect.Effect) ``` Added in v1.0.0 @@ -97,15 +99,15 @@ combined on the same sink. **Signature** ```ts -export declare const demuxRawSocket: (( +export declare const demuxRawSocket: (( source: Stream.Stream, - sink: Sink.Sink -) => (socket: RawStreamSocket) => Effect.Effect) & - (( + sink: Sink.Sink +) => (socket: RawStreamSocket) => Effect.Effect) & + (( socket: RawStreamSocket, source: Stream.Stream, - sink: Sink.Sink - ) => Effect.Effect) + sink: Sink.Sink + ) => Effect.Effect) ``` Added in v1.0.0 @@ -122,26 +124,28 @@ and one for stderr. ```ts export declare const demuxSocket: { - ( + ( socket: RawStreamSocket, source: Stream.Stream, - sink: Sink.Sink - ): Effect.Effect - ( + sink: Sink.Sink + ): Effect.Effect + ( source: Stream.Stream, - sink: Sink.Sink - ): (socket: RawStreamSocket) => Effect.Effect - ( + sink: Sink.Sink + ): (socket: RawStreamSocket) => Effect.Effect + ( socket: MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): Effect.Effect - ( + sink1: Sink.Sink, + sink2: Sink.Sink + ): Effect.Effect + ( source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): (socket: MultiplexedStreamSocket) => Effect.Effect + sink1: Sink.Sink, + sink2: Sink.Sink + ): ( + socket: MultiplexedStreamSocket + ) => Effect.Effect } ``` diff --git a/src/Demux.ts b/src/Demux.ts index 47860950..fdb05d9a 100644 --- a/src/Demux.ts +++ b/src/Demux.ts @@ -15,6 +15,7 @@ import * as Effect from "effect/Effect"; import * as Function from "effect/Function"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; +import * as Tuple from "effect/Tuple"; import { IExposeSocketOnEffectClientResponse } from "./Requests.js"; @@ -147,22 +148,22 @@ export enum MultiplexedStreamSocketHeaderType { * @see https://docs.docker.com/engine/api/v1.43/#tag/Container/operation/ContainerAttach */ export const demuxRawSocket = Function.dual< - ( + ( source: Stream.Stream, - sink: Sink.Sink - ) => (socket: RawStreamSocket) => Effect.Effect, - ( + sink: Sink.Sink + ) => (socket: RawStreamSocket) => Effect.Effect, + ( socket: RawStreamSocket, source: Stream.Stream, - sink: Sink.Sink - ) => Effect.Effect + sink: Sink.Sink + ) => Effect.Effect >( 3, - ( + ( socket: RawStreamSocket, source: Stream.Stream, - sink: Sink.Sink - ): Effect.Effect => + sink: Sink.Sink + ): Effect.Effect => Function.pipe(source, Stream.pipeThroughChannel(Socket.toChannel(socket)), Stream.run(sink)) ); @@ -178,25 +179,27 @@ export const demuxRawSocket = Function.dual< * @see https://docs.docker.com/engine/api/v1.43/#tag/Container/operation/ContainerAttach */ export const demuxMultiplexedSocket = Function.dual< - ( + ( source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ) => (socket: MultiplexedStreamSocket) => Effect.Effect, - ( + sink1: Sink.Sink, + sink2: Sink.Sink + ) => ( + socket: MultiplexedStreamSocket + ) => Effect.Effect, + ( socket: MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ) => Effect.Effect + sink1: Sink.Sink, + sink2: Sink.Sink + ) => Effect.Effect >( 4, - ( + ( socket: MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): Effect.Effect => + sink1: Sink.Sink, + sink2: Sink.Sink + ): Effect.Effect => Function.pipe( source, Stream.pipeThroughChannel(Socket.toChannel(socket)), @@ -221,16 +224,18 @@ export const demuxMultiplexedSocket = Function.dual< type === MultiplexedStreamSocketHeaderType.Stderr ), Stream.partition(({ type }) => type === MultiplexedStreamSocketHeaderType.Stderr), - Effect.map(([stdout, stderr]) => [ - stdout.pipe( - Stream.map(({ contents }) => contents), - Stream.run(sink1) - ), - stderr.pipe( - Stream.map(({ contents }) => contents), - Stream.run(sink2) - ), - ]), + Effect.map( + Tuple.mapBoth({ + onFirst: Function.compose( + Stream.map(({ contents }) => contents), + Stream.run(sink1) + ), + onSecond: Function.compose( + Stream.map(({ contents }) => contents), + Stream.run(sink2) + ), + }) + ), Effect.map(Effect.all), Effect.flatten, Effect.scoped @@ -248,34 +253,36 @@ export const demuxMultiplexedSocket = Function.dual< * @category Demux */ export const demuxSocket: { - ( + ( socket: RawStreamSocket, source: Stream.Stream, - sink: Sink.Sink - ): Effect.Effect; - ( + sink: Sink.Sink + ): Effect.Effect; + ( source: Stream.Stream, - sink: Sink.Sink - ): (socket: RawStreamSocket) => Effect.Effect; - ( + sink: Sink.Sink + ): (socket: RawStreamSocket) => Effect.Effect; + ( socket: MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): Effect.Effect; - ( + sink1: Sink.Sink, + sink2: Sink.Sink + ): Effect.Effect; + ( source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): (socket: MultiplexedStreamSocket) => Effect.Effect; + sink1: Sink.Sink, + sink2: Sink.Sink + ): ( + socket: MultiplexedStreamSocket + ) => Effect.Effect; } = Function.dual( (arguments_) => arguments_[0][Socket.TypeId], - ( + ( socket: RawStreamSocket | MultiplexedStreamSocket, source: Stream.Stream, - sink1: Sink.Sink, - sink2: Sink.Sink - ): Effect.Effect => { + sink1: Sink.Sink, + sink2: Sink.Sink + ): Effect.Effect => { switch (socket["content-type"]) { case "application/vnd.docker.raw-stream": { return demuxRawSocket(socket, source, sink1);