'use strict'
const assert = require('assert')
const path = require('path')
const fs = require('fs').promises
const { Recorder, Player } = require('transcript')
const Destructible = require('destructible')
const Operation = require('operation')
const Interrupt = require('interrupt')
const Keyify = require('keyify')
const Fracture = require('fracture')
const Sequester = require('sequester')
class WriteAhead {
static Error = Interrupt.create('WriteAhead.Error', {
'IO_ERROR': 'i/o error',
'NO_LOGS': 'attempt to write when no logs exist',
'OPEN_ERROR': {
code: 'IO_ERROR',
message: 'unable to open file'
},
'BLOCK_SHORT_READ': {
code: 'IO_ERROR',
message: 'incomplete read of block'
},
'BLOCK_MISSING': {
code: 'IO_ERROR',
message: 'block did not parse correctly'
},
'INVALID_CHECKSUM': {
code: 'IO_ERROR',
message: 'block contained an invalid checksum'
        },
        'READ_ERROR': {
            code: 'IO_ERROR',
            message: 'unable to read file'
        },
        'CLOSE_ERROR': {
            code: 'IO_ERROR',
            message: 'unable to close file'
        }
})
    //
    // Unlike other modules, WriteAhead uses a Turnstile dedicated to it. It
    // only ever needs one background strand, so we can give it a
    // single-strand Turnstile and not have to worry about the deadlock
    // issues of a shared Turnstile.
    //
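    // A construction sketch (illustrative only; assumes `Turnstile` is the
    // `turnstile` module and `directory` is an existing directory):
    //
    //      const destructible = new Destructible('writeahead')
    //      const turnstile = new Turnstile(destructible.durable('turnstile'))
    //      const writeahead = new WriteAhead(destructible.durable('writeahead'),
    //          turnstile, await WriteAhead.open({ directory }))
    //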
constructor (destructible, turnstile, { directory, logs, checksum, blocks, open, position, sync }) {
this.destructible = destructible
this.deferrable = destructible.durable($ => $(), { countdown: 1 }, 'deferrable')
        // Create a Fracture that runs through the dedicated Turnstile.
this.turnstile = turnstile
this._fracture = new Fracture(this.destructible.durable($ => $(), 'appender'), {
turnstile: this.turnstile,
value: name => {
switch (name) {
case 'write':
return { blocks: [], sync: false }
case 'rotate':
case 'shift':
return {}
}
},
worker: this._background.bind(this)
})
this._fracture.deferrable.increment()
this.destructible.destruct(() => this.deferrable.decrement())
this.deferrable.destruct(() => {
this.deferrable.ephemeral($ => $(), 'shutdown', async () => {
await this._fracture.drain()
this._fracture.deferrable.decrement()
if (this._open != null) {
const open = this._open
this._open = null
await this.sync.sync(open)
await WriteAhead.Error.resolve(open.handle.close(), 'IO_ERROR', open.properties)
}
})
})
this.directory = directory
this._logs = logs
this._recorder = Recorder.create(checksum)
this._blocks = blocks
this._checksum = checksum
this._open = open
this._position = position
this.sync = sync
}
get position () {
return this._position
}
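    //
    // Open the write-ahead log in the given directory. Scans each numbered
    // log file, indexing the position and length of every block by its
    // keyified keys, creates an initial empty log if none exist, and returns
    // the properties used to construct a WriteAhead.
    //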
static async open ({ directory, checksum = () => 0, sync = new Operation.Sync }) {
const dir = await fs.readdir(directory)
const ids = dir.filter(file => /^\d+$/.test(file))
.map(log => +log)
.sort((left, right) => left - right)
const player = new Player(checksum)
const blocks = {}
for (const id of ids) {
blocks[id] = {}
const filename = path.join(directory, String(id))
let position = 0, remainder = 0
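            // Scan the log in 1MB chunks. The `player` buffers a partial
            // entry across chunk boundaries, so `remainder` tracks how many
            // bytes of the current entry were carried over from the previous
            // chunk.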
await Operation.read(filename, Buffer.alloc(1024 * 1024), ({ buffer }) => {
let offset = 0
for (;;) {
const [ entry ] = player.split(buffer.slice(offset), 1)
if (entry == null) {
break
}
const keys = JSON.parse(String(entry.parts[0]))
.map(key => Keyify.stringify(key))
const length = entry.sizes.reduce((sum, value) => sum + value, 0)
for (const key of keys) {
blocks[id][key] || (blocks[id][key] = [])
blocks[id][key].push({ position, length })
}
position += length
offset += length - remainder
remainder = 0
}
remainder = buffer.length - offset
})
}
const logs = ids.map(id => ({ id, shifted: false, sequester: new Sequester }))
        // Operations are ordered so that the handle open comes last, so we
        // don't have to add a catch block that closes the handle on failure.
if (logs.length == 0) {
            blocks[0] = {}
const log = {
id: 0,
shifted: false,
sequester: new Sequester
}
logs.push(log)
const filename = path.join(directory, String(log.id))
            await WriteAhead.Error.resolve(fs.writeFile(filename, '', { flag: 'wx' }), 'IO_ERROR', { filename })
}
const log = logs[logs.length - 1]
const filename = path.join(directory, String(log.id))
const stat = await WriteAhead.Error.resolve(fs.stat(filename), 'IO_ERROR', { filename })
const position = stat.size
const open = await Operation.open(filename, sync.flag)
return { directory, logs, checksum, blocks, open, position, sync }
}
//
// Returns an asynchronous iterator over the blocks for the given key.
//
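    //
    // For example (a sketch, assuming entries were written under `key`):
    //
    //      for await (const buffer of writeahead.get(key)) {
    //          // each `buffer` is the body of one logged entry for `key`
    //      }
    //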
async *get (key) {
const keyified = Keyify.stringify(key)
const player = new Player(this._checksum)
const shared = this._logs.slice()
for (const log of shared) {
await log.sequester.share()
}
const logs = shared.filter(log => ! log.shifted)
try {
            for (const log of logs) {
const filename = path.join(this.directory, String(log.id))
const handle = await WriteAhead.Error.resolve(fs.open(filename, 'r'), 'OPEN_ERROR', { filename })
const got = this._blocks[log.id][keyified] || []
try {
for (let j = 0, J = got.length; j < J; j++) {
const block = got[j]
if (block.buffer != null) {
yield block.buffer
} else {
const buffer = Buffer.alloc(block.length)
const { bytesRead } = await WriteAhead.Error.resolve(handle.read(buffer, 0, buffer.length, block.position), 'READ_ERROR', { filename })
WriteAhead.Error.assert(bytesRead == buffer.length, 'BLOCK_SHORT_READ', { filename })
const entries = player.split(buffer)
WriteAhead.Error.assert(entries.length == 1, 'BLOCK_MISSING', { filename })
yield entries[0].parts[1]
}
}
} finally {
await WriteAhead.Error.resolve(handle.close(), 'CLOSE_ERROR', { filename })
}
}
} finally {
            shared.forEach(log => log.sequester.unlock())
}
}
    // Write a batch of entries to the write-ahead log. `entries` is an array
    // of `{ keys, buffer }` objects, where `keys` is a set of application
    // keys and `buffer` is the body to log.
    //
    // The keys are used to reconstruct a stream of entries from the
    // write-ahead log. The body will be included in the stream constructed
    // for any of the keys in its set of keys. It is up to the application to
    // fish the content relevant to a given key out of the body.
    //
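    // For example (a sketch; assumes `Fracture.stack()` provides the root
    // stack argument that Fracture's `enqueue` expects):
    //
    //      await writeahead.write(Fracture.stack(), [{
    //          keys: [[ 'page', 1 ]],
    //          buffer: Buffer.from(JSON.stringify({ method: 'insert' }))
    //      }], true)
    //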
write (stack, entries, sync = false) {
this.deferrable.operational()
const log = this._logs[this._logs.length - 1], blocks = []
for (const entry of entries) {
const { keys, buffer } = entry
const block = { position: 0, length: 0, buffer }
for (const key of keys.map(key => Keyify.stringify(key)) ) {
this._blocks[log.id][key] || (this._blocks[log.id][key] = [])
this._blocks[log.id][key].push(block)
}
blocks.push({ keys, block })
}
return this._fracture.enqueue(stack, 'write', value => {
value.blocks.push.apply(value.blocks, blocks)
value.sync = sync
})
}
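    //
    // Append the gathered blocks to the current log with a single vectored
    // write, recording each block's position and length so `get` can read it
    // back, then release the buffer references now that the blocks are on
    // disk.
    //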
async _write (blocks) {
if (this.deferrable.errored) {
            throw new Error('write-ahead log has errored')
}
WriteAhead.Error.assert(this._open != null, 'NO_LOGS')
const records = []
for (const { keys, block } of blocks) {
const record = (this._recorder)([[ Buffer.from(JSON.stringify(keys)) ], [ block.buffer ]])
records.push(record)
block.position = this._position
block.length = record.length
this._position += record.length
}
if (records.length != 0) {
await Operation.writev(this._open, records)
}
for (const { block } of blocks) {
block.buffer = null
}
}
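    //
    // Background worker for the Fracture queue. The 'write' key appends
    // queued blocks, the 'rotate' key closes the current log and opens a
    // new one, and the 'shift' key unlinks the oldest log once readers
    // release it.
    //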
async _background ({ canceled, key, value, pause }) {
switch (key) {
case 'write': {
await this._write(value.blocks)
if (value.sync) {
await this.sync.sync(this._open)
}
}
break
case 'rotate': {
const write = await pause('write')
try {
// Before we do anything asynchronous, we need to ensure that new
// writes are queued where the new log is supposed to be.
const log = {
id: this._logs.length == 0 ? 0 : this._logs[this._logs.length - 1].id + 1,
shifted: false,
sequester: new Sequester
}
this._logs.push(log)
this._blocks[log.id] = {}
const gathered = []
for (const value of write.values) {
gathered.push(value.blocks)
value.blocks = []
}
for (const blocks of gathered) {
await this._write(blocks)
}
this._position = 0
if (this._logs.length != 1) {
const open = this._open
this._open = null
await this.sync.sync(open)
await Operation.close(open)
}
this._open = await Operation.open(path.join(this.directory, String(log.id)), this.sync.flag)
} finally {
write.resume()
}
}
break
case 'shift': {
            // TODO The shifted log's entries are never deleted from
            // `this._blocks`, so they leak.
if (this._logs.length != 0) {
await this._logs[0].sequester.exclude()
const log = this._logs.shift()
log.shifted = true
log.sequester.unlock()
if (this._logs.length == 0) {
const open = this._open
this._open = null
await this.sync.sync(open)
await Operation.close(open)
}
const filename = path.join(this.directory, String(log.id))
await WriteAhead.Error.resolve(fs.unlink(filename), 'IO_ERROR', { filename })
}
}
break
}
}
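    //
    // Enqueue a log rotation: pause pending writes, flush them to the
    // current log, then close it and open a new, empty log.
    //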
rotate (stack) {
return this._fracture.enqueue(stack, 'rotate')
}
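    //
    // Enqueue a shift: wait for readers to release the oldest log, then
    // remove it from the index and unlink its file.
    //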
shift (stack) {
return this._fracture.enqueue(stack, 'shift')
}
}
module.exports = WriteAhead