diff --git a/src/shared/modules/commands/helpers/cypher.js b/src/shared/modules/commands/helpers/cypher.js index b63d829718b..d80397074a5 100644 --- a/src/shared/modules/commands/helpers/cypher.js +++ b/src/shared/modules/commands/helpers/cypher.js @@ -27,13 +27,11 @@ export const handleCypherCommand = ( params = {}, shouldUseCypherThread = false ) => { - const [id, request] = bolt.routedWriteTransaction( - action.cmd, - params, - action.requestId, - true, - shouldUseCypherThread - ) + const [id, request] = bolt.routedWriteTransaction(action.cmd, params, { + useCypherThread: shouldUseCypherThread, + requestId: action.requestId, + cancelable: true + }) put(send('cypher', id)) return [id, request] } diff --git a/src/shared/modules/cypher/cypherDuck.js b/src/shared/modules/cypher/cypherDuck.js index f6c58990977..e4c12ad8364 100644 --- a/src/shared/modules/cypher/cypherDuck.js +++ b/src/shared/modules/cypher/cypherDuck.js @@ -25,6 +25,7 @@ import { getActiveConnectionData } from 'shared/modules/connections/connectionsD import { getCausalClusterAddresses } from './queriesProcedureHelper' import { getEncryptionMode } from 'services/bolt/boltHelpers' import { flatten } from 'services/utils' +import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck' const NAME = 'cypher' export const CYPHER_REQUEST = NAME + '/REQUEST' @@ -68,7 +69,9 @@ export const cypherRequestEpic = (some$, store) => some$.ofType(CYPHER_REQUEST).mergeMap(action => { if (!action.$$responseChannel) return Rx.Observable.of(null) return bolt - .directTransaction(action.query, action.params || undefined) + .directTransaction(action.query, action.params || undefined, { + useCypherThread: shouldUseCypherThread(store.getState()) + }) .then(r => ({ type: action.$$responseChannel, success: true, result: r })) .catch(e => ({ type: action.$$responseChannel, @@ -90,7 +93,11 @@ export const clusterCypherRequestEpic = (some$, store) => .mergeMap(action => { if (!action.$$responseChannel) return Rx.Observable.of(null) return bolt - .directTransaction(getCausalClusterAddresses) + .directTransaction( + getCausalClusterAddresses, + {}, + { useCypherThread: shouldUseCypherThread(store.getState()) } + ) .then(res => { const addresses = flatten( res.records.map(record => record.get('addresses')) diff --git a/src/shared/modules/dbMeta/dbMetaDuck.js b/src/shared/modules/dbMeta/dbMetaDuck.js index 7180a900c07..cc0741534c2 100644 --- a/src/shared/modules/dbMeta/dbMetaDuck.js +++ b/src/shared/modules/dbMeta/dbMetaDuck.js @@ -33,6 +33,7 @@ import { setRetainCredentials, setAuthEnabled } from 'shared/modules/connections/connectionsDuck' +import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck' export const NAME = 'meta' export const UPDATE = 'meta/UPDATE' @@ -279,7 +280,15 @@ export const dbMetaEpic = (some$, store) => // Labels, types and propertyKeys .mergeMap(() => Rx.Observable - .fromPromise(bolt.routedReadTransaction(metaQuery)) + .fromPromise( + bolt.routedReadTransaction( + metaQuery, + {}, + { + useCypherThread: shouldUseCypherThread(store.getState()) + } + ) + ) .catch(e => Rx.Observable.of(null)) ) .filter(r => r) @@ -372,7 +381,11 @@ export const dbMetaEpic = (some$, store) => .mergeMap(() => Rx.Observable .fromPromise( - bolt.directTransaction('CALL dbms.cluster.role() YIELD role') + bolt.directTransaction( + 'CALL dbms.cluster.role() YIELD role', + {}, + { useCypherThread: shouldUseCypherThread(store.getState()) } + ) ) .catch(e => Rx.Observable.of(null)) .do(res => { diff --git a/src/shared/modules/features/featuresDuck.js b/src/shared/modules/features/featuresDuck.js index 8c7bf5a5a10..eb42e2821aa 100644 --- a/src/shared/modules/features/featuresDuck.js +++ b/src/shared/modules/features/featuresDuck.js @@ -21,6 +21,7 @@ import bolt from 'services/bolt/bolt' import { APP_START, WEB } from 'shared/modules/app/appDuck' import { CONNECTION_SUCCESS } from 'shared/modules/connections/connectionsDuck' +import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck' export const NAME = 'features' export const RESET = 'features/RESET' @@ -66,7 +67,11 @@ export const featuresDiscoveryEpic = (action$, store) => { .ofType(CONNECTION_SUCCESS) .mergeMap(() => { return bolt - .routedReadTransaction('CALL dbms.procedures YIELD name') + .routedReadTransaction( + 'CALL dbms.procedures YIELD name', + {}, + { useCypherThread: shouldUseCypherThread(store.getState()) } + ) .then(res => { store.dispatch( updateFeatures(res.records.map(record => record.get('name'))) diff --git a/src/shared/modules/jmx/jmxDuck.js b/src/shared/modules/jmx/jmxDuck.js index 2dd23f07c65..6aacb71215a 100644 --- a/src/shared/modules/jmx/jmxDuck.js +++ b/src/shared/modules/jmx/jmxDuck.js @@ -31,6 +31,7 @@ import { connectionLossFilter } from 'shared/modules/connections/connectionsDuck' import { FORCE_FETCH } from 'shared/modules/dbMeta/dbMetaDuck' +import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck' export const NAME = 'jmx' export const UPDATE = NAME + '/UPDATE' @@ -58,9 +59,13 @@ export const getJmxValues = ({ jmx }, arr) => { * Helpers */ -const fetchJmxValues = () => { +const fetchJmxValues = store => { return bolt - .directTransaction('CALL dbms.queryJmx("org.neo4j:*")') + .directTransaction( + 'CALL dbms.queryJmx("org.neo4j:*")', + {}, + { useCypherThread: shouldUseCypherThread(store.getState()) } + ) .then(res => { const converters = { intChecker: bolt.neo4j.isInt, @@ -120,7 +125,7 @@ export const jmxEpic = (some$, store) => .merge(some$.ofType(FORCE_FETCH)) .mergeMap(() => Rx.Observable - .fromPromise(fetchJmxValues()) + .fromPromise(fetchJmxValues(store)) .catch(e => Rx.Observable.of(null)) ) .filter(r => r) diff --git a/src/shared/services/bolt/bolt.js b/src/shared/services/bolt/bolt.js index 7b938564f84..1557f807d8e 100644 --- a/src/shared/services/bolt/bolt.js +++ b/src/shared/services/bolt/bolt.js @@ -29,7 +29,6 @@ import { CYPHER_RESPONSE_MESSAGE, POST_CANCEL_TRANSACTION_MESSAGE } from './boltWorkerMessages' - /* eslint-disable import/no-webpack-loader-syntax */ import BoltWorkerModule from 'worker-loader?inline!./boltWorker.js' /* eslint-enable import/no-webpack-loader-syntax */ @@ -67,62 +66,91 @@ function cancelTransaction (id, cb) { } } -function routedWriteTransaction ( - input, - parameters, - requestId = null, - cancelable = false, - useCypherThread -) { +function routedWriteTransaction (input, parameters, requestMetaData = {}) { + const { + useCypherThread = false, + requestId = null, + cancelable = false + } = requestMetaData if (useCypherThread && window.Worker) { const id = requestId || v4() - const boltWorker = new BoltWorkerModule() - boltWorkerRegister[id] = boltWorker - - const workerFinalizer = getWorkerFinalizer( - boltWorkerRegister, - cancellationRegister, - id + const workFn = runCypherMessage( + input, + parameters, + boltConnection.ROUTED_WRITE_CONNECTION, + id, + cancelable, + { + ...connectionProperties, + inheritedUseRouting: boltConnection.useRouting() + } ) + const workerPromise = setupBoltWorker(id, workFn) + return [id, workerPromise] + } else { + return boltConnection.routedWriteTransaction( + input, + parameters, + requestId, + cancelable + ) + } +} - const workerPromise = new Promise((resolve, reject) => { - boltWorker.postMessage( - runCypherMessage(input, parameters, id, cancelable, { - ...connectionProperties, - inheritedUseRouting: boltConnection.useRouting() - }) - ) - boltWorker.onmessage = msg => { - if (msg.data.type === CYPHER_ERROR_MESSAGE) { - workerFinalizer(boltWorker) - reject(msg.data.error) - } else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) { - let records = msg.data.result.records.map(record => { - const typedRecord = new neo4j.types.Record( - record.keys, - record._fields, - record._fieldLookup - ) - if (typedRecord._fields) { - typedRecord._fields = mappings.applyGraphTypes( - typedRecord._fields - ) - } - return typedRecord - }) - - let summary = mappings.applyGraphTypes(msg.data.result.summary) - workerFinalizer(boltWorker) - resolve({ summary, records }) - } else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) { - workerFinalizer(boltWorker) - } +function routedReadTransaction (input, parameters, requestMetaData = {}) { + const { + useCypherThread = false, + requestId = null, + cancelable = false + } = requestMetaData + if (useCypherThread && window.Worker) { + const id = requestId || v4() + const workFn = runCypherMessage( + input, + parameters, + boltConnection.ROUTED_READ_CONNECTION, + id, + cancelable, + { + ...connectionProperties, + inheritedUseRouting: boltConnection.useRouting() } - }) + ) + const workerPromise = setupBoltWorker(id, workFn) + return workerPromise + } else { + return boltConnection.routedReadTransaction( + input, + parameters, + requestId, + cancelable + ) + } +} - return [id, workerPromise] +function directTransaction (input, parameters, requestMetaData = {}) { + const { + useCypherThread = false, + requestId = null, + cancelable = false + } = requestMetaData + if (useCypherThread && window.Worker) { + const id = requestId || v4() + const workFn = runCypherMessage( + input, + parameters, + boltConnection.DIRECT_CONNECTION, + id, + cancelable, + { + ...connectionProperties, + inheritedUseRouting: false + } + ) + const workerPromise = setupBoltWorker(id, workFn) + return workerPromise } else { - return boltConnection.routedWriteTransaction( + return boltConnection.directTransaction( input, parameters, requestId, @@ -131,6 +159,43 @@ function routedWriteTransaction ( } } +function setupBoltWorker (id, workFn) { + const boltWorker = new BoltWorkerModule() + const onFinished = registerBoltWorker(id, boltWorker) + const workerPromise = new Promise((resolve, reject) => { + boltWorker.postMessage(workFn) + boltWorker.onmessage = msg => { + if (msg.data.type === CYPHER_ERROR_MESSAGE) { + onFinished(boltWorker) + reject(msg.data.error) + } else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) { + let records = msg.data.result.records.map(record => { + const typedRecord = new neo4j.types.Record( + record.keys, + record._fields, + record._fieldLookup + ) + if (typedRecord._fields) { + typedRecord._fields = mappings.applyGraphTypes(typedRecord._fields) + } + return typedRecord + }) + let summary = mappings.applyGraphTypes(msg.data.result.summary) + onFinished(boltWorker) + resolve({ summary, records }) + } else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) { + onFinished(boltWorker) + } + } + }) + return workerPromise +} + +function registerBoltWorker (id, boltWorker) { + boltWorkerRegister[id] = boltWorker + return getWorkerFinalizer(boltWorkerRegister, cancellationRegister, id) +} + function getWorkerFinalizer (workerRegister, cancellationRegister, workerId) { return worker => { if (cancellationRegister[workerId]) { @@ -153,8 +218,8 @@ export default { connectionProperties = null boltConnection.closeConnection() }, - directTransaction: boltConnection.directTransaction, - routedReadTransaction: boltConnection.routedReadTransaction, + directTransaction, + routedReadTransaction, routedWriteTransaction, cancelTransaction, useRoutingConfig: shouldWe => boltConnection.setUseRoutingConfig(shouldWe), diff --git a/src/shared/services/bolt/boltConnection.js b/src/shared/services/bolt/boltConnection.js index 985bbbc2d41..1a94947046d 100644 --- a/src/shared/services/bolt/boltConnection.js +++ b/src/shared/services/bolt/boltConnection.js @@ -22,6 +22,10 @@ import { v1 as neo4j } from 'neo4j-driver-alias' import { v4 } from 'uuid' import { BoltConnectionError, createErrorObject } from '../exceptions' +export const DIRECT_CONNECTION = 'DIRECT_CONNECTION' +export const ROUTED_WRITE_CONNECTION = 'ROUTED_WRITE_CONNECTION' +export const ROUTED_READ_CONNECTION = 'ROUTED_READ_CONNECTION' + const runningQueryRegister = {} let _drivers = null diff --git a/src/shared/services/bolt/boltWorker.js b/src/shared/services/bolt/boltWorker.js index 3a60cc1412f..acad4c60cb4 100644 --- a/src/shared/services/bolt/boltWorker.js +++ b/src/shared/services/bolt/boltWorker.js @@ -24,7 +24,12 @@ import { ensureConnection, routedWriteTransaction, cancelTransaction, - closeConnection + closeConnection, + routedReadTransaction, + directTransaction, + DIRECT_CONNECTION, + ROUTED_WRITE_CONNECTION, + ROUTED_READ_CONNECTION } from './boltConnection' import { cypherErrorMessage, @@ -34,6 +39,18 @@ import { CANCEL_TRANSACTION_MESSAGE } from './boltWorkerMessages' +const connectionTypeMap = { + [ROUTED_WRITE_CONNECTION]: { + create: routedWriteTransaction, + getPromise: res => res[1] + }, + [ROUTED_READ_CONNECTION]: { + create: routedReadTransaction, + getPromise: res => res + }, + [DIRECT_CONNECTION]: { create: directTransaction, getPromise: res => res } +} + const onmessage = function (message) { const messageType = message.data.type @@ -41,6 +58,7 @@ const onmessage = function (message) { const { input, parameters, + connectionType, requestId, cancelable, connectionProperties @@ -52,13 +70,14 @@ const onmessage = function (message) { ) }) .then(() => { - const [, request] = routedWriteTransaction( + const res = connectionTypeMap[connectionType].create( input, parameters, requestId, cancelable ) - request + connectionTypeMap[connectionType] + .getPromise(res) .then(r => { self.postMessage(cypherResponseMessage(r)) closeConnection() diff --git a/src/shared/services/bolt/boltWorkerMessages.js b/src/shared/services/bolt/boltWorkerMessages.js index b99db2458ef..1dcbdd1beef 100644 --- a/src/shared/services/bolt/boltWorkerMessages.js +++ b/src/shared/services/bolt/boltWorkerMessages.js @@ -18,6 +18,8 @@ * along with this program. If not, see . */ import { recursivelyTypeGraphItems } from './boltMappings' +import { ROUTED_WRITE_CONNECTION } from './boltConnection' + export const RUN_CYPHER_MESSAGE = 'RUN_CYPHER_MESSAGE' export const CANCEL_TRANSACTION_MESSAGE = 'CANCEL_TRANSACTION_MESSAGE' export const CYPHER_ERROR_MESSAGE = 'CYPHER_ERROR_MESSAGE' @@ -27,6 +29,7 @@ export const POST_CANCEL_TRANSACTION_MESSAGE = 'POST_CANCEL_TRANSACTION_MESSAGE' export const runCypherMessage = ( input, parameters, + connectionType = ROUTED_WRITE_CONNECTION, requestId = null, cancelable = false, connectionProperties @@ -35,6 +38,7 @@ export const runCypherMessage = ( type: RUN_CYPHER_MESSAGE, input, parameters, + connectionType, requestId, cancelable, connectionProperties