Skip to content

Commit

Permalink
Add Scenario tests (#3847)
Browse files Browse the repository at this point in the history
* Add POC DMC restart test

* Cleanup

* More tests

* Add missing files

* Clean up scenario tests

* Address review suggestions
  • Loading branch information
uglide authored Jul 17, 2024
1 parent 28e7c73 commit fc603d2
Show file tree
Hide file tree
Showing 7 changed files with 513 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@
<version>2.38.0</version> <!-- 3.x requires Java 17 -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-fluent</artifactId>
<version>5.3.1</version>
<scope>test</scope>
</dependency>

<!-- circuit breaker / failover -->
<dependency>
Expand Down
6 changes: 5 additions & 1 deletion src/test/java/redis/clients/jedis/EndpointConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package redis.clients.jedis;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import redis.clients.jedis.util.JedisURIHelper;

Expand Down Expand Up @@ -118,7 +120,9 @@ protected String getURISchema(boolean tls) {
}

public static HashMap<String, EndpointConfig> loadFromJSON(String filePath) throws Exception {
Gson gson = new Gson();
Gson gson = new GsonBuilder().setFieldNamingPolicy(
FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();

HashMap<String, EndpointConfig> configs;
try (FileReader reader = new FileReader(filePath)) {
configs = gson.fromJson(reader, new TypeToken<HashMap<String, EndpointConfig>>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package redis.clients.jedis.scenario;

import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.ClusterConnectionProvider;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class ClusterTopologyRefreshTest {

private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class);

private static EndpointConfig endpoint;

private final FaultInjectionClient faultClient = new FaultInjectionClient();

@BeforeClass
public static void beforeClass() {
try {
ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
org.junit.Assume.assumeTrue(false);
}
}

@Test
public void testWithPool() {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(endpoint.getHostAndPort());

JedisClientConfig config = endpoint.getClientConfigBuilder()
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();


ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig);
ClusterConnectionProvider spyProvider = spy(provider);

try (JedisCluster client = new JedisCluster(spyProvider,
RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) {
assertEquals("Was this BDB used to run this test before?", 1,
client.getClusterNodes().size());

AtomicLong commandsExecuted = new AtomicLong();

// Start thread that imitates an application that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
long i = commandsExecuted.getAndIncrement();
client.set(String.valueOf(i), String.valueOf(i));
return true;
});

Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());
params.put("actions", "[\"reshard\",\"failover\"]");

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering Resharding and Failover");
actionResponse = faultClient.triggerAction("sequence_of_actions", params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertTrue(fakeApp.capturedExceptions().isEmpty());

log.info("Commands executed: {}", commandsExecuted.get());
for (long i = 0; i < commandsExecuted.get(); i++) {
assertTrue(client.exists(String.valueOf(i)));
}

verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package redis.clients.jedis.scenario;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.SafeEncoder;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;

@RunWith(Parameterized.class)
public class ConnectionInterruptionTest {

private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptionTest.class);

private static EndpointConfig endpoint;

private final FaultInjectionClient faultClient = new FaultInjectionClient();

@Parameterized.Parameters
public static Iterable<?> data() {
return Arrays.asList("dmc_restart", "network_failure");
}

@Parameterized.Parameter
public String triggerAction;

@BeforeClass
public static void beforeClass() {
try {
ConnectionInterruptionTest.endpoint = HostAndPorts.getRedisEndpoint("re-standalone");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
org.junit.Assume.assumeTrue(false);
}
}

@Test
public void testWithPool() {
ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);

UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);
String keyName = "counter";
client.set(keyName, "0");
assertEquals("0", client.get(keyName));

AtomicLong commandsExecuted = new AtomicLong();

// Start thread that imitates an application that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
assertTrue(client.incr(keyName) > 0);
long currentCount = commandsExecuted.getAndIncrement();
log.info("Command executed {}", currentCount);
return true;
});
fakeApp.setKeepExecutingForSeconds(RecommendedSettings.DEFAULT_TIMEOUT_MS/1000 * 2);
Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering {}", triggerAction);
actionResponse = faultClient.triggerAction(triggerAction, params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

log.info("Commands executed: {}", commandsExecuted.get());
assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName)));
assertTrue(fakeApp.capturedExceptions().isEmpty());

client.close();
}

@Test
public void testWithPubSub() {
ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);

UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);

AtomicLong messagesSent = new AtomicLong();
AtomicLong messagesReceived = new AtomicLong();

final Thread subscriberThread = getSubscriberThread(messagesReceived, connectionProvider);

// Start thread that imitates a publisher that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
log.info("Publishing message");
long consumed = client.publish("test", String.valueOf(messagesSent.getAndIncrement()));
return consumed > 0;
});
fakeApp.setKeepExecutingForSeconds(10);
Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering {}", triggerAction);
actionResponse = faultClient.triggerAction(triggerAction, params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (subscriberThread.isAlive())
subscriberThread.interrupt();

assertEquals(messagesSent.get() - 1, messagesReceived.get());
assertTrue(fakeApp.capturedExceptions().isEmpty());

client.close();
}

private static Thread getSubscriberThread(AtomicLong messagesReceived,
ConnectionProvider connectionProvider) {
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

@Override
public void onMessage(String channel, String message) {
messagesReceived.incrementAndGet();
log.info("Received message: {}", message);
}

@Override
protected String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
};

final Thread subscriberThread = new Thread(() -> {
try {
pubSub.proceed(connectionProvider.getConnection(), "test");
fail("PubSub should have been interrupted");
} catch (JedisConnectionException e) {
log.info("Expected exception in Subscriber: {}", e.getMessage());
assertTrue(e.getMessage().contains("Unexpected end of stream."));
}
});
subscriberThread.start();
return subscriberThread;
}
}
65 changes: 65 additions & 0 deletions src/test/java/redis/clients/jedis/scenario/FakeApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package redis.clients.jedis.scenario;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class FakeApp implements Runnable {

private static final Logger log = LoggerFactory.getLogger(FakeApp.class);

public void setKeepExecutingForSeconds(int keepExecutingForSeconds) {
this.keepExecutingForSeconds = keepExecutingForSeconds;
}

private int keepExecutingForSeconds = 60;

private FaultInjectionClient.TriggerActionResponse actionResponse = null;
private final UnifiedJedis client;
private final ExecutedAction action;
private List<JedisException> exceptions = new ArrayList<>();

@FunctionalInterface
public interface ExecutedAction {
boolean run(UnifiedJedis client);
}

public FakeApp(UnifiedJedis client, ExecutedAction action) {
this.client = client;
this.action = action;
}

public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) {
this.actionResponse = actionResponse;
}

public List<JedisException> capturedExceptions() {
return exceptions;
}

public void run() {
log.info("Starting FakeApp");

int checkEachSeconds = 5;
int timeoutSeconds = 120;

while (actionResponse == null || !actionResponse.isCompleted(
Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
Duration.ofSeconds(timeoutSeconds))) {
try {
boolean success = action.run(client);

if (!success) break;
} catch (JedisConnectionException e) {
log.error("Error executing action", e);
exceptions.add(e);
}
}
}
}
Loading

0 comments on commit fc603d2

Please sign in to comment.