Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for client-side caching - phase 2 #3673

Merged
merged 6 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,7 @@ protected Object readProtocolWithCheckingBroken() {
}

try {
Protocol.readPushes(inputStream, clientSideCache);
return Protocol.read(inputStream);
// Object read = Protocol.read(inputStream);
// System.out.println("REPLY: " + SafeEncoder.encodeObject(read));
// return read;
return Protocol.read(inputStream, clientSideCache);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand All @@ -376,19 +372,6 @@ public List<Object> getMany(final int count) {
return responses;
}

protected void readPushesWithCheckingBroken() {
if (broken) {
throw new JedisConnectionException("Attempting to read pushes from a broken connection");
}

try {
Protocol.readPushes(inputStream, clientSideCache);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}

/**
* Check if the client name libname, libver, characters are legal
* @param info the name
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/redis/clients/jedis/JedisClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public JedisClientSideCache(final HostAndPort hostPort, final JedisClientConfig

private void clientTrackingOn() {
String reply = connection.executeCommand(new CommandObject<>(
new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON").add("BCAST"),
new CommandArguments(Protocol.Command.CLIENT).add("TRACKING").add("ON"),
Copy link
Contributor

@chayim chayim Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, though make sure you're cognizant of (hopefully) future support for broadcast and friends.

BuilderFactory.STRING));
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
Expand All @@ -33,7 +33,6 @@ private void clientTrackingOn() {

@Override
public String get(String key) {
connection.readPushesWithCheckingBroken();
String cachedValue = cache.getValue(key);
if (cachedValue != null) return cachedValue;

Expand Down
29 changes: 17 additions & 12 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
Expand Down Expand Up @@ -171,15 +170,6 @@ private static Object process(final RedisInputStream is) {
}
}

private static void processPush(final RedisInputStream is, ClientSideCache cache) {
List<Object> list = processMultiBulkReply(is);
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
}
}

private static byte[] processBulkReply(final RedisInputStream is) {
final int len = is.readIntCrLf();
if (len == -1) {
Expand Down Expand Up @@ -232,20 +222,35 @@ private static List<KeyValue> processMapKeyValueReply(final RedisInputStream is)
return ret;
}

@Deprecated
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
public static Object read(final RedisInputStream is) {
return process(is);
}

static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
public static Object read(final RedisInputStream is, final ClientSideCache cache) {
readPushes(is, cache);
return process(is);
}

private static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
if (cache != null) {
//System.out.println("PEEK: " + is.peekByte());
while (Objects.equals(GREATER_THAN_BYTE, is.peekByte())) {
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
}
}
}

private static void processPush(final RedisInputStream is, ClientSideCache cache) {
List<Object> list = processMultiBulkReply(is);
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
cache.invalidateKeys((List) list.get(1));
}
}

public static final byte[] toByteArray(final boolean value) {
return value ? BYTES_TRUE : BYTES_FALSE;
}
Expand Down
20 changes: 3 additions & 17 deletions src/main/java/redis/clients/jedis/util/RedisInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public RedisInputStream(InputStream in) {
this(in, INPUT_BUFFER_SIZE);
}

public Byte peekByte() {
ensureFillSafe();
return buf[count];
public boolean peek(byte b) throws JedisConnectionException {
ensureFill(); // in current design, at least one reply is expected. so ensureFillSafe() is not necessary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, what happens if there is no reply? It shouldn't occur, but maybe there's something we can do for resiliency here. At the very least raise a data specific exception so that we can know this happened?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chayim A JedisConnectionException wrapping a ReadTimeoutException will be thrown.

return buf[count] == b;
}

public byte readByte() throws JedisConnectionException {
Expand Down Expand Up @@ -257,18 +257,4 @@ private void ensureFill() throws JedisConnectionException {
}
}
}

private void ensureFillSafe() {
if (count >= limit) {
try {
limit = in.read(buf);
count = 0;
if (limit == -1) {
throw new JedisConnectionException("Unexpected end of stream.");
}
} catch (IOException e) {
// do nothing
}
}
}
}
60 changes: 50 additions & 10 deletions src/test/java/redis/clients/jedis/JedisClientSideCacheTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package redis.clients.jedis;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,7 +19,7 @@ public class JedisClientSideCacheTest {

@Before
public void setUp() throws Exception {
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().timeoutMillis(500).password("foobared").build());
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().password("foobared").build());
jedis.flushAll();
}

Expand All @@ -26,45 +28,83 @@ public void tearDown() throws Exception {
jedis.close();
}

private static final JedisClientConfig configForCache = DefaultJedisClientConfig.builder()
.resp3().socketTimeoutMillis(20).password("foobared").build();
private static final JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().resp3().password("foobared").build();

@Test
public void simple() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jCache.get("foo"));
jedis.del("foo");
assertNull(jCache.get("foo"));
assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be achieved without adding this import?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chayim We'd have to write a "utility" method.
But why bother? We already have these as our test dependency and being used in some other tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General desire to have fewer dependencies.. even if it's just for test. Not a hard requirement.

}
}

@Test
public void simpleMock() {
public void simpleMoreAndMock() {
ClientSideCache cache = Mockito.mock(ClientSideCache.class);
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache, cache)) {
Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null);

try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) {
jedis.set("foo", "bar");

assertEquals("bar", jCache.get("foo"));

jedis.del("foo");

assertEquals("bar", jCache.get("foo"));

// there should be an invalid pending; any connection command will make it read
jCache.ping();

assertNull(jCache.get("foo"));
}

InOrder inOrder = Mockito.inOrder(cache);
inOrder.verify(cache).invalidateKeys(Mockito.notNull());
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).setKey("foo", "bar");
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).invalidateKeys(Mockito.notNull());
inOrder.verify(cache).getValue("foo");
inOrder.verifyNoMoreInteractions();
}

@Test
public void flushall() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, configForCache)) {
public void flushAll() {
try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jCache.get("foo"));
jedis.flushAll();
assertThat(jCache.get("foo"), Matchers.oneOf("bar", null)); // ?
}
}

@Test
public void flushAllMoreAndMock() {
ClientSideCache cache = Mockito.mock(ClientSideCache.class);
Mockito.when(cache.getValue("foo")).thenReturn(null, "bar", null);

try (JedisClientSideCache jCache = new JedisClientSideCache(hnp, clientConfig, cache)) {
jedis.set("foo", "bar");

assertEquals("bar", jCache.get("foo"));

jedis.flushAll();

assertEquals("bar", jCache.get("foo"));

// there should be an invalid pending; any connection command will make it read
jCache.ping();

assertNull(jCache.get("foo"));
}

InOrder inOrder = Mockito.inOrder(cache);
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).setKey("foo", "bar");
inOrder.verify(cache).getValue("foo");
inOrder.verify(cache).invalidateKeys(Mockito.isNull());
inOrder.verify(cache).getValue("foo");
inOrder.verifyNoMoreInteractions();
}
}
Loading