Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various failing tests #624

Merged
merged 11 commits into from
Jun 17, 2024
2 changes: 1 addition & 1 deletion test/unit/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function testProtos(...requested: Proto[]) {
const set = new Set(requested)

/* Do not test with ipc if unsupported. */
if (!zmq.capability.ipc) {
if (zmq.capability.ipc !== true) {
set.delete("ipc")
}

Expand Down
19 changes: 10 additions & 9 deletions test/unit/socket-construction-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,16 @@ describe("socket construction", function () {
)
})

if (!zmq.capability.draft) {
it("should throw with draft type", function () {
assert.throws(
() => new (zmq.Socket as any)(14),
Error,
"Invalid argument",
)
})
}
it("should throw with draft type", function () {
if (zmq.capability.draft === true) {
this.skip()
}
assert.throws(
() => new (zmq.Socket as any)(14),
Error,
"Invalid argument",
)
})

it("should throw error on file descriptor limit", async function () {
const context = new zmq.Context({maxSockets: 10})
Expand Down
2 changes: 1 addition & 1 deletion test/unit/socket-curve-send-receive-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {testProtos, uniqAddress} from "./helpers"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
describe(`socket with ${proto} curve send/receive`, function () {
if (!zmq.capability.curve) {
if (zmq.capability.curve !== true) {
return
}

Expand Down
87 changes: 44 additions & 43 deletions test/unit/socket-draft-dgram-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,61 @@ import {assert} from "chai"
import {createSocket} from "dgram"
import {testProtos, uniqAddress} from "./helpers"

if (zmq.capability.draft) {
for (const proto of testProtos("udp")) {
describe(`draft socket with ${proto} dgram`, function () {
let dgram: draft.Datagram
for (const proto of testProtos("udp")) {
describe(`draft socket with ${proto} dgram`, function () {
if (zmq.capability.draft !== true) {
if (process.env.ZMQ_DRAFT === "true") {
throw new Error("Draft API requested but not available at runtime.")
}
return
}

beforeEach(function () {
dgram = new draft.Datagram()
})
let dgram: draft.Datagram

afterEach(function () {
dgram.close()
global.gc?.()
aminya marked this conversation as resolved.
Show resolved Hide resolved
})
beforeEach(function () {
dgram = new draft.Datagram()
})

describe("send/receive", function () {
it("should deliver messages", async function () {
const messages = ["foo", "bar", "baz", "qux"]
const address = uniqAddress(proto)
const port = parseInt(address.split(":").pop()!, 10)
afterEach(function () {
dgram.close()
global.gc?.()
})

await dgram.bind(address)
describe("send/receive", function () {
it("should deliver messages", async function () {
const messages = ["foo", "bar", "baz", "qux"]
const address = uniqAddress(proto)
const port = parseInt(address.split(":").pop()!, 10)

const echo = async () => {
for await (const [id, msg] of dgram) {
await dgram.send([id, msg])
}
}
await dgram.bind(address)

const received: string[] = []
const send = async () => {
for (const msg of messages) {
const client = createSocket("udp4")
await new Promise(resolve => {
client.on("message", res => {
received.push(res.toString())
client.close()
resolve(undefined)
})
const echo = async () => {
for await (const [id, msg] of dgram) {
await dgram.send([id, msg])
}
}

client.send(msg, port, "localhost")
const received: string[] = []
const send = async () => {
for (const msg of messages) {
const client = createSocket("udp4")
await new Promise(resolve => {
client.on("message", res => {
received.push(res.toString())
client.close()
resolve(undefined)
})
}

dgram.close()
client.send(msg, port, "localhost")
})
}

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)
})
dgram.close()
}

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)
})
})
}
} else {
if (process.env.ZMQ_DRAFT === "true") {
throw new Error("Draft API requested but not available at runtime.")
}
})
}
175 changes: 88 additions & 87 deletions test/unit/socket-draft-radio-dish-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,123 +4,124 @@ import * as draft from "../../src/draft"
import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"

if (zmq.capability.draft) {
for (const proto of testProtos("tcp", "ipc", "inproc", "udp")) {
describe(`draft socket with ${proto} radio/dish`, function () {
let radio: draft.Radio
let dish: draft.Dish

beforeEach(function () {
radio = new draft.Radio()
dish = new draft.Dish()
})
for (const proto of testProtos("tcp", "ipc", "inproc", "udp")) {
describe(`draft socket with ${proto} radio/dish`, function () {
if (zmq.capability.draft !== true) {
if (process.env.ZMQ_DRAFT === "true") {
throw new Error("Draft API requested but not available at runtime.")
}
return
}

let radio: draft.Radio
let dish: draft.Dish

beforeEach(function () {
radio = new draft.Radio()
dish = new draft.Dish()
})

afterEach(function () {
global.gc?.()
radio.close()
dish.close()
global.gc?.()
})
afterEach(function () {
global.gc?.()
radio.close()
dish.close()
global.gc?.()
})

describe("send/receive", function () {
it("should deliver messages", async function () {
/* RADIO -> foo -> DISH
describe("send/receive", function () {
it("should deliver messages", async function () {
/* RADIO -> foo -> DISH
-> bar -> joined all
-> baz ->
-> qux ->
*/

const address = uniqAddress(proto)
const messages = ["foo", "bar", "baz", "qux"]
const address = uniqAddress(proto)
const messages = ["foo", "bar", "baz", "qux"]

/* Max 15 non-null bytes. */
const uuid = Buffer.from([
0xf6, 0x46, 0x1f, 0x03, 0xd2, 0x0d, 0xc8, 0x66, 0xe5, 0x5f, 0xf5,
0xa1, 0x65, 0x62, 0xb2,
])
/* Max 15 non-null bytes. */
const uuid = Buffer.from([
0xf6, 0x46, 0x1f, 0x03, 0xd2, 0x0d, 0xc8, 0x66, 0xe5, 0x5f, 0xf5,
0xa1, 0x65, 0x62, 0xb2,
])

const received: string[] = []
const received: string[] = []

dish.join(uuid)
dish.join(uuid)

await dish.bind(address)
await radio.connect(address)
await dish.bind(address)
await radio.connect(address)

const send = async () => {
/* Wait briefly before publishing to avoid slow joiner syndrome. */
await new Promise(resolve => {
setTimeout(resolve, 25)
})
for (const msg of messages) {
await radio.send(msg, {group: uuid})
}
const send = async () => {
/* Wait briefly before publishing to avoid slow joiner syndrome. */
await new Promise(resolve => {
setTimeout(resolve, 25)
})
for (const msg of messages) {
await radio.send(msg, {group: uuid})
}

const receive = async () => {
for await (const [msg, {group}] of dish) {
assert.instanceOf(msg, Buffer)
assert.instanceOf(group, Buffer)
assert.deepEqual(group, uuid)
received.push(msg.toString())
if (received.length === messages.length) {
break
}
}

const receive = async () => {
for await (const [msg, {group}] of dish) {
assert.instanceOf(msg, Buffer)
assert.instanceOf(group, Buffer)
assert.deepEqual(group, uuid)
received.push(msg.toString())
if (received.length === messages.length) {
break
}
}
}

await Promise.all([send(), receive()])
assert.deepEqual(received, messages)
})
await Promise.all([send(), receive()])
assert.deepEqual(received, messages)
})
})

describe("join/leave", function () {
it("should filter messages", async function () {
/* RADIO -> foo -X DISH
describe("join/leave", function () {
it("should filter messages", async function () {
/* RADIO -> foo -X DISH
-> bar -> joined "ba"
-> baz ->
-> qux -X
*/

const address = uniqAddress(proto)
const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []
const address = uniqAddress(proto)
const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

/* Everything after null byte should be ignored. */
dish.join(Buffer.from("fo\x00ba"), Buffer.from("ba\x00fo"))
dish.leave(Buffer.from("fo"))
/* Everything after null byte should be ignored. */
dish.join(Buffer.from("fo\x00ba"), Buffer.from("ba\x00fo"))
dish.leave(Buffer.from("fo"))

await dish.bind(address)
await radio.connect(address)
await dish.bind(address)
await radio.connect(address)

const send = async () => {
/* Wait briefly before publishing to avoid slow joiner syndrome. */
await new Promise(resolve => {
setTimeout(resolve, 25)
})
for (const msg of messages) {
await radio.send(msg, {group: msg.slice(0, 2)})
}
const send = async () => {
/* Wait briefly before publishing to avoid slow joiner syndrome. */
await new Promise(resolve => {
setTimeout(resolve, 25)
})
for (const msg of messages) {
await radio.send(msg, {group: msg.slice(0, 2)})
}

const receive = async () => {
for await (const [msg, {group}] of dish) {
assert.instanceOf(msg, Buffer)
assert.deepEqual(group, msg.slice(0, 2))
received.push(msg.toString())
if (received.length === 2) {
break
}
}

const receive = async () => {
for await (const [msg, {group}] of dish) {
assert.instanceOf(msg, Buffer)
assert.deepEqual(group, msg.slice(0, 2))
received.push(msg.toString())
if (received.length === 2) {
break
}
}
}

await Promise.all([send(), receive()])
assert.deepEqual(received, ["bar", "baz"])
})
await Promise.all([send(), receive()])
assert.deepEqual(received, ["bar", "baz"])
})
})
}
} else {
if (process.env.ZMQ_DRAFT === "true") {
throw new Error("Draft API requested but not available at runtime.")
}
})
}
Loading