Skip to content

Commit

Permalink
💥 Comply with sharedb spec
Browse files Browse the repository at this point in the history
This is a **BREAKING** change that:

 - adds tests against the [upstream `sharedb` DB test suite][1]
 - adds a CI build for the tests against current Node.js and Postgres
   versions
 - breaks the API to conform to the upstream tests, including adding
   metadata support

The breaks are:

 - Dropping non-null constraints on `snapshots.doc_type` and
   `snapshots.data`
 - Adding a new `snapshots.metadata` `json` column
 - Respecting `options.metadata` and `fields.$submit`, which were
   previously ignored on `getOps()`, and useless on `getSnapshot()`
   (which didn't store metadata)
 - `snapshot.m` is now `undefined` if not present, or `null` if
   unrequested (inline with the spec)

On top of this it also makes some bugfixes to conform to the spec:

 - Ignore unique key validations when committing, since this may happen
   during concurrent commits
 - `JSON.stringify()` JSON fields, which [break][2] if passed a raw
   array
 - Default `from = 0` if unset in `getOps()`

[1]: https://github.com/share/sharedb/blob/7abe65049add9b58e1df638aa34e7ca2c0a1fcfa/test/db.js#L25
[2]: brianc/node-postgres#442
  • Loading branch information
alecgibson committed Jun 10, 2024
1 parent 4b5b537 commit dda96e5
Show file tree
Hide file tree
Showing 8 changed files with 2,293 additions and 347 deletions.
49 changes: 49 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: Test

on:
push:
branches:
- main
- setup-ci # TODO: Remove
pull_request:
branches:
- main

jobs:
test:
name: Node.js ${{ matrix.node }} + PostgreSQL ${{ matrix.postgres }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
node:
- 16
- 18
- 20
postgres:
- 13
- 14
- 15
- 16
services:
postgres:
image: postgres:${{ matrix.postgres }}
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node }}
- name: Install
run: npm install
- name: Test
run: npm test
5 changes: 5 additions & 0 deletions .mocharc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
timeout: 5_000,
file: './test/setup.js',
spec: '**/*.spec.js',
};
191 changes: 86 additions & 105 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
var DB = require('sharedb').DB;
var pg = require('pg');

const PG_UNIQUE_VIOLATION = 23505;

// Postgres-backed ShareDB database

function PostgresDB(options) {
Expand All @@ -9,24 +11,33 @@ function PostgresDB(options) {

this.closed = false;

this.pg_config = options;
this.pool = new pg.Pool(options);
this._pool = new pg.Pool(options);
};
module.exports = PostgresDB;

PostgresDB.prototype = Object.create(DB.prototype);

PostgresDB.prototype.close = function(callback) {
this.closed = true;
this.pool.end();

if (callback) callback();
PostgresDB.prototype.close = async function(callback) {
let error;
try {
if (!this.closed) {
this.closed = true;
await this._pool.end();
}
} catch (err) {
error = err;
}

// FIXME: Don't swallow errors. Emit 'error' event?
if (callback) callback(error);
};


// Persists an op and snapshot if it is for the next version. Calls back with
// callback(err, succeeded)
PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, callback) {
PostgresDB.prototype.commit = async function(collection, id, op, snapshot, options, callback) {
let client;
try {
/*
* op: CreateOp {
* src: '24545654654646',
Expand All @@ -37,34 +48,29 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
* }
* snapshot: PostgresSnapshot
*/
this.pool.connect((err, client, done) => {
if (err) {
done(client);
callback(err);
return;
}
client = await this._pool.connect();
/*
* This query uses common table expression to upsert the snapshot table
* This query uses common table expression to upsert the snapshot table
* (iff the new version is exactly 1 more than the latest table or if
* the document id does not exists)
*
* It will then insert into the ops table if it is exactly 1 more than the
* It will then insert into the ops table if it is exactly 1 more than the
* latest table or it the first operation and iff the previous insert into
* the snapshot table is successful.
*
* This result of this query the version of the newly inserted operation
* If either the ops or the snapshot insert fails then 0 rows are returned
*
* If 0 zeros are return then the callback must return false
* If 0 zeros are return then the callback must return false
*
* Casting is required as postgres thinks that collection and doc_id are
* not varchar
*/
* not varchar
*/
const query = {
name: 'sdb-commit-op-and-snap',
text: `WITH snapshot_id AS (
INSERT INTO snapshots (collection, doc_id, doc_type, version, data)
SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type, $3 v, $5 d
INSERT INTO snapshots (collection, doc_id, version, doc_type, data, metadata)
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $4 doc_type, $5 d, $6 m
WHERE $3 = (
SELECT version+1 v
FROM snapshots
Expand All @@ -76,11 +82,11 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
WHERE collection = $1 AND doc_id = $2
FOR UPDATE
)
ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4, metadata = $5
RETURNING version
)
INSERT INTO ops (collection, doc_id, version, operation)
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $7 operation
WHERE (
$3 = (
SELECT max(version)+1
Expand All @@ -93,66 +99,53 @@ WHERE (
)
) AND EXISTS (SELECT 1 FROM snapshot_id)
RETURNING version`,
values: [collection,id,snapshot.v, snapshot.type, snapshot.data,op]
values: [collection, id, snapshot.v, snapshot.type, JSON.stringify(snapshot.data), JSON.stringify(snapshot.m), JSON.stringify(op)]
}
client.query(query, (err, res) => {
if (err) {
callback(err)
} else if(res.rows.length === 0) {
done(client);
callback(null,false)
}
else {
done(client);
callback(null,true)
}
})

})
const result = await client.query(query);
const success = result.rowCount > 0;
callback(null, success);
} catch (error) {
// Return non-success instead of duplicate key error, since this is
// expected to occur during simultaneous creates on the same id
if (error.code === PG_UNIQUE_VIOLATION) callback(null, false);
else callback(error);
} finally {
if (client) client.release(true);
}
};

// Get the named document from the database. The callback is called with (err,
// snapshot). A snapshot with a version of zero is returned if the docuemnt
// has never been created in the database.
PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, callback) {
this.pool.connect(function(err, client, done) {
if (err) {
done(client);
callback(err);
return;
}
client.query(
'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
PostgresDB.prototype.getSnapshot = async function(collection, id, fields, options, callback) {
fields ||= {};
options ||= {};
const wantsMetadata = fields.$submit || options.metadata;
let client;
try {
client = await this._pool.connect();
const result = await client.query(
'SELECT version, data, doc_type, metadata FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1',
[collection, id],
function(err, res) {
done();
if (err) {
callback(err);
return;
}
if (res.rows.length) {
var row = res.rows[0]
var snapshot = new PostgresSnapshot(
id,
row.version,
row.doc_type,
row.data,
undefined // TODO: metadata
)
callback(null, snapshot);
} else {
var snapshot = new PostgresSnapshot(
id,
0,
null,
undefined,
undefined
)
callback(null, snapshot);
}
}
)
})
);

var row = result.rows[0]
const snapshot = {
id,
v: row?.version || 0,
type: row?.doc_type || null,
data: row?.data || undefined,
m: wantsMetadata ?
// Postgres returns null but ShareDB expects undefined
(row?.metadata || undefined) :
null,
};
callback(null, snapshot);
} catch (error) {
callback(error);
} finally {
client.release(true);
}
};

// Get operations between [from, to) noninclusively. (Ie, the range should
Expand All @@ -164,37 +157,25 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
// The version will be inferred from the parameters if it is missing.
//
// Callback should be called as callback(error, [list of ops]);
PostgresDB.prototype.getOps = function(collection, id, from, to, options, callback) {
this.pool.connect(function(err, client, done) {
if (err) {
done(client);
callback(err);
return;
}

PostgresDB.prototype.getOps = async function(collection, id, from, to, options, callback) {
from ||= 0;
options ||= {};
const wantsMetadata = options.metadata;
let client;
try {
client = await this._pool.connect();
var cmd = 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version > $3 ';
var params = [collection, id, from];
if(to || to == 0) { cmd += ' AND version <= $4'; params.push(to)}
cmd += ' order by version';
client.query( cmd, params,
function(err, res) {
done();
if (err) {
callback(err);
return;
}
callback(null, res.rows.map(function(row) {
return row.operation;
}));
}
)
})
const result = await client.query(cmd, params);
callback(null, result.rows.map(({operation}) => {
if (!wantsMetadata) delete operation.m;
return operation;
}));
} catch (error) {
callback(error);
} finally {
client.release(true);
}
};

function PostgresSnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}
37 changes: 37 additions & 0 deletions index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const PostgresDB = require('.');
const {Pool} = require('pg');
const fs = require('node:fs');

const DB_NAME = 'sharedbtest';

function create(callback) {
var db = new PostgresDB({database: DB_NAME});
callback(null, db);
};

describe('PostgresDB', function() {
let pool;
let client;

beforeEach(async () => {
pool = new Pool({database: 'postgres'});
client = await pool.connect();
await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`);
await client.query(`CREATE DATABASE ${DB_NAME}`);

const testPool = new Pool({database: DB_NAME});
const testClient = await testPool.connect();
const structure = fs.readFileSync('./structure.sql', 'utf8');
await testClient.query(structure);
await testClient.release(true);
await testPool.end();
});

afterEach(async function() {
await client.query(`DROP DATABASE IF EXISTS ${DB_NAME}`);
await client.release(true);
await pool.end();
});

require('sharedb/test/db')({create: create});
});
Loading

0 comments on commit dda96e5

Please sign in to comment.