Skip to content

Commit

Permalink
Custom healthcheck with filters
Browse files Browse the repository at this point in the history
Enable filtering with custom health checks based on node attributes:
```
opensearch.optimizedHealthcheck.filters: {
  attribute_key: "attribute_value",
}
```

Also fixes an issue where the response was expected to be an array
when it was actually a dictionary.

Issue:
opensearch-project#2214
opensearch-project#2203

Signed-off-by: Kawika Avilla <kavilla414@gmail.com>
  • Loading branch information
kavilla committed Aug 31, 2022
1 parent 65005be commit 944c563
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 23 deletions.
5 changes: 4 additions & 1 deletion config/opensearch_dashboards.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
# This node attribute should assign all nodes of the same cluster an integer value that increments with each new cluster that is spun up
# e.g. in the opensearch.yml file you would set this value using the node.attr.cluster_id: setting
# Should only be enabled if there is a corresponding node attribute created in your OpenSearch config that matches the value here
#opensearch.optimizedHealthcheckId: "cluster_id"
#opensearch.optimizedHealthcheck.id: "cluster_id"
#opensearch.optimizedHealthcheck.filters: {
# attribute_key: "attribute_value",
#}

# If your OpenSearch is protected with basic authentication, these settings provide
# the username and password that the OpenSearch Dashboards server uses to perform maintenance on the OpenSearch Dashboards
Expand Down
12 changes: 8 additions & 4 deletions src/core/server/opensearch/opensearch_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ export const configSchema = schema.object({
requestTimeout: schema.duration({ defaultValue: '30s' }),
pingTimeout: schema.duration({ defaultValue: schema.siblingRef('requestTimeout') }),
logQueries: schema.boolean({ defaultValue: false }),
optimizedHealthcheckId: schema.maybe(schema.string()),
optimizedHealthcheck: schema.object({
id: schema.maybe(schema.string()),
filters: schema.maybe(schema.recordOf(schema.string(), schema.string(), { defaultValue: {} })),
}),
ssl: schema.object(
{
verificationMode: schema.oneOf(
Expand Down Expand Up @@ -158,7 +161,8 @@ const deprecations: ConfigDeprecationProvider = ({ renameFromRoot, renameFromRoo
renameFromRoot('elasticsearch.requestTimeout', 'opensearch.requestTimeout'),
renameFromRoot('elasticsearch.pingTimeout', 'opensearch.pingTimeout'),
renameFromRoot('elasticsearch.logQueries', 'opensearch.logQueries'),
renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheckId'),
renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'),
renameFromRoot('opensearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'),
renameFromRoot('elasticsearch.ssl', 'opensearch.ssl'),
renameFromRoot('elasticsearch.apiVersion', 'opensearch.apiVersion'),
renameFromRoot('elasticsearch.healthCheck', 'opensearch.healthCheck'),
Expand Down Expand Up @@ -226,7 +230,7 @@ export class OpenSearchConfig {
* Specifies whether Dashboards should only query the local OpenSearch node when
* all nodes in the cluster have the same node attribute value
*/
public readonly optimizedHealthcheckId?: string;
public readonly optimizedHealthcheck?: OpenSearchConfigType['optimizedHealthcheck'];

/**
* Hosts that the client will connect to. If sniffing is enabled, this list will
Expand Down Expand Up @@ -314,7 +318,7 @@ export class OpenSearchConfig {
this.ignoreVersionMismatch = rawConfig.ignoreVersionMismatch;
this.apiVersion = rawConfig.apiVersion;
this.logQueries = rawConfig.logQueries;
this.optimizedHealthcheckId = rawConfig.optimizedHealthcheckId;
this.optimizedHealthcheck = rawConfig.optimizedHealthcheck;
this.hosts = Array.isArray(rawConfig.hosts) ? rawConfig.hosts : [rawConfig.hosts];
this.requestHeadersWhitelist = Array.isArray(rawConfig.requestHeadersWhitelist)
? rawConfig.requestHeadersWhitelist
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/opensearch/opensearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class OpenSearchService

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient: this.client.asInternalUser,
optimizedHealthcheckId: config.optimizedHealthcheckId,
optimizedHealthcheck: config.optimizedHealthcheck,
log: this.log,
ignoreVersionMismatch: config.ignoreVersionMismatch,
opensearchVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand All @@ -203,7 +203,7 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down Expand Up @@ -232,7 +232,7 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down Expand Up @@ -268,7 +268,7 @@ describe('pollOpenSearchNodesVersion', () => {

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 100,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down Expand Up @@ -308,7 +308,7 @@ describe('pollOpenSearchNodesVersion', () => {

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 10,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,54 @@ import type { OpenSearchClient } from '../client';
*/
export const getNodeId = async (
internalClient: OpenSearchClient,
healthcheckAttributeName: string
healthcheck?: OptimizedHealthcheck
): Promise<string | null> => {
try {
let path = `nodes.*.attributes.${healthcheck?.id}`;
const filters = healthcheck?.filters;
if (filters) {
Object.keys(filters).forEach((key) => {
path += `,nodes.*.attributes.${key}`;
});
}

const state = (await internalClient.cluster.state({
metric: 'nodes',
filter_path: [`nodes.*.attributes.${healthcheckAttributeName}`],
filter_path: [path],
})) as ApiResponse;
/* Aggregate different cluster_ids from the OpenSearch nodes
* if all the nodes have the same cluster_id, retrieve nodes.info from _local node only
* Using _cluster/state/nodes to retrieve the cluster_id of each node from cluster manager node which is considered to be a lightweight operation
* else if the nodes have different cluster_ids then fan out the request to all nodes
* else there are no nodes in the cluster
*/
const sharedClusterId =
state.body.nodes.length > 0
? get(state.body.nodes[0], `attributes.${healthcheckAttributeName}`, null)
: null;
const nodes = state.body.nodes;
let nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
}

if (filters) {
nodeIds.forEach((id) => {
Object.keys(filters).forEach((key) => {
const attributeValue = get(nodes[id], `attributes.${key}`, null);
if (attributeValue === filters[key]) {
delete nodes[id];
}
});
});

nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
}
}

const sharedClusterId = get(nodes[nodeIds[0]], `attributes.${healthcheck?.id}`, null);

return sharedClusterId === null ||
state.body.nodes.find(
(node: any) => sharedClusterId !== get(node, `attributes.${healthcheckAttributeName}`, null)
nodes.find(
(node: any) => sharedClusterId !== get(node, `attributes.${healthcheck?.id}`, null)
)
? null
: '_local';
Expand All @@ -87,7 +115,7 @@ export const getNodeId = async (

export interface PollOpenSearchNodesVersionOptions {
internalClient: OpenSearchClient;
optimizedHealthcheckId?: string;
optimizedHealthcheck?: OptimizedHealthcheck;
log: Logger;
opensearchDashboardsVersion: string;
ignoreVersionMismatch: boolean;
Expand Down Expand Up @@ -118,6 +146,13 @@ export interface NodesVersionCompatibility {
nodesInfoRequestError?: Error;
}

/**
 * Settings for the optimized health check consumed by `getNodeId`.
 */
export interface OptimizedHealthcheck {
  /**
   * Name of the node attribute (read from `nodes.*.attributes.<id>` in the
   * cluster state) whose value is compared across nodes.
   */
  id?: string;
  /**
   * Node attribute key -> value pairs. `getNodeId` also requests these
   * attributes and drops any node whose attribute value equals the
   * configured value before comparing the `id` attribute.
   */
  filters?: Record<string, string>;
}

function getHumanizedNodeName(node: NodeInfo) {
const publishAddress = node?.http?.publish_address + ' ' || '';
return 'v' + node.version + ' @ ' + publishAddress + '(' + node.ip + ')';
Expand Down Expand Up @@ -201,7 +236,7 @@ function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompati

export const pollOpenSearchNodesVersion = ({
internalClient,
optimizedHealthcheckId,
optimizedHealthcheck,
log,
opensearchDashboardsVersion,
ignoreVersionMismatch,
Expand All @@ -216,8 +251,8 @@ export const pollOpenSearchNodesVersion = ({
* For better dashboards resilience, the behaviour is changed to only query the local node when all the nodes have the same cluster_id
* Using _cluster/state/nodes to retrieve the cluster_id of each node from the cluster manager node
*/
if (optimizedHealthcheckId) {
return from(getNodeId(internalClient, optimizedHealthcheckId)).pipe(
if (optimizedHealthcheck) {
return from(getNodeId(internalClient, optimizedHealthcheck)).pipe(
mergeMap((nodeId: any) =>
from(
internalClient.nodes.info<NodesInfo>({
Expand Down

0 comments on commit 944c563

Please sign in to comment.