Skip to content

Commit

Permalink
Add transaction metadata to queries
Browse files Browse the repository at this point in the history
This is not for all places yet, this is for the recurring background queries + user executed queries.
If we pass metadata along to a pre neo4j 3.5.0 server, it returns a error message, that’s why we need to figure out the server version before start sending.
  • Loading branch information
oskarhane committed Oct 18, 2018
1 parent 7290233 commit 57c3fe3
Show file tree
Hide file tree
Showing 14 changed files with 1,666 additions and 79 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
"moduleNameMapper": {
"\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga|html)$": "<rootDir>/__mocks__/fileMock.js",
"\\.(css|less)$": "<rootDir>/__mocks__/styleMock.js",
"^neo4j-driver$": "neo4j-driver",
"^browser-styles(.*)$": "<rootDir>/src/browser/styles$1",
"^browser-components(.*)$": "<rootDir>/src/browser/components$1",
"worker-loader": "<rootDir>/__mocks__/workerLoaderMock.js"
"worker-loader": "<rootDir>/__mocks__/workerLoaderMock.js",
"project-root(.*)$": "<rootDir>$1"
},
"modulePaths": [
"<rootDir>/src",
Expand Down Expand Up @@ -143,7 +143,7 @@
"firebase": "^4.3.0",
"isomorphic-fetch": "^2.2.1",
"jsonic": "^0.3.0",
"neo4j-driver": "^1.7.0-beta02",
"neo4j-driver": "^1.7.0-rc2",
"react": "^16.4.1",
"react-addons-pure-render-mixin": "^15.0.2",
"react-dnd": "^2.5.1",
Expand Down
19 changes: 14 additions & 5 deletions src/browser/modules/Stream/Queries/QueriesFrame.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ export class QueriesFrame extends Component {
['User', '8%'],
['Query', 'auto'],
['Params', '7%'],
['Meta', '8%'],
['Meta', 'auto'],
['Elapsed time', '95px'],
['Kill', '95px']
]
const tableRows = queries.map(query => {
const tableRows = queries.map((query, i) => {
return (
<tr key='rows'>
<tr key={i}>
<StyledTd
key='host'
title={query.host}
Expand All @@ -228,7 +228,11 @@ export class QueriesFrame extends Component {
<StyledTd key='params' width={tableHeaderSizes[3][1]}>
<Code>{JSON.stringify(query.parameters, null, 2)}</Code>
</StyledTd>
<StyledTd key='meta' width={tableHeaderSizes[4][1]}>
<StyledTd
key='meta'
title={JSON.stringify(query.metaData, null, 2)}
width={tableHeaderSizes[4][1]}
>
<Code>{JSON.stringify(query.metaData, null, 2)}</Code>
</StyledTd>
<StyledTd key='time' width={tableHeaderSizes[5][1]}>
Expand Down Expand Up @@ -321,4 +325,9 @@ const mapStateToProps = state => {
}
}

export default withBus(connect(mapStateToProps, null)(QueriesFrame))
export default withBus(
connect(
mapStateToProps,
null
)(QueriesFrame)
)
3 changes: 2 additions & 1 deletion src/shared/modules/commands/commandsDuck.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ describe('commandsDuck', () => {
node: {
color: '#000'
}
}
},
meta: {}
})
})
afterEach(() => {
Expand Down
6 changes: 4 additions & 2 deletions src/shared/modules/commands/helpers/cypher.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export const handleCypherCommand = (
action,
put,
params = {},
shouldUseCypherThread = false
shouldUseCypherThread = false,
txMetadata = undefined
) => {
const paramsToNeo4jType = Object.keys(params).map(k => ({
[k]: applyGraphTypes(params[k])
Expand All @@ -38,7 +39,8 @@ export const handleCypherCommand = (
{
useCypherThread: shouldUseCypherThread,
requestId: action.requestId,
cancelable: true
cancelable: true,
...txMetadata
}
)
put(send('cypher', id))
Expand Down
11 changes: 9 additions & 2 deletions src/shared/modules/currentUser/currentUserDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
CONNECTION_SUCCESS,
DISCONNECTION_SUCCESS
} from 'shared/modules/connections/connectionsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { canSendTxMetadata } from '../features/featuresDuck'

export const NAME = 'user'
export const UPDATE_CURRENT_USER = NAME + '/UPDATE_CURRENT_USER'
Expand Down Expand Up @@ -89,10 +91,15 @@ export const getCurrentUserEpic = (some$, store) =>
bolt.directTransaction(
'CALL dbms.security.showCurrentUser()',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
)
.catch(e => ({ type: CLEAR }))
.catch(e => Rx.Observable.of(null))
.map(result => {
if (!result) return { type: CLEAR }
const keys = result.records[0].keys
Expand Down
55 changes: 33 additions & 22 deletions src/shared/modules/dbMeta/dbMetaDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import {
onLostConnection
} from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { canSendTxMetadata } from '../features/featuresDuck'

export const NAME = 'meta'
export const UPDATE = 'meta/UPDATE'
Expand Down Expand Up @@ -65,7 +67,8 @@ export function getMetaInContext (state, context) {
}
}

export const getVersion = state => state[NAME].server.version
export const getVersion = state =>
(state[NAME] || {}).server ? (state[NAME] || {}).server.version : 0
export const getEdition = state => state[NAME].server.edition
export const getDbName = state => state[NAME].server.dbName
export const getStoreSize = state => state[NAME].server.storeSize
Expand Down Expand Up @@ -285,37 +288,45 @@ export const dbMetaEpic = (some$, store) =>
.merge(some$.ofType(CONNECTION_SUCCESS))
.mergeMap(() => {
return (
Rx.Observable
.timer(1, 20000)
Rx.Observable.timer(1, 20000)
.merge(some$.ofType(FORCE_FETCH))
// Labels, types and propertyKeys
.mergeMap(() =>
Rx.Observable
.fromPromise(
bolt.routedReadTransaction(
metaQuery,
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
onLostConnection: onLostConnection(store.dispatch)
}
)
Rx.Observable.fromPromise(
bolt.routedReadTransaction(
metaQuery,
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
onLostConnection: onLostConnection(store.dispatch),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.catch(e => Rx.Observable.of(null))
).catch(e => {
return Rx.Observable.of(null)
})
)
.filter(r => r)
.do(res => store.dispatch(updateMeta(res)))
// Cluster role
.mergeMap(() =>
Rx.Observable
.fromPromise(
bolt.directTransaction(
'CALL dbms.cluster.role() YIELD role',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
)
Rx.Observable.fromPromise(
bolt.directTransaction(
'CALL dbms.cluster.role() YIELD role',
{},
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.catch(e => Rx.Observable.of(null))
)
.catch(e => {
return Rx.Observable.of(null)
})
.do(res => {
if (!res) return Rx.Observable.of(null)
const role = res.records[0].get(0)
Expand Down
26 changes: 23 additions & 3 deletions src/shared/modules/features/featuresDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import semver from 'semver'
import Rx from 'rxjs/Rx'
import bolt from 'services/bolt/bolt'
import { APP_START, WEB } from 'shared/modules/app/appDuck'
import { CONNECTION_SUCCESS } from 'shared/modules/connections/connectionsDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'
import { getVersion } from '../dbMeta/dbMetaDuck'

export const NAME = 'features'
export const RESET = 'features/RESET'
export const UPDATE_ALL_FEATURES = 'features/UPDATE_ALL_FEATURES'
const NEO4J_TX_METADATA_VERSION = '3.5.0-alpha01'

export const getAvailableProcedures = state => state[NAME].availableProcedures
export const isACausalCluster = state =>
getAvailableProcedures(state).includes('dbms.cluster.overview')
export const canAssignRolesToUser = state =>
getAvailableProcedures(state).includes('dbms.security.addRoleToUser')
export const useBrowserSync = state => !!state[NAME].browserSync
export const canSendTxMetadata = state => {
const serverVersion = getVersion(state)
if (!serverVersion) {
return false
}
if (semver.gt(serverVersion, NEO4J_TX_METADATA_VERSION)) {
return true
}
return false
}

const initialState = {
availableProcedures: [],
Expand Down Expand Up @@ -70,16 +85,21 @@ export const featuresDiscoveryEpic = (action$, store) => {
.routedReadTransaction(
'CALL dbms.procedures YIELD name',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.then(res => {
store.dispatch(
updateFeatures(res.records.map(record => record.get('name')))
)
return null
return Rx.Observable.of(null)
})
.catch(e => {
return null
return Rx.Observable.of(null)
})
})
.mapTo({ type: 'NOOP' })
Expand Down
37 changes: 21 additions & 16 deletions src/shared/modules/jmx/jmxDuck.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import {
connectionLossFilter
} from 'shared/modules/connections/connectionsDuck'
import { FORCE_FETCH } from 'shared/modules/dbMeta/dbMetaDuck'
import { canSendTxMetadata } from 'shared/modules/features/featuresDuck'
import { shouldUseCypherThread } from 'shared/modules/settings/settingsDuck'
import { getBackgroundTxMetadata } from 'shared/services/bolt/txMetadata'

export const NAME = 'jmx'
export const UPDATE = NAME + '/UPDATE'
Expand Down Expand Up @@ -64,27 +66,31 @@ const fetchJmxValues = store => {
.directTransaction(
'CALL dbms.queryJmx("org.neo4j:*")',
{},
{ useCypherThread: shouldUseCypherThread(store.getState()) }
{
useCypherThread: shouldUseCypherThread(store.getState()),
...getBackgroundTxMetadata({
hasServerSupport: canSendTxMetadata(store.getState())
})
}
)
.then(res => {
const converters = {
intChecker: bolt.neo4j.isInt,
intConverter: val => val.toString(),
objectConverter: extractFromNeoObjects
}
return toObjects(
res.records,
converters
).map(([name, description, attributes]) => {
return {
name,
description,
attributes
return toObjects(res.records, converters).map(
([name, description, attributes]) => {
return {
name,
description,
attributes
}
}
})
)
})
.catch(e => {
return null
return Rx.Observable.of(null)
})
}

Expand Down Expand Up @@ -120,13 +126,12 @@ export const jmxEpic = (some$, store) =>
.filter(s => s.state === CONNECTED_STATE)
.merge(some$.ofType(CONNECTION_SUCCESS))
.mergeMap(() => {
return Rx.Observable
.timer(0, 20000)
return Rx.Observable.timer(0, 20000)
.merge(some$.ofType(FORCE_FETCH))
.mergeMap(() =>
Rx.Observable
.fromPromise(fetchJmxValues(store))
.catch(e => Rx.Observable.of(null))
Rx.Observable.fromPromise(fetchJmxValues(store)).catch(e =>
Rx.Observable.of(null)
)
)
.filter(r => r)
.do(res => store.dispatch(updateJmxValues(res)))
Expand Down
18 changes: 12 additions & 6 deletions src/shared/services/bolt/bolt.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ function routedWriteTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -90,7 +91,8 @@ function routedWriteTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand All @@ -110,7 +112,8 @@ function routedReadTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -126,7 +129,8 @@ function routedReadTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand All @@ -146,7 +150,8 @@ function directTransaction (input, parameters, requestMetaData = {}) {
useCypherThread = false,
requestId = null,
cancelable = false,
onLostConnection = () => {}
onLostConnection = () => {},
txMetadata = undefined
} = requestMetaData
if (useCypherThread && window.Worker) {
const id = requestId || v4()
Expand All @@ -162,7 +167,8 @@ function directTransaction (input, parameters, requestMetaData = {}) {
generateBoltHost(
connectionProperties ? connectionProperties.host : ''
)
)
),
txMetadata
}
)
const workerPromise = setupBoltWorker(id, workFn, onLostConnection)
Expand Down
Loading

0 comments on commit 57c3fe3

Please sign in to comment.