Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Scenario tests #3847

Merged
merged 10 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<version>2.38.0</version> <!-- 3.x requires Java 17 -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-fluent</artifactId>
<version>5.3.1</version>
</dependency>
uglide marked this conversation as resolved.
Show resolved Hide resolved

<!-- circuit breaker / failover -->
<dependency>
Expand Down
6 changes: 5 additions & 1 deletion src/test/java/redis/clients/jedis/EndpointConfig.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package redis.clients.jedis;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import redis.clients.jedis.util.JedisURIHelper;

Expand Down Expand Up @@ -118,7 +120,9 @@ protected String getURISchema(boolean tls) {
}

public static HashMap<String, EndpointConfig> loadFromJSON(String filePath) throws Exception {
Gson gson = new Gson();
Gson gson = new GsonBuilder().setFieldNamingPolicy(
FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create();

HashMap<String, EndpointConfig> configs;
try (FileReader reader = new FileReader(filePath)) {
configs = gson.fromJson(reader, new TypeToken<HashMap<String, EndpointConfig>>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package redis.clients.jedis.scenario;

import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.providers.ClusterConnectionProvider;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class ClusterTopologyRefreshTest {

private static final Logger log = LoggerFactory.getLogger(ClusterTopologyRefreshTest.class);

private static EndpointConfig endpoint;

private final FaultInjectionClient faultClient = new FaultInjectionClient();

@BeforeClass
public static void beforeClass() {
try {
ClusterTopologyRefreshTest.endpoint = HostAndPorts.getRedisEndpoint("re-single-shard-oss-cluster");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
org.junit.Assume.assumeTrue(false);
}
}

@Test
public void testWithPool() {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(endpoint.getHostAndPort());

JedisClientConfig config = endpoint.getClientConfigBuilder()
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();


ClusterConnectionProvider provider = new ClusterConnectionProvider(jedisClusterNode, config, RecommendedSettings.poolConfig);
ClusterConnectionProvider spyProvider = spy(provider);

try (JedisCluster client = new JedisCluster(spyProvider,
RecommendedSettings.MAX_RETRIES, RecommendedSettings.MAX_TOTAL_RETRIES_DURATION)) {
assertEquals("Was this BDB used to run this test before?", 1,
client.getClusterNodes().size());

AtomicLong commandsExecuted = new AtomicLong();

// Start thread that imitates an application that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
long i = commandsExecuted.getAndIncrement();
client.set(String.valueOf(i), String.valueOf(i));
return true;
});

Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());
params.put("actions", "[\"reshard\",\"failover\"]");

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering Resharding and Failover");
actionResponse = faultClient.triggerAction("sequence_of_actions", params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

assertTrue(fakeApp.capturedExceptions().isEmpty());

log.info("Commands executed: {}", commandsExecuted.get());
for (long i = 0; i < commandsExecuted.get(); i++) {
assertTrue(client.exists(String.valueOf(i)));
}

verify(spyProvider, atLeast(2)).renewSlotCache(any(Connection.class));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package redis.clients.jedis.scenario;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.SafeEncoder;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;

@RunWith(Parameterized.class)
public class ConnectionInterruptionTest {

private static final Logger log = LoggerFactory.getLogger(ConnectionInterruptionTest.class);

private static EndpointConfig endpoint;

private final FaultInjectionClient faultClient = new FaultInjectionClient();

@Parameterized.Parameters
public static Iterable<?> data() {
return Arrays.asList("dmc_restart", "network_failure");
}

@Parameterized.Parameter
public String triggerAction;

@BeforeClass
public static void beforeClass() {
try {
ConnectionInterruptionTest.endpoint = HostAndPorts.getRedisEndpoint("re-standalone");
} catch (IllegalArgumentException e) {
log.warn("Skipping test because no Redis endpoint is configured");
org.junit.Assume.assumeTrue(false);
}
}

@Test
public void testWithPool() {
ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);

UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);
String keyName = "counter";
client.set(keyName, "0");
assertEquals("0", client.get(keyName));

AtomicLong commandsExecuted = new AtomicLong();

// Start thread that imitates an application that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
assertTrue(client.incr(keyName) > 0);
long currentCount = commandsExecuted.getAndIncrement();
log.info("Command executed {}", currentCount);
return true;
});
fakeApp.setKeepExecutingForSeconds(RecommendedSettings.DEFAULT_TIMEOUT_MS/1000 * 2);
Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering {}", triggerAction);
actionResponse = faultClient.triggerAction(triggerAction, params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

log.info("Commands executed: {}", commandsExecuted.get());
assertEquals(commandsExecuted.get(), Long.parseLong(client.get(keyName)));
assertTrue(fakeApp.capturedExceptions().isEmpty());

client.close();
}

@Test
public void testWithPubSub() {
ConnectionProvider connectionProvider = new PooledConnectionProvider(endpoint.getHostAndPort(),
endpoint.getClientConfigBuilder().build(), RecommendedSettings.poolConfig);

UnifiedJedis client = new UnifiedJedis(connectionProvider, RecommendedSettings.MAX_RETRIES,
RecommendedSettings.MAX_TOTAL_RETRIES_DURATION);

AtomicLong messagesSent = new AtomicLong();
AtomicLong messagesReceived = new AtomicLong();

final Thread subscriberThread = getSubscriberThread(messagesReceived, connectionProvider);

// Start thread that imitates a publisher that uses the client
FakeApp fakeApp = new FakeApp(client, (UnifiedJedis c) -> {
log.info("Publishing message");
long consumed = client.publish("test", String.valueOf(messagesSent.getAndIncrement()));
return consumed > 0;
});
fakeApp.setKeepExecutingForSeconds(10);
Thread t = new Thread(fakeApp);
t.start();

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering {}", triggerAction);
actionResponse = faultClient.triggerAction(triggerAction, params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
}

log.info("Action id: {}", actionResponse.getActionId());
fakeApp.setAction(actionResponse);

try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (subscriberThread.isAlive())
subscriberThread.interrupt();

assertEquals(messagesSent.get() - 1, messagesReceived.get());
assertTrue(fakeApp.capturedExceptions().isEmpty());

client.close();
}

private static Thread getSubscriberThread(AtomicLong messagesReceived,
ConnectionProvider connectionProvider) {
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

@Override
public void onMessage(String channel, String message) {
messagesReceived.incrementAndGet();
log.info("Received message: {}", message);
}

@Override
protected String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
};

final Thread subscriberThread = new Thread(() -> {
try {
pubSub.proceed(connectionProvider.getConnection(), "test");
fail("PubSub should have been interrupted");
} catch (JedisConnectionException e) {
log.info("Expected exception in Subscriber: {}", e.getMessage());
assertTrue(e.getMessage().contains("Unexpected end of stream."));
}
});
subscriberThread.start();
return subscriberThread;
}
}
65 changes: 65 additions & 0 deletions src/test/java/redis/clients/jedis/scenario/FakeApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package redis.clients.jedis.scenario;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class FakeApp implements Runnable {

private static final Logger log = LoggerFactory.getLogger(FakeApp.class);

public void setKeepExecutingForSeconds(int keepExecutingForSeconds) {
this.keepExecutingForSeconds = keepExecutingForSeconds;
}

private int keepExecutingForSeconds = 60;

private FaultInjectionClient.TriggerActionResponse actionResponse = null;
private final UnifiedJedis client;
private final ExecutedAction action;
private List<JedisException> exceptions = new ArrayList<>();

@FunctionalInterface
public interface ExecutedAction {
boolean run(UnifiedJedis client);
}

public FakeApp(UnifiedJedis client, ExecutedAction action) {
this.client = client;
this.action = action;
}

public void setAction(FaultInjectionClient.TriggerActionResponse actionResponse) {
this.actionResponse = actionResponse;
}

public List<JedisException> capturedExceptions() {
return exceptions;
}

public void run() {
log.info("Starting FakeApp");

int checkEachSeconds = 5;
int timeoutSeconds = 120;

while (actionResponse == null || !actionResponse.isCompleted(
Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
Duration.ofSeconds(timeoutSeconds))) {
try {
boolean success = action.run(client);

if (!success) break;
} catch (JedisConnectionException e) {
log.error("Error executing action", e);
exceptions.add(e);
}
}
}
}
Loading
Loading