Skip to content

Commit

Permalink
add an abstraction layer for state machine handling to avoid duplicat…
Browse files Browse the repository at this point in the history
…e code
  • Loading branch information
moesterheld committed Dec 17, 2024
1 parent 14ef711 commit 5400eb8
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import org.graylog.datanode.opensearch.configuration.beans.impl.OpensearchDefaultConfigFilesBean;
import org.graylog.datanode.opensearch.configuration.beans.impl.OpensearchSecurityConfigurationBean;
import org.graylog.datanode.opensearch.configuration.beans.impl.SearchableSnapshotsConfigurationBean;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachineProvider;
import org.graylog.datanode.opensearch.statemachine.tracer.ClusterNodeStateTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.OpensearchStateMachineTransitionLogger;
import org.graylog.datanode.opensearch.statemachine.tracer.OpensearchWatchdog;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTransitionLogger;
import org.graylog.datanode.process.configuration.beans.DatanodeConfigurationBean;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

public class OpensearchProcessBindings extends AbstractModule {

Expand Down Expand Up @@ -82,10 +84,12 @@ protected void configure() {
bind(DatanodeTrustManagerProvider.class);

// tracer
Multibinder<StateMachineTracer> tracerBinder = Multibinder.newSetBinder(binder(), StateMachineTracer.class);
TypeLiteral<StateMachineTracer<OpensearchState, OpensearchEvent>> tracerType = new TypeLiteral<>() {};
Multibinder<StateMachineTracer<OpensearchState, OpensearchEvent>> tracerBinder =
Multibinder.newSetBinder(binder(), tracerType);
tracerBinder.addBinding().to(ClusterNodeStateTracer.class).asEagerSingleton();
tracerBinder.addBinding().to(OpensearchWatchdog.class).asEagerSingleton();
tracerBinder.addBinding().to(StateMachineTransitionLogger.class).asEagerSingleton();
tracerBinder.addBinding().to(OpensearchStateMachineTransitionLogger.class).asEagerSingleton();
tracerBinder.addBinding().to(ConfigureMetricsIndexSettings.class).asEagerSingleton();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.periodicals.MetricsCollector;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog.storage.opensearch2.DataStreamAdapterOS2;
import org.graylog.storage.opensearch2.ism.IsmApi;
import org.graylog2.cluster.nodes.DataNodeDto;
Expand All @@ -50,7 +50,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public class ConfigureMetricsIndexSettings implements StateMachineTracer {
public class ConfigureMetricsIndexSettings implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private final Logger log = LoggerFactory.getLogger(ConfigureMetricsIndexSettings.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@
*/
package org.graylog.datanode.opensearch.statemachine;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.StateMachineConfig;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracerAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.graylog.datanode.process.statemachine.ProcessStateMachine;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

import java.util.Set;

public class OpensearchStateMachine extends StateMachine<OpensearchState, OpensearchEvent> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchStateMachine.class);
public class OpensearchStateMachine extends ProcessStateMachine<OpensearchState, OpensearchEvent> {

/**
* How many times can the OS rest api call fail before we switch to the failed state
Expand All @@ -37,14 +32,13 @@ public class OpensearchStateMachine extends StateMachine<OpensearchState, Opense
public static final int MAX_REST_STARTUP_FAILURES = 5;
public static final int MAX_REBOOT_FAILURES = 3;

StateMachineTracerAggregator tracerAggregator = new StateMachineTracerAggregator();

public OpensearchStateMachine(OpensearchState initialState, StateMachineConfig<OpensearchState, OpensearchEvent> config) {
super(initialState, config);
setTrace(tracerAggregator);
public OpensearchStateMachine(OpensearchState initialState,
StateMachineConfig<OpensearchState, OpensearchEvent> config,
Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer) {
super(initialState, config, tracer);
}

public static OpensearchStateMachine createNew(OpensearchProcess process, Set<StateMachineTracer> tracer) {
public static OpensearchStateMachine createNew(OpensearchProcess process, Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer) {
final FailuresCounter restFailureCounter = FailuresCounter.oneBased(MAX_REST_TEMPORARY_FAILURES);
final FailuresCounter startupFailuresCounter = FailuresCounter.oneBased(MAX_REST_STARTUP_FAILURES);
final FailuresCounter rebootCounter = FailuresCounter.oneBased(MAX_REBOOT_FAILURES);
Expand Down Expand Up @@ -130,29 +124,12 @@ public static OpensearchStateMachine createNew(OpensearchProcess process, Set<St
.permit(OpensearchEvent.RESET, OpensearchState.WAITING_FOR_CONFIGURATION, process::reset)
.ignore(OpensearchEvent.PROCESS_STOPPED);

OpensearchStateMachine stateMachine = new OpensearchStateMachine(OpensearchState.WAITING_FOR_CONFIGURATION, config);
tracer.forEach(t -> {
t.setStateMachine(stateMachine);
stateMachine.getTracerAggregator().addTracer(t);
});
return stateMachine;
}

public StateMachineTracerAggregator getTracerAggregator() {
return tracerAggregator;
}

private void fire(OpensearchEvent trigger, OpensearchEvent errorEvent) {
try {
super.fire(trigger);
} catch (Exception e) {
LOG.error("Failed to fire event " + trigger, e);
super.fire(errorEvent);
}
return new OpensearchStateMachine(OpensearchState.WAITING_FOR_CONFIGURATION, config, tracer);
}

@Override
public void fire(OpensearchEvent trigger) {
fire(trigger, OpensearchEvent.HEALTH_CHECK_FAILED);
protected OpensearchEvent getErrorEvent() {
return OpensearchEvent.HEALTH_CHECK_FAILED;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;

import java.util.Set;

public class OpensearchStateMachineProvider implements Provider<OpensearchStateMachine> {
private final OpensearchStateMachine opensearchStateMachine;

@Inject
public OpensearchStateMachineProvider(Set<StateMachineTracer> tracer, OpensearchProcess process) {
public OpensearchStateMachineProvider(Set<StateMachineTracer<OpensearchState, OpensearchEvent>> tracer, OpensearchProcess process) {
this.opensearchStateMachine = OpensearchStateMachine.createNew(process, tracer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import jakarta.inject.Inject;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog2.cluster.NodeNotFoundException;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.NodeService;
Expand All @@ -27,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterNodeStateTracer implements StateMachineTracer {
public class ClusterNodeStateTracer implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private final Logger log = LoggerFactory.getLogger(ClusterNodeStateTracer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,9 @@
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import com.github.oxo42.stateless4j.delegates.Trace;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;

/**
* The tracer allows to observe triggered event (before) and transitions (after) of the {@link OpensearchStateMachine}
*/
public interface StateMachineTracer extends Trace<OpensearchState, OpensearchEvent> {

default void setStateMachine(OpensearchStateMachine stateMachine) {
}
import org.graylog.datanode.process.statemachine.tracer.StateMachineTransitionLogger;

public class OpensearchStateMachineTransitionLogger extends StateMachineTransitionLogger<OpensearchState, OpensearchEvent> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import com.github.oxo42.stateless4j.StateMachine;
import jakarta.inject.Inject;
import org.graylog.datanode.opensearch.statemachine.FailuresCounter;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This process watchdog follows transitions of the state machine and will try to restart the process in case of termination.
* If the process is actually stopped, it won't restart it and will automatically deactivate itself.
*/
public class OpensearchWatchdog implements StateMachineTracer {
public class OpensearchWatchdog implements StateMachineTracer<OpensearchState, OpensearchEvent> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchWatchdog.class);

Expand Down Expand Up @@ -97,7 +99,7 @@ public boolean isActive() {
}

@Override
public void setStateMachine(OpensearchStateMachine stateMachine) {
this.stateMachine = stateMachine;
public void setStateMachine(StateMachine<OpensearchState, OpensearchEvent> stateMachine) {
this.stateMachine = (OpensearchStateMachine) stateMachine;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.graylog.datanode.process;

import com.github.oxo42.stateless4j.delegates.Trace;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.tracer.StateMachineTracer;

public interface ManagableProcess<T, EVENT, STATE> {

void configure(T configuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.process.statemachine;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.StateMachineConfig;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracer;
import org.graylog.datanode.process.statemachine.tracer.StateMachineTracerAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public abstract class ProcessStateMachine<STATE, EVENT> extends StateMachine<STATE, EVENT> {

private final Logger LOG = LoggerFactory.getLogger(ProcessStateMachine.class);

private final StateMachineTracerAggregator<STATE, EVENT> tracerAggregator = new StateMachineTracerAggregator<>();

public ProcessStateMachine(STATE initialState,
StateMachineConfig<STATE, EVENT> config,
Set<StateMachineTracer<STATE, EVENT>> tracer) {
super(initialState, config);
addTracer(tracer);
setTrace(tracerAggregator);
}

private void addTracer(Set<StateMachineTracer<STATE, EVENT>> tracers) {
tracers.forEach(tracer -> {
tracer.setStateMachine(this);
tracerAggregator.addTracer(tracer);
});
}

protected abstract EVENT getErrorEvent();

private void fire(EVENT trigger, EVENT errorEvent) {
try {
super.fire(trigger);
} catch (Exception e) {
LOG.error("Failed to fire event " + trigger, e);
super.fire(errorEvent);
}
}

@Override
public void fire(EVENT trigger) {
fire(trigger, getErrorEvent());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.process.statemachine.tracer;

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.delegates.Trace;

public interface StateMachineTracer<STATE, EVENT> extends Trace<STATE, EVENT> {

default void setStateMachine(StateMachine<STATE, EVENT> stateMachine) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,30 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.opensearch.statemachine.tracer;

import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.opensearch.statemachine.OpensearchStateMachine;
package org.graylog.datanode.process.statemachine.tracer;

import java.util.LinkedList;
import java.util.List;

public class StateMachineTracerAggregator implements StateMachineTracer {
public class StateMachineTracerAggregator<STATE, EVENT> implements StateMachineTracer<STATE, EVENT> {

private final List<StateMachineTracer> delegates = new LinkedList<>();
private final List<StateMachineTracer<STATE, EVENT>> delegates = new LinkedList<>();

public void addTracer(StateMachineTracer tracer) {
public void addTracer(StateMachineTracer<STATE, EVENT> tracer) {
delegates.add(tracer);
}

public void removeTracer(StateMachineTracer tracer) {
public void removeTracer(StateMachineTracer<STATE, EVENT> tracer) {
delegates.remove(tracer);
}

@Override
public void trigger(OpensearchEvent processEvent) {
public void trigger(EVENT processEvent) {
delegates.forEach(d -> d.trigger(processEvent));
}

@Override
public void transition(OpensearchEvent processEvent, OpensearchState s1, OpensearchState s2) {
public void transition(EVENT processEvent, STATE s1, STATE s2) {
delegates.forEach(d -> d.transition(processEvent, s1, s2));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.opensearch.statemachine.tracer;
package org.graylog.datanode.process.statemachine.tracer;

import jakarta.inject.Inject;
import org.graylog.datanode.opensearch.statemachine.OpensearchEvent;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateMachineTransitionLogger implements StateMachineTracer {
public class StateMachineTransitionLogger<STATE, EVENT> implements StateMachineTracer<STATE, EVENT> {

private static final Logger LOG = LoggerFactory.getLogger(StateMachineTransitionLogger.class);

Expand All @@ -31,12 +29,12 @@ public StateMachineTransitionLogger() {
}

@Override
public void trigger(OpensearchEvent trigger) {
public void trigger(EVENT trigger) {

}

@Override
public void transition(OpensearchEvent trigger, OpensearchState source, OpensearchState destination) {
public void transition(EVENT trigger, STATE source, STATE destination) {
if (!source.equals(destination)) {
LOG.debug("Triggered {}, source state: {}, destination: {}", trigger, source, destination);
}
Expand Down
Loading

0 comments on commit 5400eb8

Please sign in to comment.