Skip to content

Commit

Permalink
Server: Lazy-load storage drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
laurent22 committed Nov 10, 2021
1 parent 4deeed0 commit 7431da9
Show file tree
Hide file tree
Showing 14 changed files with 192 additions and 152 deletions.
Binary file modified packages/server/schema.sqlite
Binary file not shown.
16 changes: 4 additions & 12 deletions packages/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import * as Koa from 'koa';
import * as fs from 'fs-extra';
import Logger, { LoggerWrapper, TargetType } from '@joplin/lib/Logger';
import config, { initConfig, runningInDocker } from './config';
import { migrateLatest, waitForConnection, sqliteDefaultDir, latestMigration, DbConnection } from './db';
import { migrateLatest, waitForConnection, sqliteDefaultDir, latestMigration } from './db';
import { AppContext, Env, KoaNext } from './utils/types';
import FsDriverNode from '@joplin/lib/fs-driver-node';
import routeHandler from './middleware/routeHandler';
Expand All @@ -17,11 +17,10 @@ import startServices from './utils/startServices';
import { credentialFile } from './utils/testing/testUtils';
import apiVersionHandler from './middleware/apiVersionHandler';
import clickJackingHandler from './middleware/clickJackingHandler';
import newModelFactory, { Options } from './models/factory';
import newModelFactory from './models/factory';
import setupCommands from './utils/setupCommands';
import { RouteResponseFormat, routeResponseFormat } from './utils/routeUtils';
import { parseEnv } from './env';
import storageDriverFromConfig from './models/items/storage/storageDriverFromConfig';

interface Argv {
env?: Env;
Expand Down Expand Up @@ -222,13 +221,6 @@ async function main() {
fs.writeFileSync(pidFile, `${process.pid}`);
}

const newModelFactoryOptions = async (db: DbConnection): Promise<Options> => {
return {
storageDriver: await storageDriverFromConfig(config().storageDriver, db, { assignDriverId: env !== 'buildTypes' }),
storageDriverFallback: await storageDriverFromConfig(config().storageDriverFallback, db, { assignDriverId: env !== 'buildTypes' }),
};
};

let runCommandAndExitApp = true;

if (selectedCommand) {
Expand All @@ -245,7 +237,7 @@ async function main() {
});
} else {
const connectionCheck = await waitForConnection(config().database);
const models = newModelFactory(connectionCheck.connection, config(), await newModelFactoryOptions(connectionCheck.connection));
const models = newModelFactory(connectionCheck.connection, config());

await selectedCommand.run(commandArgv, {
db: connectionCheck.connection,
Expand Down Expand Up @@ -275,7 +267,7 @@ async function main() {
appLogger().info('Connection check:', connectionCheckLogInfo);
const ctx = app.context as AppContext;

await setupAppContext(ctx, env, connectionCheck.connection, appLogger, await newModelFactoryOptions(connectionCheck.connection));
await setupAppContext(ctx, env, connectionCheck.connection, appLogger);

await initializeJoplinUtils(config(), ctx.joplinBase.models, ctx.joplinBase.services.mustache);

Expand Down
10 changes: 10 additions & 0 deletions packages/server/src/migrations/20211105183559_storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ export async function up(db: DbConnection): Promise<any> {
await db.schema.createTable('storages', (table: Knex.CreateTableBuilder) => {
table.increments('id').unique().primary().notNullable();
table.text('connection_string').notNullable();
table.bigInteger('updated_time').notNullable();
table.bigInteger('created_time').notNullable();
});

const now = Date.now();

await db('storages').insert({
connection_string: 'Type=Database',
updated_time: now,
created_time: now,
});

// First we create the column and set a default so as to populate the
Expand All @@ -21,6 +27,10 @@ export async function up(db: DbConnection): Promise<any> {
await db.schema.alterTable('items', (table: Knex.CreateTableBuilder) => {
table.integer('content_storage_id').notNullable().alter();
});

await db.schema.alterTable('storages', (table: Knex.CreateTableBuilder) => {
table.unique(['connection_string']);
});
}

export async function down(db: DbConnection): Promise<any> {
Expand Down
77 changes: 55 additions & 22 deletions packages/server/src/models/ItemModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import { ChangePreviousItem } from './ChangeModel';
import { unique } from '../utils/array';
import StorageDriverBase, { Context } from './items/storage/StorageDriverBase';
import { DbConnection } from '../db';
import { Config, StorageDriverMode } from '../utils/types';
import { NewModelFactoryHandler, Options } from './factory';
import { Config, StorageDriverConfig, StorageDriverMode } from '../utils/types';
import { NewModelFactoryHandler } from './factory';
import storageDriverFromConfig from './items/storage/storageDriverFromConfig';

const mimeUtils = require('@joplin/lib/mime-utils.js').mime;

Expand Down Expand Up @@ -49,14 +50,16 @@ export interface ItemLoadOptions extends LoadOptions {
export default class ItemModel extends BaseModel<Item> {

private updatingTotalSizes_: boolean = false;
private storageDriver_: StorageDriverBase = null;
private storageDriverFallback_: StorageDriverBase = null;
private storageDriverConfig_: StorageDriverConfig;
private storageDriverConfigFallback_: StorageDriverConfig;

public constructor(db: DbConnection, modelFactory: NewModelFactoryHandler, config: Config, options: Options) {
private static storageDrivers_: Map<StorageDriverConfig, StorageDriverBase> = new Map();

public constructor(db: DbConnection, modelFactory: NewModelFactoryHandler, config: Config) {
super(db, modelFactory, config);

this.storageDriver_ = options.storageDriver;
this.storageDriverFallback_ = options.storageDriverFallback;
this.storageDriverConfig_ = config.storageDriver;
this.storageDriverConfigFallback_ = config.storageDriverFallback;
}

protected get tableName(): string {
Expand All @@ -75,6 +78,26 @@ export default class ItemModel extends BaseModel<Item> {
return Object.keys(databaseSchema[this.tableName]).filter(f => f !== 'content');
}

private async storageDriverFromConfig(config: StorageDriverConfig): Promise<StorageDriverBase> {
let driver = ItemModel.storageDrivers_.get(config);

if (!driver) {
driver = await storageDriverFromConfig(config, this.db);
ItemModel.storageDrivers_.set(config, driver);
}

return driver;
}

public async storageDriver(): Promise<StorageDriverBase> {
return this.storageDriverFromConfig(this.storageDriverConfig_);
}

public async storageDriverFallback(): Promise<StorageDriverBase> {
if (!this.storageDriverConfigFallback_) return null;
return this.storageDriverFromConfig(this.storageDriverConfigFallback_);
}

public async checkIfAllowed(user: User, action: AclAction, resource: Item = null): Promise<void> {
if (action === AclAction.Create) {
if (!(await this.models().shareUser().isShareParticipant(resource.jop_share_id, user.id))) throw new ErrorForbidden('user has no access to this share');
Expand Down Expand Up @@ -136,25 +159,31 @@ export default class ItemModel extends BaseModel<Item> {
}

private async storageDriverWrite(itemId: Uuid, content: Buffer, context: Context) {
await this.storageDriver_.write(itemId, content, context);
const storageDriver = await this.storageDriver();
const storageDriverFallback = await this.storageDriverFallback();

await storageDriver.write(itemId, content, context);

if (this.storageDriverFallback_) {
if (this.storageDriverFallback_.mode === StorageDriverMode.ReadWrite) {
await this.storageDriverFallback_.write(itemId, content, context);
} else if (this.storageDriverFallback_.mode === StorageDriverMode.ReadOnly) {
await this.storageDriverFallback_.write(itemId, Buffer.from(''), context);
if (storageDriverFallback) {
if (storageDriverFallback.mode === StorageDriverMode.ReadWrite) {
await storageDriverFallback.write(itemId, content, context);
} else if (storageDriverFallback.mode === StorageDriverMode.ReadOnly) {
await storageDriverFallback.write(itemId, Buffer.from(''), context);
} else {
throw new Error(`Unsupported fallback mode: ${this.storageDriverFallback_.mode}`);
throw new Error(`Unsupported fallback mode: ${storageDriverFallback.mode}`);
}
}
}

private async storageDriverRead(itemId: Uuid, context: Context) {
if (await this.storageDriver_.exists(itemId, context)) {
return this.storageDriver_.read(itemId, context);
const storageDriver = await this.storageDriver();
const storageDriverFallback = await this.storageDriverFallback();

if (await storageDriver.exists(itemId, context)) {
return storageDriver.read(itemId, context);
} else {
if (!this.storageDriverFallback_) throw new Error(`Content does not exist but fallback content driver is not defined: ${itemId}`);
return this.storageDriverFallback_.read(itemId, context);
if (!storageDriverFallback) throw new Error(`Content does not exist but fallback content driver is not defined: ${itemId}`);
return storageDriverFallback.read(itemId, context);
}
}

Expand Down Expand Up @@ -417,7 +446,8 @@ export default class ItemModel extends BaseModel<Item> {
try {
const content = itemToSave.content;
delete itemToSave.content;
itemToSave.content_storage_id = this.storageDriver_.storageId;

itemToSave.content_storage_id = (await this.storageDriver()).storageId;

itemToSave.content_size = content ? content.byteLength : 0;

Expand Down Expand Up @@ -624,14 +654,17 @@ export default class ItemModel extends BaseModel<Item> {
const ids = typeof id === 'string' ? [id] : id;
if (!ids.length) return;

const storageDriver = await this.storageDriver();
const storageDriverFallback = await this.storageDriverFallback();

const shares = await this.models().share().byItemIds(ids);

await this.withTransaction(async () => {
await this.models().share().delete(shares.map(s => s.id));
await this.models().userItem().deleteByItemIds(ids);
await this.models().itemResource().deleteByItemIds(ids);
await this.storageDriver_.delete(ids, { models: this.models() });
if (this.storageDriverFallback_) await this.storageDriverFallback_.delete(ids, { models: this.models() });
await storageDriver.delete(ids, { models: this.models() });
if (storageDriverFallback) await storageDriverFallback.delete(ids, { models: this.models() });

await super.delete(ids, options);
}, 'ItemModel::delete');
Expand Down Expand Up @@ -679,7 +712,7 @@ export default class ItemModel extends BaseModel<Item> {
let previousItem: ChangePreviousItem = null;

if (item.content && !item.content_storage_id) {
item.content_storage_id = this.storageDriver_.storageId;
item.content_storage_id = (await this.storageDriver()).storageId;
}

if (isNew) {
Expand Down
20 changes: 5 additions & 15 deletions packages/server/src/models/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,39 +72,29 @@ import SubscriptionModel from './SubscriptionModel';
import UserFlagModel from './UserFlagModel';
import EventModel from './EventModel';
import { Config } from '../utils/types';
import StorageDriverBase from './items/storage/StorageDriverBase';
import LockModel from './LockModel';
import StorageModel from './StorageModel';

export interface Options {
storageDriver: StorageDriverBase;
storageDriverFallback?: StorageDriverBase;
}

export type NewModelFactoryHandler = (db: DbConnection)=> Models;

export class Models {

private db_: DbConnection;
private config_: Config;
private options_: Options;

public constructor(db: DbConnection, config: Config, options: Options) {
public constructor(db: DbConnection, config: Config) {
this.db_ = db;
this.config_ = config;
this.options_ = options;

// if (!options.storageDriver) throw new Error('StorageDriver is required');

this.newModelFactory = this.newModelFactory.bind(this);
}

private newModelFactory(db: DbConnection) {
return new Models(db, this.config_, this.options_);
return new Models(db, this.config_);
}

public item() {
return new ItemModel(this.db_, this.newModelFactory, this.config_, this.options_);
return new ItemModel(this.db_, this.newModelFactory, this.config_);
}

public user() {
Expand Down Expand Up @@ -177,6 +167,6 @@ export class Models {

}

export default function newModelFactory(db: DbConnection, config: Config, options: Options): Models {
return new Models(db, config, options);
export default function newModelFactory(db: DbConnection, config: Config): Models {
return new Models(db, config);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { clientType } from '../../../db';
import { afterAllTests, beforeAllDb, beforeEachDb, db, expectNotThrow, expectThrow, models } from '../../../utils/testing/testUtils';
import { StorageDriverMode } from '../../../utils/types';
import { StorageDriverConfig, StorageDriverMode, StorageDriverType } from '../../../utils/types';
import StorageDriverDatabase from './StorageDriverDatabase';
import StorageDriverMemory from './StorageDriverMemory';
import { shouldDeleteContent, shouldNotCreateItemIfContentNotSaved, shouldNotUpdateItemIfContentNotSaved, shouldSupportFallbackDriver, shouldSupportFallbackDriverInReadWriteMode, shouldUpdateContentStorageIdAfterSwitchingDriver, shouldWriteToContentAndReadItBack } from './testUtils';

const newDriver = () => {
Expand All @@ -11,6 +10,12 @@ const newDriver = () => {
});
};

const newConfig = (): StorageDriverConfig => {
return {
type: StorageDriverType.Database,
};
};

describe('StorageDriverDatabase', function() {

beforeAll(async () => {
Expand All @@ -26,23 +31,19 @@ describe('StorageDriverDatabase', function() {
});

test('should write to content and read it back', async function() {
const driver = newDriver();
await shouldWriteToContentAndReadItBack(driver);
await shouldWriteToContentAndReadItBack(newConfig());
});

test('should delete the content', async function() {
const driver = newDriver();
await shouldDeleteContent(driver);
await shouldDeleteContent(newConfig());
});

test('should not create the item if the content cannot be saved', async function() {
const driver = newDriver();
await shouldNotCreateItemIfContentNotSaved(driver);
await shouldNotCreateItemIfContentNotSaved(newConfig());
});

test('should not update the item if the content cannot be saved', async function() {
const driver = newDriver();
await shouldNotUpdateItemIfContentNotSaved(driver);
await shouldNotUpdateItemIfContentNotSaved(newConfig());
});

test('should fail if the item row does not exist', async function() {
Expand All @@ -56,15 +57,15 @@ describe('StorageDriverDatabase', function() {
});

test('should support fallback content drivers', async function() {
await shouldSupportFallbackDriver(newDriver(), new StorageDriverMemory(2));
await shouldSupportFallbackDriver(newConfig(), { type: StorageDriverType.Memory });
});

test('should support fallback content drivers in rw mode', async function() {
await shouldSupportFallbackDriverInReadWriteMode(newDriver(), new StorageDriverMemory(2, { mode: StorageDriverMode.ReadWrite }));
await shouldSupportFallbackDriverInReadWriteMode(newConfig(), { type: StorageDriverType.Memory, mode: StorageDriverMode.ReadWrite });
});

test('should update content storage ID after switching driver', async function() {
await shouldUpdateContentStorageIdAfterSwitchingDriver(newDriver(), new StorageDriverMemory(2));
await shouldUpdateContentStorageIdAfterSwitchingDriver(newConfig(), { type: StorageDriverType.Memory });
});

});
20 changes: 12 additions & 8 deletions packages/server/src/models/items/storage/StorageDriverFs.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { pathExists, remove } from 'fs-extra';
import { afterAllTests, beforeAllDb, beforeEachDb, expectNotThrow, expectThrow, tempDirPath } from '../../../utils/testing/testUtils';
import { StorageDriverConfig, StorageDriverType } from '../../../utils/types';
import StorageDriverFs from './StorageDriverFs';
import { shouldDeleteContent, shouldNotCreateItemIfContentNotSaved, shouldNotUpdateItemIfContentNotSaved, shouldWriteToContentAndReadItBack } from './testUtils';

Expand All @@ -9,6 +10,13 @@ const newDriver = () => {
return new StorageDriverFs(1, { path: basePath_ });
};

const newConfig = (): StorageDriverConfig => {
return {
type: StorageDriverType.Filesystem,
path: basePath_,
};
};

describe('StorageDriverFs', function() {

beforeAll(async () => {
Expand All @@ -30,23 +38,19 @@ describe('StorageDriverFs', function() {
});

test('should write to content and read it back', async function() {
const driver = newDriver();
await shouldWriteToContentAndReadItBack(driver);
await shouldWriteToContentAndReadItBack(newConfig());
});

test('should delete the content', async function() {
const driver = newDriver();
await shouldDeleteContent(driver);
await shouldDeleteContent(newConfig());
});

test('should not create the item if the content cannot be saved', async function() {
const driver = newDriver();
await shouldNotCreateItemIfContentNotSaved(driver);
await shouldNotCreateItemIfContentNotSaved(newConfig());
});

test('should not update the item if the content cannot be saved', async function() {
const driver = newDriver();
await shouldNotUpdateItemIfContentNotSaved(driver);
await shouldNotUpdateItemIfContentNotSaved(newConfig());
});

test('should write to a file and read it back', async function() {
Expand Down
Loading

0 comments on commit 7431da9

Please sign in to comment.