Skip to content

Commit

Permalink
fix demux function types again
Browse files Browse the repository at this point in the history
  • Loading branch information
leonitousconforti committed May 7, 2024
1 parent 8b50c91 commit db60612
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 62 deletions.
48 changes: 24 additions & 24 deletions docs/modules/Demux.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ the correct sink.
```ts
export declare const demuxMultiplexedSocket: (<A1, A2, E1, E2, E3>(
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
) => (
socket: MultiplexedStreamSocket
) => Effect.Effect<readonly [stdout: A1, stderr: A2], Socket.SocketError | E1 | E2 | E3, never>) &
(<A1, A2, E1, E2, E3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
) => Effect.Effect<readonly [stdout: A1, stderr: A2], Socket.SocketError | E1 | E2 | E3, never>)
```
Expand All @@ -99,15 +99,15 @@ combined on the same sink.
**Signature**
```ts
export declare const demuxRawSocket: (<A, E1, E2>(
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A, string | Uint8Array, never, E2, never>
) => (socket: RawStreamSocket) => Effect.Effect<A, Socket.SocketError | E1 | E2, never>) &
(<A, E1, E2>(
export declare const demuxRawSocket: (<A1, E1, E2>(
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
) => (socket: RawStreamSocket) => Effect.Effect<A1, Socket.SocketError | E1 | E2, never>) &
(<A1, E1, E2>(
socket: RawStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A, string | Uint8Array, never, E2, never>
) => Effect.Effect<A, Socket.SocketError | E1 | E2, never>)
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
) => Effect.Effect<A1, Socket.SocketError | E1 | E2, never>)
```
Added in v1.0.0
Expand All @@ -126,23 +126,23 @@ and one for stderr.
export declare const demuxSocket: {
<A1, E1, E2>(
socket: RawStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A1, string | Uint8Array, never, E2, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
): Effect.Effect<A1, Socket.SocketError | E1 | E2, never>
<A1, E1, E2>(
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A1, string | Uint8Array, never, E2, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
): (socket: RawStreamSocket) => Effect.Effect<A1, Socket.SocketError | E1 | E2, never>
<A1, A2, E1, E2, E3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): Effect.Effect<readonly [stdout: A1, stderr: A2], Socket.SocketError | E1 | E2 | E3, never>
<A1, A2, E1, E2, E3>(
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): (
socket: MultiplexedStreamSocket
) => Effect.Effect<readonly [stdout: A1, stderr: A2], Socket.SocketError | E1 | E2 | E3, never>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "the-moby-effect",
"version": "1.45.0-alpha.1",
"version": "1.45.0-alpha.2",
"description": "Moby/Docker API client built using effect-ts",
"keywords": [
"moby",
Expand Down
79 changes: 42 additions & 37 deletions src/Demux.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,28 @@ export enum MultiplexedStreamSocketHeaderType {
* @see https://docs.docker.com/engine/api/v1.43/#tag/Container/operation/ContainerAttach
*/
export const demuxRawSocket = Function.dual<
<A, E1, E2>(
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A, string | Uint8Array, never, E2, never>
) => (socket: RawStreamSocket) => Effect.Effect<A, E1 | E2 | Socket.SocketError, never>,
<A, E1, E2>(
<A1, E1, E2>(
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
) => (socket: RawStreamSocket) => Effect.Effect<A1, E1 | E2 | Socket.SocketError, never>,
<A1, E1, E2>(
socket: RawStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A, string | Uint8Array, never, E2, never>
) => Effect.Effect<A, E1 | E2 | Socket.SocketError, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
) => Effect.Effect<A1, E1 | E2 | Socket.SocketError, never>
>(
3,
<A, E1, E2>(
<A1, E1, E2>(
socket: RawStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A, string | Uint8Array, never, E2, never>
): Effect.Effect<A, E1 | E2 | Socket.SocketError, never> =>
Function.pipe(source, Stream.pipeThroughChannel(Socket.toChannel(socket)), Stream.run(sink))
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
): Effect.Effect<A1, E1 | E2 | Socket.SocketError, never> =>
Function.pipe(
source,
Stream.pipeThroughChannel(Socket.toChannel(socket)),
Stream.decodeText(),
Stream.run(sink)
)
);

/**
Expand All @@ -180,39 +185,39 @@ export const demuxRawSocket = Function.dual<
*/
export const demuxMultiplexedSocket = Function.dual<
<A1, A2, E1, E2, E3>(
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
) => (
socket: MultiplexedStreamSocket
) => Effect.Effect<readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never>,
<A1, A2, E1, E2, E3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
) => Effect.Effect<readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never>
>(
4,
<A1, A2, E1, E2, E3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): Effect.Effect<readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never> =>
Function.pipe(
source,
Stream.pipeThroughChannel(Socket.toChannel(socket)),
Stream.mapConcat((data) => {
let offset = 0;
const output: Array<{ type: MultiplexedStreamSocketHeaderType; contents: Uint8Array }> = [];
const output: Array<{ type: MultiplexedStreamSocketHeaderType; contents: string }> = [];

while (offset <= data.length - 8) {
const header = new DataView(data.buffer);
const type = header.getUint8(0);
const length = header.getUint32(4);
const thisMessage = data.slice(8, 8 + length);
output.push({ type, contents: thisMessage });
output.push({ type, contents: thisMessage.toString() });
offset += 8 + length;
}

Expand Down Expand Up @@ -255,33 +260,33 @@ export const demuxMultiplexedSocket = Function.dual<
export const demuxSocket: {
<A1, E1, E2>(
socket: RawStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A1, string | Uint8Array, never, E2, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
): Effect.Effect<A1, E1 | E2 | Socket.SocketError, never>;
<A1, E1, E2>(
source: Stream.Stream<Uint8Array, E1, never>,
sink: Sink.Sink<A1, string | Uint8Array, never, E2, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink: Sink.Sink<A1, string, never, E2, never>
): (socket: RawStreamSocket) => Effect.Effect<A1, E1 | E2 | Socket.SocketError, never>;
<A1, A2, E1, E2, E3>(
socket: MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): Effect.Effect<readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never>;
<A1, A2, E1, E2, E3>(
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): (
socket: MultiplexedStreamSocket
) => Effect.Effect<readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never>;
} = Function.dual(
(arguments_) => arguments_[0][Socket.TypeId],
<A1, A2, E1, E2, E3>(
socket: RawStreamSocket | MultiplexedStreamSocket,
source: Stream.Stream<Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string | Uint8Array, never, E2, never>,
sink2: Sink.Sink<A2, string | Uint8Array, never, E3, never>
source: Stream.Stream<string | Uint8Array, E1, never>,
sink1: Sink.Sink<A1, string, never, E2, never>,
sink2: Sink.Sink<A2, string, never, E3, never>
): Effect.Effect<A1 | readonly [stdout: A1, stderr: A2], E1 | E2 | E3 | Socket.SocketError, never> => {
switch (socket["content-type"]) {
case "application/vnd.docker.raw-stream": {
Expand Down

0 comments on commit db60612

Please sign in to comment.