Skip to content

Commit

Permalink
Add transaction metadata to queries
Browse files Browse the repository at this point in the history
This is not for all places yet, this is for the recurring background queries + user executed queries.
If we pass metadata along to a pre neo4j 3.5.0 server, it returns a error message, that’s why we need to figure out the server version before start sending.
  • Loading branch information
oskarhane committed Oct 18, 2018
1 parent db5c727 commit e931dd4
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 72 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
"moduleNameMapper": {
"\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga|html)$": "<rootDir>/__mocks__/fileMock.js",
"\\.(css|less)$": "<rootDir>/__mocks__/styleMock.js",
"^neo4j-driver$": "neo4j-driver",
"^browser-styles(.*)$": "<rootDir>/src/browser/styles$1",
"^browser-components(.*)$": "<rootDir>/src/browser/components$1",
"worker-loader": "<rootDir>/__mocks__/workerLoaderMock.js"
"worker-loader": "<rootDir>/__mocks__/workerLoaderMock.js",
"project-root(.*)$": "<rootDir>$1"
},
"modulePaths": [
"<rootDir>/src",
Expand Down Expand Up @@ -148,7 +148,7 @@
"firebase": "^4.3.0",
"isomorphic-fetch": "^2.2.1",
"jsonic": "^0.3.0",
"neo4j-driver": "^1.7.0-beta02",
"neo4j-driver": "^1.7.0-rc2",
"react": "^16.4.1",
"react-addons-pure-render-mixin": "^15.0.2",
"react-dnd": "^2.5.1",
Expand Down
8 changes: 6 additions & 2 deletions src/browser/modules/Stream/Queries/QueriesFrame.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ export class QueriesFrame extends Component {
['User', '8%'],
['Query', 'auto'],
['Params', '7%'],
['Meta', '8%'],
['Meta', 'auto'],
['Elapsed time', '95px'],
['Kill', '95px']
]
Expand All @@ -236,7 +236,11 @@ export class QueriesFrame extends Component {
<StyledTd key='params' width={tableHeaderSizes[3][1]}>
<Code>{JSON.stringify(query.parameters, null, 2)}</Code>
</StyledTd>
<StyledTd key='meta' width={tableHeaderSizes[4][1]}>
<StyledTd
key='meta'
title={JSON.stringify(query.metaData, null, 2)}
width={tableHeaderSizes[4][1]}
>
<Code>{JSON.stringify(query.metaData, null, 2)}</Code>
</StyledTd>
<StyledTd key='time' width={tableHeaderSizes[5][1]}>
Expand Down
3 changes: 2 additions & 1 deletion src/shared/modules/commands/commandsDuck.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ describe('commandsDuck', () => {
node: {
color: '#000'
}
}
},
meta: {}
})
})
afterEach(() => {
Expand Down
6 changes: 4 additions & 2 deletions src/shared/modules/commands/helpers/cypher.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export const handleCypherCommand = (
action,
put,
params = {},
shouldUseCypherThread = false
shouldUseCypherThread = false,
txMetadata = undefined
) => {
const paramsToNeo4jType = Object.keys(params).map(k => ({
[k]: applyGraphTypes(params[k])
Expand All @@ -38,7 +39,8 @@ export const handleCypherCommand = (
{
useCypherThread: shouldUseCypherThread,
requestId: action.requestId,
cancelable: true
cancelable: true,
...txMetadata
}
)
put(send('cypher', id))
Expand Down
9 changes: 8 additions & 1 deletion src/shared/modules/currentUser/currentUserDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
CONNECTION_SUCCESS,
DISCONNECTION_SUCCESS
} from 'shared/modules/connections/connectionsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { canSendTxMetadata } from '../features/featuresDuck'

export const NAME = 'user'
export const UPDATE_CURRENT_USER = NAME + '/UPDATE_CURRENT_USER'
Expand Down Expand Up @@ -89,7 +91,12 @@ export const getCurrentUserEpic = (some$, store) =>
bolt.directTransaction(
'CALL dbms.security.showCurrentUser()',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
)
.catch(() => Rx.Observable.of(null))
Expand Down
55 changes: 33 additions & 22 deletions src/shared/modules/dbMeta/dbMetaDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import {
onLostConnection
} from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { canSendTxMetadata } from '../features/featuresDuck'

export const NAME = 'meta'
export const UPDATE = 'meta/UPDATE'
Expand Down Expand Up @@ -65,7 +67,8 @@ export function getMetaInContext (state, context) {
}
}

export const getVersion = state => state[NAME].server.version
export const getVersion = state =>
(state[NAME] || {}).server ? (state[NAME] || {}).server.version : 0
export const getEdition = state => state[NAME].server.edition
export const getDbName = state => state[NAME].server.dbName
export const getStoreSize = state => state[NAME].server.storeSize
Expand Down Expand Up @@ -285,37 +288,45 @@ export const dbMetaEpic = (some$, store) =>
.merge(some$.ofType(CONNECTION_SUCCESS))
.mergeMap(() => {
return (
Rx.Observable
.timer(1, 20000)
Rx.Observable.timer(1, 20000)
.merge(some$.ofType(FORCE_FETCH))
// Labels, types and propertyKeys
.mergeMap(() =>
Rx.Observable
.fromPromise(
bolt.routedReadTransaction(
metaQuery,
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
onLostConnection: onLostConnection(store.dispatch)
}
)
Rx.Observable.fromPromise(
bolt.routedReadTransaction(
metaQuery,
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
onLostConnection: onLostConnection(store.dispatch),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.catch(e => Rx.Observable.of(null))
).catch(e => {
return Rx.Observable.of(null)
})
)
.filter(r => r)
.do(res => store.dispatch(updateMeta(res)))
// Cluster role
.mergeMap(() =>
Rx.Observable
.fromPromise(
bolt.directTransaction(
'CALL dbms.cluster.role() YIELD role',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
Rx.Observable.fromPromise(
bolt.directTransaction(
'CALL dbms.cluster.role() YIELD role',
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.catch(e => Rx.Observable.of(null))
)
.catch(e => {
return Rx.Observable.of(null)
})
.do(res => {
if (!res) return Rx.Observable.of(null)
const role = res.records[0].get(0)
Expand Down
26 changes: 23 additions & 3 deletions src/shared/modules/features/featuresDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import semver from 'semver'
import Rx from 'rxjs/Rx'
import bolt from 'services/bolt/bolt'
import { APP_START, WEB } from 'shared/modules/app/appDuck'
import { CONNECTION_SUCCESS } from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { getVersion } from '../dbMeta/dbMetaDuck'

export const NAME = 'features'
export const RESET = 'features/RESET'
export const UPDATE_ALL_FEATURES = 'features/UPDATE_ALL_FEATURES'
const NEO4J_TX_METADATA_VERSION = '3.5.0-alpha01'

export const getAvailableProcedures = state => state[NAME].availableProcedures
export const isACausalCluster = state =>
getAvailableProcedures(state).includes('dbms.cluster.overview')
export const canAssignRolesToUser = state =>
getAvailableProcedures(state).includes('dbms.security.addRoleToUser')
export const useBrowserSync = state => !!state[NAME].browserSync
export const canSendTxMetadata = state => {
const serverVersion = getVersion(state)
if (!serverVersion) {
return false
}
if (semver.gt(serverVersion, NEO4J_TX_METADATA_VERSION)) {
return true
}
return false
}

const initialState = {
availableProcedures: [],
Expand Down Expand Up @@ -70,16 +85,21 @@ export const featuresDiscoveryEpic = (action$, store) => {
.routedReadTransaction(
'CALL dbms.procedures YIELD name',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.then(res => {
store.dispatch(
updateFeatures(res.records.map(record => record.get('name')))
)
return null
return Rx.Observable.of(null)
})
.catch(e => {
return null
return Rx.Observable.of(null)
})
})
.mapTo({ type: 'NOOP' })
Expand Down
37 changes: 21 additions & 16 deletions src/shared/modules/jmx/jmxDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import {
connectionLossFilter
} from 'shared/modules/connections/connectionsDuck'
import { FORCE_FETCH } from 'shared/modules/dbMeta/dbMetaDuck'
import { canSendTxMetadata } from 'shared/modules/features/featuresDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'

export const NAME = 'jmx'
export const UPDATE = NAME + '/UPDATE'
Expand Down Expand Up @@ -64,27 +66,31 @@ const fetchJmxValues = store => {
.directTransaction(
'CALL dbms.queryJmx("org.neo4j:*")',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.then(res => {
const converters = {
intChecker: bolt.neo4j.isInt,
intConverter: val => val.toString(),
objectConverter: extractFromNeoObjects
}
return toObjects(
res.records,
converters
).map(([name, description, attributes]) => {
return {
name,
description,
attributes
return toObjects(res.records, converters).map(
([name, description, attributes]) => {
return {
name,
description,
attributes
}
}
})
)
})
.catch(e => {
return null
return Rx.Observable.of(null)
})
}

Expand Down Expand Up @@ -120,13 +126,12 @@ export const jmxEpic = (some$, store) =>
.filter(s => s.state === CONNECTED_STATE)
.merge(some$.ofType(CONNECTION_SUCCESS))
.mergeMap(() => {
return Rx.Observable
.timer(0, 20000)
return Rx.Observable.timer(0, 20000)
.merge(some$.ofType(FORCE_FETCH))
.mergeMap(() =>
Rx.Observable
.fromPromise(fetchJmxValues(store))
.catch(e => Rx.Observable.of(null))
Rx.Observable.fromPromise(fetchJmxValues(store)).catch(e =>
Rx.Observable.of(null)
)
)
.filter(r => r)
.do(res => store.dispatch(updateJmxValues(res)))
Expand Down
18 changes: 12 additions & 6 deletions src/shared/services/bolt/bolt.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ function routedWriteTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -90,7 +91,8 @@ function routedWriteTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand All @@ -110,7 +112,8 @@ function routedReadTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -126,7 +129,8 @@ function routedReadTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand All @@ -146,7 +150,8 @@ function directTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -162,7 +167,8 @@ function directTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand Down
Loading

0 comments on commit e931dd4

Please sign in to comment.