Skip to content

Commit

Permalink
Merge pull request neo4j#724 from oskarhane/read-ww
Browse files Browse the repository at this point in the history
Make every type of Bolt transactions use web workers
  • Loading branch information
pe4cey authored Mar 22, 2018
2 parents 4d824b8 + 490d906 commit 3186ee9
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 70 deletions.
12 changes: 5 additions & 7 deletions src/shared/modules/commands/helpers/cypher.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ export const handleCypherCommand = (
params = {},
shouldUseCypherThread = false
) => {
const [id, request] = bolt.routedWriteTransaction(
action.cmd,
params,
action.requestId,
true,
shouldUseCypherThread
)
const [id, request] = bolt.routedWriteTransaction(action.cmd, params, {
useCypherThread: shouldUseCypherThread,
requestId: action.requestId,
cancelable: true
})
put(send('cypher', id))
return [id, request]
}
11 changes: 9 additions & 2 deletions src/shared/modules/cypher/cypherDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { getActiveConnectionData } from 'shared/modules/connections/connectionsD
import { getCausalClusterAddresses } from './queriesProcedureHelper'
import { getEncryptionMode } from 'services/bolt/boltHelpers'
import { flatten } from 'services/utils'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'

const NAME = 'cypher'
export const CYPHER_REQUEST = NAME + '/REQUEST'
Expand Down Expand Up @@ -68,7 +69,9 @@ export const cypherRequestEpic = (some$, store) =>
some$.ofType(CYPHER_REQUEST).mergeMap(action => {
if (!action.$$responseChannel) return Rx.Observable.of(null)
return bolt
.directTransaction(action.query, action.params || undefined)
.directTransaction(action.query, action.params || undefined, {
useCypherThread: shouldUseCypherThread(store.getState())
})
.then(r => ({ type: action.$$responseChannel, success: true, result: r }))
.catch(e => ({
type: action.$$responseChannel,
Expand All @@ -90,7 +93,11 @@ export const clusterCypherRequestEpic = (some$, store) =>
.mergeMap(action => {
if (!action.$$responseChannel) return Rx.Observable.of(null)
return bolt
.directTransaction(getCausalClusterAddresses)
.directTransaction(
getCausalClusterAddresses,
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
.then(res => {
const addresses = flatten(
res.records.map(record => record.get('addresses'))
Expand Down
17 changes: 15 additions & 2 deletions src/shared/modules/dbMeta/dbMetaDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
setRetainCredentials,
setAuthEnabled
} from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'

export const NAME = 'meta'
export const UPDATE = 'meta/UPDATE'
Expand Down Expand Up @@ -279,7 +280,15 @@ export const dbMetaEpic = (some$, store) =>
// Labels, types and propertyKeys
.mergeMap(() =>
Rx.Observable
.fromPromise(bolt.routedReadTransaction(metaQuery))
.fromPromise(
bolt.routedReadTransaction(
metaQuery,
{},
{
useCypherThread: shouldUseCypherThread(store.getState())
}
)
)
.catch(e => Rx.Observable.of(null))
)
.filter(r => r)
Expand Down Expand Up @@ -372,7 +381,11 @@ export const dbMetaEpic = (some$, store) =>
.mergeMap(() =>
Rx.Observable
.fromPromise(
bolt.directTransaction('CALL dbms.cluster.role() YIELD role')
bolt.directTransaction(
'CALL dbms.cluster.role() YIELD role',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
)
.catch(e => Rx.Observable.of(null))
.do(res => {
Expand Down
7 changes: 6 additions & 1 deletion src/shared/modules/features/featuresDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bolt from 'services/bolt/bolt'
import { APP_START, WEB } from 'shared/modules/app/appDuck'
import { CONNECTION_SUCCESS } from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'

export const NAME = 'features'
export const RESET = 'features/RESET'
Expand Down Expand Up @@ -66,7 +67,11 @@ export const featuresDiscoveryEpic = (action$, store) => {
.ofType(CONNECTION_SUCCESS)
.mergeMap(() => {
return bolt
.routedReadTransaction('CALL dbms.procedures YIELD name')
.routedReadTransaction(
'CALL dbms.procedures YIELD name',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
.then(res => {
store.dispatch(
updateFeatures(res.records.map(record => record.get('name')))
Expand Down
11 changes: 8 additions & 3 deletions src/shared/modules/jmx/jmxDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
connectionLossFilter
} from 'shared/modules/connections/connectionsDuck'
import { FORCE_FETCH } from 'shared/modules/dbMeta/dbMetaDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'

export const NAME = 'jmx'
export const UPDATE = NAME + '/UPDATE'
Expand Down Expand Up @@ -58,9 +59,13 @@ export const getJmxValues = ({ jmx }, arr) => {
* Helpers
*/

const fetchJmxValues = () => {
const fetchJmxValues = store => {
return bolt
.directTransaction('CALL dbms.queryJmx("org.neo4j:*")')
.directTransaction(
'CALL dbms.queryJmx("org.neo4j:*")',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
.then(res => {
const converters = {
intChecker: bolt.neo4j.isInt,
Expand Down Expand Up @@ -120,7 +125,7 @@ export const jmxEpic = (some$, store) =>
.merge(some$.ofType(FORCE_FETCH))
.mergeMap(() =>
Rx.Observable
.fromPromise(fetchJmxValues())
.fromPromise(fetchJmxValues(store))
.catch(e => Rx.Observable.of(null))
)
.filter(r => r)
Expand Down
169 changes: 117 additions & 52 deletions src/shared/services/bolt/bolt.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
CYPHER_RESPONSE_MESSAGE,
POST_CANCEL_TRANSACTION_MESSAGE
} from './boltWorkerMessages'

/* eslint-disable import/no-webpack-loader-syntax */
import BoltWorkerModule from 'worker-loader?inline!./boltWorker.js'
/* eslint-enable import/no-webpack-loader-syntax */
Expand Down Expand Up @@ -67,62 +66,91 @@ function cancelTransaction (id, cb) {
}
}

function routedWriteTransaction (
input,
parameters,
requestId = null,
cancelable = false,
useCypherThread
) {
function routedWriteTransaction (input, parameters, requestMetaData = {}) {
const {
useCypherThread = false,
requestId = null,
cancelable = false
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
const boltWorker = new BoltWorkerModule()
boltWorkerRegister[id] = boltWorker

const workerFinalizer = getWorkerFinalizer(
boltWorkerRegister,
cancellationRegister,
id
const workFn = runCypherMessage(
input,
parameters,
boltConnection.ROUTED_WRITE_CONNECTION,
id,
cancelable,
{
...connectionProperties,
inheritedUseRouting: boltConnection.useRouting()
}
)
const workerPromise = setupBoltWorker(id, workFn)
return [id, workerPromise]
} else {
return boltConnection.routedWriteTransaction(
input,
parameters,
requestId,
cancelable
)
}
}

const workerPromise = new Promise((resolve, reject) => {
boltWorker.postMessage(
runCypherMessage(input, parameters, id, cancelable, {
...connectionProperties,
inheritedUseRouting: boltConnection.useRouting()
})
)
boltWorker.onmessage = msg => {
if (msg.data.type === CYPHER_ERROR_MESSAGE) {
workerFinalizer(boltWorker)
reject(msg.data.error)
} else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) {
let records = msg.data.result.records.map(record => {
const typedRecord = new neo4j.types.Record(
record.keys,
record._fields,
record._fieldLookup
)
if (typedRecord._fields) {
typedRecord._fields = mappings.applyGraphTypes(
typedRecord._fields
)
}
return typedRecord
})

let summary = mappings.applyGraphTypes(msg.data.result.summary)
workerFinalizer(boltWorker)
resolve({ summary, records })
} else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) {
workerFinalizer(boltWorker)
}
function routedReadTransaction (input, parameters, requestMetaData = {}) {
const {
useCypherThread = false,
requestId = null,
cancelable = false
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
const workFn = runCypherMessage(
input,
parameters,
boltConnection.ROUTED_READ_CONNECTION,
id,
cancelable,
{
...connectionProperties,
inheritedUseRouting: boltConnection.useRouting()
}
})
)
const workerPromise = setupBoltWorker(id, workFn)
return workerPromise
} else {
return boltConnection.routedReadTransaction(
input,
parameters,
requestId,
cancelable
)
}
}

return [id, workerPromise]
function directTransaction (input, parameters, requestMetaData = {}) {
const {
useCypherThread = false,
requestId = null,
cancelable = false
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
const workFn = runCypherMessage(
input,
parameters,
boltConnection.DIRECT_CONNECTION,
id,
cancelable,
{
...connectionProperties,
inheritedUseRouting: false
}
)
const workerPromise = setupBoltWorker(id, workFn)
return workerPromise
} else {
return boltConnection.routedWriteTransaction(
return boltConnection.directTransaction(
input,
parameters,
requestId,
Expand All @@ -131,6 +159,43 @@ function routedWriteTransaction (
}
}

function setupBoltWorker (id, workFn) {
const boltWorker = new BoltWorkerModule()
const onFinished = registerBoltWorker(id, boltWorker)
const workerPromise = new Promise((resolve, reject) => {
boltWorker.postMessage(workFn)
boltWorker.onmessage = msg => {
if (msg.data.type === CYPHER_ERROR_MESSAGE) {
onFinished(boltWorker)
reject(msg.data.error)
} else if (msg.data.type === CYPHER_RESPONSE_MESSAGE) {
let records = msg.data.result.records.map(record => {
const typedRecord = new neo4j.types.Record(
record.keys,
record._fields,
record._fieldLookup
)
if (typedRecord._fields) {
typedRecord._fields = mappings.applyGraphTypes(typedRecord._fields)
}
return typedRecord
})
let summary = mappings.applyGraphTypes(msg.data.result.summary)
onFinished(boltWorker)
resolve({ summary, records })
} else if (msg.data.type === POST_CANCEL_TRANSACTION_MESSAGE) {
onFinished(boltWorker)
}
}
})
return workerPromise
}

function registerBoltWorker (id, boltWorker) {
boltWorkerRegister[id] = boltWorker
return getWorkerFinalizer(boltWorkerRegister, cancellationRegister, id)
}

function getWorkerFinalizer (workerRegister, cancellationRegister, workerId) {
return worker => {
if (cancellationRegister[workerId]) {
Expand All @@ -153,8 +218,8 @@ export default {
connectionProperties = null
boltConnection.closeConnection()
},
directTransaction: boltConnection.directTransaction,
routedReadTransaction: boltConnection.routedReadTransaction,
directTransaction,
routedReadTransaction,
routedWriteTransaction,
cancelTransaction,
useRoutingConfig: shouldWe => boltConnection.setUseRoutingConfig(shouldWe),
Expand Down
4 changes: 4 additions & 0 deletions src/shared/services/bolt/boltConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import { v1 as neo4j } from 'neo4j-driver-alias'
import { v4 } from 'uuid'
import { BoltConnectionError, createErrorObject } from '../exceptions'

export const DIRECT_CONNECTION = 'DIRECT_CONNECTION'
export const ROUTED_WRITE_CONNECTION = 'ROUTED_WRITE_CONNECTION'
export const ROUTED_READ_CONNECTION = 'ROUTED_READ_CONNECTION'

const runningQueryRegister = {}

let _drivers = null
Expand Down
Loading

0 comments on commit 3186ee9

Please sign in to comment.