From c5cc00a9e0f1c90ee2cb57fe6c3767a285f4d8e3 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Thu, 27 May 2021 00:43:27 -0700 Subject: [PATCH] feat: overhaul stream store API to better fit actual use in kernel --- .../swing-store-lmdb/src/lmdbSwingStore.js | 82 +++++------- packages/swing-store-lmdb/test/test-state.js | 118 +++++------------ .../src/simpleSwingStore.js | 80 +++++------- .../swing-store-simple/test/test-state.js | 119 +++++------------- 4 files changed, 131 insertions(+), 268 deletions(-) diff --git a/packages/swing-store-lmdb/src/lmdbSwingStore.js b/packages/swing-store-lmdb/src/lmdbSwingStore.js index c7084ae72e1..2105ff2568b 100644 --- a/packages/swing-store-lmdb/src/lmdbSwingStore.js +++ b/packages/swing-store-lmdb/src/lmdbSwingStore.js @@ -15,7 +15,6 @@ const encoder = new util.TextEncoder(); * @typedef { import('@agoric/swing-store-simple').KVStore } KVStore * @typedef { import('@agoric/swing-store-simple').StreamPosition } StreamPosition * @typedef { import('@agoric/swing-store-simple').StreamStore } StreamStore - * @typedef { import('@agoric/swing-store-simple').StreamWriter } StreamWriter * @typedef { import('@agoric/swing-store-simple').SwingStore } SwingStore */ @@ -227,11 +226,10 @@ function makeSwingStore(dirPath, forceReset = false) { * * @returns {Iterable} an iterator for the items in the named stream */ - function openReadStream(streamName, startPosition, endPosition) { + function readStream(streamName, startPosition, endPosition) { insistStreamName(streamName); - const status = streamStatus.get(streamName); assert( - !status, + !streamStatus.get(streamName), X`can't read stream ${q(streamName)} because it's already in use`, ); insistStreamPosition(startPosition); @@ -277,9 +275,8 @@ function makeSwingStore(dirPath, forceReset = false) { function* reader() { try { while (true) { - const statusInner = streamStatus.get(streamName); assert( - statusInner === readStatus, + streamStatus.get(streamName) === readStatus, X`can't read stream ${q(streamName)}, it's been closed`, ); const line = /** @type {string|false} */ (innerReader.next()); @@ -299,9 +296,8 @@ function makeSwingStore(dirPath, forceReset = false) { } catch (e) { console.log(e); } finally { - const statusEnd = streamStatus.get(streamName); assert( - statusEnd === readStatus, + streamStatus.get(streamName) === readStatus, X`can't read stream ${q(streamName)}, it's been closed`, ); closeStream(streamName); @@ -313,61 +309,49 @@ function makeSwingStore(dirPath, forceReset = false) { } /** - * Obtain a writer for a stream. + * Write to a stream. * * @param {string} streamName The stream to be written + * @param {string} item The item to write + * @param {Object} position The position to write the item * - * @returns {StreamWriter} a writer for the named stream + * @returns {Object} the new position after writing */ - function openWriteStream(streamName) { + function writeStreamItem(streamName, item, position) { insistStreamName(streamName); - const status = streamStatus.get(streamName); - assert( - !status, - X`can't write stream ${q(streamName)} because it's already in use`, - ); - streamStatus.set(streamName, 'write'); + insistStreamPosition(position); - // XXX fdTemp is a workaround for a flaw in TypeScript's type inference - // It should be fd, which it should be changed to when and if they fix tsc. - let fdTemp = streamFds.get(streamName); - if (!fdTemp) { + let fd = streamFds.get(streamName); + if (!fd) { const filePath = `${dirPath}/${streamName}`; const mode = fs.existsSync(filePath) ? 'r+' : 'w'; - fdTemp = fs.openSync(filePath, mode); - streamFds.set(streamName, fdTemp); + fd = fs.openSync(filePath, mode); + streamFds.set(streamName, fd); + streamStatus.set(streamName, 'write'); + } else { + const status = streamStatus.get(streamName); + if (!status) { + streamStatus.set(streamName, 'write'); + } else { + assert( + status === 'write', + X`can't write stream ${q(streamName)} because it's already in use`, + ); + } } - const fd = fdTemp; activeStreamFds.add(fd); - /** - * Write to a stream. - * - * @param {string} item The item to write - * @param {Object} position The position to write the item - * - * @returns {Object} the new position after writing - */ - function write(item, position) { - assert.typeof(item, 'string'); - assert( - streamFds.get(streamName) === fd, - X`can't write to closed stream ${q(streamName)}`, - ); - insistStreamPosition(position); - const buf = encoder.encode(`${item}\n`); - fs.writeSync(fd, buf, 0, buf.length, position.offset); - return harden({ - offset: position.offset + buf.length, - itemCount: position.itemCount + 1, - }); - } - return write; + const buf = encoder.encode(`${item}\n`); + fs.writeSync(fd, buf, 0, buf.length, position.offset); + return harden({ + offset: position.offset + buf.length, + itemCount: position.itemCount + 1, + }); } const streamStore = harden({ - openReadStream, - openWriteStream, + readStream, + writeStreamItem, closeStream, STREAM_START, }); diff --git a/packages/swing-store-lmdb/test/test-state.js b/packages/swing-store-lmdb/test/test-state.js index ab14565e575..a3bd3721efd 100644 --- a/packages/swing-store-lmdb/test/test-state.js +++ b/packages/swing-store-lmdb/test/test-state.js @@ -71,72 +71,37 @@ test('streamStore read/write', t => { t.is(isSwingStore(dbDir), false); const { streamStore, commit, close } = initSwingStore(dbDir); - let s1pos = streamStore.STREAM_START; - const writer1 = streamStore.openWriteStream('st1'); - s1pos = writer1('first', s1pos); - s1pos = writer1('second', s1pos); + const start = streamStore.STREAM_START; + let s1pos = start; + s1pos = streamStore.writeStreamItem('st1', 'first', s1pos); + s1pos = streamStore.writeStreamItem('st1', 'second', s1pos); const s1posAlt = { ...s1pos }; - const writer2 = streamStore.openWriteStream('st2'); - s1pos = writer1('third', s1pos); + s1pos = streamStore.writeStreamItem('st1', 'third', s1pos); let s2pos = streamStore.STREAM_START; - s2pos = writer2('oneth', s2pos); - s1pos = writer1('fourth', s1pos); - s2pos = writer2('twoth', s2pos); + s2pos = streamStore.writeStreamItem('st2', 'oneth', s2pos); + s1pos = streamStore.writeStreamItem('st1', 'fourth', s1pos); + s2pos = streamStore.writeStreamItem('st2', 'twoth', s2pos); const s2posAlt = { ...s2pos }; - s2pos = writer2('threeth', s2pos); - s2pos = writer2('fourst', s2pos); + s2pos = streamStore.writeStreamItem('st2', 'threeth', s2pos); + s2pos = streamStore.writeStreamItem('st2', 'fourst', s2pos); streamStore.closeStream('st1'); streamStore.closeStream('st2'); - const reader1 = streamStore.openReadStream( - 'st1', - streamStore.STREAM_START, - s1pos, - ); - const reads1 = []; - for (const item of reader1) { - reads1.push(item); - } - t.deepEqual(reads1, ['first', 'second', 'third', 'fourth']); - const writer2alt = streamStore.openWriteStream('st2'); - s2pos = writer2alt('re3', s2posAlt); + const reader1 = streamStore.readStream('st1', start, s1pos); + t.deepEqual(Array.from(reader1), ['first', 'second', 'third', 'fourth']); + s2pos = streamStore.writeStreamItem('st2', 're3', s2posAlt); streamStore.closeStream('st2'); - const reader2 = streamStore.openReadStream( - 'st2', - streamStore.STREAM_START, - s2pos, - ); - const reads2 = []; - for (const item of reader2) { - reads2.push(item); - } - t.deepEqual(reads2, ['oneth', 'twoth', 're3']); - - const reader1alt = streamStore.openReadStream('st1', s1posAlt, s1pos); - const reads1alt = []; - for (const item of reader1alt) { - reads1alt.push(item); - } - t.deepEqual(reads1alt, ['third', 'fourth']); - - const writerEmpty = streamStore.openWriteStream('empty'); - const emptyPos = writerEmpty('filler', streamStore.STREAM_START); + const reader2 = streamStore.readStream('st2', start, s2pos); + t.deepEqual(Array.from(reader2), ['oneth', 'twoth', 're3']); + + const reader1alt = streamStore.readStream('st1', s1posAlt, s1pos); + t.deepEqual(Array.from(reader1alt), ['third', 'fourth']); + + const emptyPos = streamStore.writeStreamItem('empty', 'filler', start); streamStore.closeStream('empty'); - const readerEmpty = streamStore.openReadStream('empty', emptyPos, emptyPos); - const readsEmpty = []; - for (const item of readerEmpty) { - readsEmpty.push(item); - } - t.deepEqual(readsEmpty, []); - const readerEmpty2 = streamStore.openReadStream( - 'empty', - streamStore.STREAM_START, - streamStore.STREAM_START, - ); - const readsEmpty2 = []; - for (const item of readerEmpty2) { - readsEmpty2.push(item); - } - t.deepEqual(readsEmpty2, []); + const readerEmpty = streamStore.readStream('empty', emptyPos, emptyPos); + t.deepEqual(Array.from(readerEmpty), []); + const readerEmpty2 = streamStore.readStream('empty', start, start); + t.deepEqual(Array.from(readerEmpty2), []); commit(); close(); @@ -148,35 +113,20 @@ test('streamStore mode interlock', t => { fs.rmdirSync(dbDir, { recursive: true }); t.is(isSwingStore(dbDir), false); const { streamStore, commit, close } = initSwingStore(dbDir); + const start = streamStore.STREAM_START; - const writer = streamStore.openWriteStream('st1'); - const s1pos = writer('first', streamStore.STREAM_START); - t.throws( - () => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos), - { - message: `can't read stream "st1" because it's already in use`, - }, - ); - t.throws(() => streamStore.openWriteStream('st1', s1pos), { - message: `can't write stream "st1" because it's already in use`, + const s1pos = streamStore.writeStreamItem('st1', 'first', start); + + t.throws(() => streamStore.readStream('st1', start, s1pos), { + message: `can't read stream "st1" because it's already in use`, }); streamStore.closeStream('st1'); - t.throws(() => writer('second', streamStore.STREAM_START), { - message: `can't write to closed stream "st1"`, - }); - const reader = streamStore.openReadStream( - 'st1', - streamStore.STREAM_START, - s1pos, - ); - t.throws( - () => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos), - { - message: `can't read stream "st1" because it's already in use`, - }, - ); - t.throws(() => streamStore.openWriteStream('st1'), { + const reader = streamStore.readStream('st1', start, s1pos); + t.throws(() => streamStore.readStream('st1', start, s1pos), { + message: `can't read stream "st1" because it's already in use`, + }); + t.throws(() => streamStore.writeStreamItem('st1', start, s1pos), { message: `can't write stream "st1" because it's already in use`, }); streamStore.closeStream('st1'); diff --git a/packages/swing-store-simple/src/simpleSwingStore.js b/packages/swing-store-simple/src/simpleSwingStore.js index d82d5791405..f5beb13e656 100644 --- a/packages/swing-store-simple/src/simpleSwingStore.js +++ b/packages/swing-store-simple/src/simpleSwingStore.js @@ -20,13 +20,9 @@ import { assert, details as X, q } from '@agoric/assert'; * }} StreamPosition * * @typedef {{ - * (item: string, position: StreamPosition): StreamPosition - * }} StreamWriter - * - * @typedef {{ - * openWriteStream: (name: string) => StreamWriter, - * openReadStream: (name: string, startPosition: StreamPosition, endPosition: StreamPosition) => Iterable, - * closeStream: (name: string) => void, + * writeStreamItem: (streamName: string, item: string, position: StreamPosition) => StreamPosition, + * readStream: (streamName: string, startPosition: StreamPosition, endPosition: StreamPosition) => Iterable, + * closeStream: (streamName: string) => void, * STREAM_START: StreamPosition, * }} StreamStore * @@ -267,12 +263,11 @@ function makeSwingStore(dirPath, forceReset = false) { * * @yields {string} an iterator for the items in the named stream */ - function openReadStream(streamName, startPosition, endPosition) { + function readStream(streamName, startPosition, endPosition) { insistStreamName(streamName); const stream = streams.get(streamName) || []; - const status = streamStatus.get(streamName); assert( - !status, + !streamStatus.get(streamName), X`can't read stream ${q(streamName)} because it's already in use`, ); insistStreamPosition(startPosition); @@ -289,18 +284,16 @@ function makeSwingStore(dirPath, forceReset = false) { let pos = startPosition.itemCount; function* reader() { while (pos < stream.length) { - const statusInner = streamStatus.get(streamName); assert( - statusInner === readStatus, + streamStatus.get(streamName) === readStatus, X`can't read stream ${q(streamName)}, it's been closed`, ); const result = stream[pos]; pos += 1; yield result; } - const statusEnd = streamStatus.get(streamName); assert( - statusEnd === readStatus, + streamStatus.get(streamName) === readStatus, X`can't read stream ${q(streamName)}, it's been closed`, ); streamStatus.delete(streamName); @@ -310,53 +303,40 @@ function makeSwingStore(dirPath, forceReset = false) { } /** - * Obtain a writer for a stream. + * Write to a stream. * * @param {string} streamName The stream to be written + * @param {string} item The item to write + * @param {Object} position The position to write the item + * + * @returns {Object} the new position after writing */ - function openWriteStream(streamName) { + function writeStreamItem(streamName, item, position) { insistStreamName(streamName); - let streamTemp = streams.get(streamName); - if (!streamTemp) { - streamTemp = []; - streams.set(streamName, streamTemp); + insistStreamPosition(position); + let stream = streams.get(streamName); + if (!stream) { + stream = []; streamStatus.set(streamName, 'write'); + streams.set(streamName, stream); } else { const status = streamStatus.get(streamName); - assert( - !status, - X`can't write stream ${q(streamName)} because it's already in use`, - ); - } - const stream = streamTemp; - const writeStatus = `write-${statusCounter}`; - statusCounter += 1; - streamStatus.set(streamName, writeStatus); - - /** - * Write to a stream. - * - * @param {string} item The item to write - * @param {Object} position The position to write the item - * - * @returns {Object} the new position after writing - */ - function write(item, position) { - const status = streamStatus.get(streamName); - assert( - status === writeStatus, - X`can't write to closed stream ${q(streamName)}`, - ); - insistStreamPosition(position); - stream[position.itemCount] = item; - return harden({ itemCount: position.itemCount + 1 }); + if (!status) { + streamStatus.set(streamName, 'write'); + } else { + assert( + status === 'write', + X`can't write stream ${q(streamName)} because it's already in use`, + ); + } } - return write; + stream[position.itemCount] = item; + return harden({ itemCount: position.itemCount + 1 }); } const streamStore = harden({ - openReadStream, - openWriteStream, + readStream, + writeStreamItem, closeStream, STREAM_START, }); diff --git a/packages/swing-store-simple/test/test-state.js b/packages/swing-store-simple/test/test-state.js index 70f669e65df..c267c6e6498 100644 --- a/packages/swing-store-simple/test/test-state.js +++ b/packages/swing-store-simple/test/test-state.js @@ -76,72 +76,37 @@ test('rejectLMDB', t => { test('streamStore read/write', t => { const { streamStore, commit, close } = initSwingStore(); - let s1pos = streamStore.STREAM_START; - const writer1 = streamStore.openWriteStream('st1'); - s1pos = writer1('first', s1pos); - s1pos = writer1('second', s1pos); + const start = streamStore.STREAM_START; + let s1pos = start; + s1pos = streamStore.writeStreamItem('st1', 'first', s1pos); + s1pos = streamStore.writeStreamItem('st1', 'second', s1pos); const s1posAlt = { ...s1pos }; - const writer2 = streamStore.openWriteStream('st2'); - s1pos = writer1('third', s1pos); - let s2pos = { itemCount: 0 }; - s2pos = writer2('oneth', s2pos); - s1pos = writer1('fourth', s1pos); - s2pos = writer2('twoth', s2pos); + s1pos = streamStore.writeStreamItem('st1', 'third', s1pos); + let s2pos = streamStore.STREAM_START; + s2pos = streamStore.writeStreamItem('st2', 'oneth', s2pos); + s1pos = streamStore.writeStreamItem('st1', 'fourth', s1pos); + s2pos = streamStore.writeStreamItem('st2', 'twoth', s2pos); const s2posAlt = { ...s2pos }; - s2pos = writer2('threeth', s2pos); - s2pos = writer2('fourst', s2pos); + s2pos = streamStore.writeStreamItem('st2', 'threeth', s2pos); + s2pos = streamStore.writeStreamItem('st2', 'fourst', s2pos); streamStore.closeStream('st1'); streamStore.closeStream('st2'); - const reader1 = streamStore.openReadStream( - 'st1', - streamStore.STREAM_START, - s1pos, - ); - const reads1 = []; - for (const item of reader1) { - reads1.push(item); - } - t.deepEqual(reads1, ['first', 'second', 'third', 'fourth']); - const writer2alt = streamStore.openWriteStream('st2'); - s2pos = writer2alt('re3', s2posAlt); + const reader1 = streamStore.readStream('st1', start, s1pos); + t.deepEqual(Array.from(reader1), ['first', 'second', 'third', 'fourth']); + s2pos = streamStore.writeStreamItem('st2', 're3', s2posAlt); streamStore.closeStream('st2'); - const reader2 = streamStore.openReadStream( - 'st2', - streamStore.STREAM_START, - s2pos, - ); - const reads2 = []; - for (const item of reader2) { - reads2.push(item); - } - t.deepEqual(reads2, ['oneth', 'twoth', 're3']); - - const reader1alt = streamStore.openReadStream('st1', s1posAlt, s1pos); - const reads1alt = []; - for (const item of reader1alt) { - reads1alt.push(item); - } - t.deepEqual(reads1alt, ['third', 'fourth']); - - const writerEmpty = streamStore.openWriteStream('empty'); - const emptyPos = writerEmpty('filler', streamStore.STREAM_START); + const reader2 = streamStore.readStream('st2', start, s2pos); + t.deepEqual(Array.from(reader2), ['oneth', 'twoth', 're3']); + + const reader1alt = streamStore.readStream('st1', s1posAlt, s1pos); + t.deepEqual(Array.from(reader1alt), ['third', 'fourth']); + + const emptyPos = streamStore.writeStreamItem('empty', 'filler', start); streamStore.closeStream('empty'); - const readerEmpty = streamStore.openReadStream('empty', emptyPos, emptyPos); - const readsEmpty = []; - for (const item of readerEmpty) { - readsEmpty.push(item); - } - t.deepEqual(readsEmpty, []); - const readerEmpty2 = streamStore.openReadStream( - 'empty', - streamStore.STREAM_START, - streamStore.STREAM_START, - ); - const readsEmpty2 = []; - for (const item of readerEmpty2) { - readsEmpty2.push(item); - } - t.deepEqual(readsEmpty2, []); + const readerEmpty = streamStore.readStream('empty', emptyPos, emptyPos); + t.deepEqual(Array.from(readerEmpty), []); + const readerEmpty2 = streamStore.readStream('empty', start, start); + t.deepEqual(Array.from(readerEmpty2), []); commit(); close(); @@ -149,36 +114,20 @@ test('streamStore read/write', t => { test('streamStore mode interlock', t => { const { streamStore, commit, close } = initSwingStore(); + const start = streamStore.STREAM_START; - const writer = streamStore.openWriteStream('st1'); - const s1pos = writer('first', streamStore.STREAM_START); + const s1pos = streamStore.writeStreamItem('st1', 'first', start); - t.throws( - () => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos), - { - message: `can't read stream "st1" because it's already in use`, - }, - ); - t.throws(() => streamStore.openWriteStream('st1', s1pos), { - message: `can't write stream "st1" because it's already in use`, + t.throws(() => streamStore.readStream('st1', start, s1pos), { + message: `can't read stream "st1" because it's already in use`, }); streamStore.closeStream('st1'); - t.throws(() => writer('second', streamStore.STREAM_START), { - message: `can't write to closed stream "st1"`, - }); - const reader = streamStore.openReadStream( - 'st1', - streamStore.STREAM_START, - s1pos, - ); - t.throws( - () => streamStore.openReadStream('st1', streamStore.STREAM_START, s1pos), - { - message: `can't read stream "st1" because it's already in use`, - }, - ); - t.throws(() => streamStore.openWriteStream('st1'), { + const reader = streamStore.readStream('st1', start, s1pos); + t.throws(() => streamStore.readStream('st1', start, s1pos), { + message: `can't read stream "st1" because it's already in use`, + }); + t.throws(() => streamStore.writeStreamItem('st1', start, s1pos), { message: `can't write stream "st1" because it's already in use`, }); streamStore.closeStream('st1');