Skip to content

Commit

Permalink
Addresses issues #779 and #775.
Browse files Browse the repository at this point in the history
Conflicts:
	src/main/java/redis/clients/jedis/Connection.java
	src/main/java/redis/clients/jedis/Protocol.java
  • Loading branch information
radifalco authored and HeartSaVioR committed Nov 14, 2014
1 parent cbf26df commit be79ce7
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 97 deletions.
17 changes: 9 additions & 8 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package redis.clients.jedis;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -8,13 +15,6 @@
import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.util.RedisInputStream;
import redis.clients.util.RedisOutputStream;
import redis.clients.util.SafeEncoder;

public class Connection implements Closeable {
private String host;
private int port = Protocol.DEFAULT_PORT;
Expand Down Expand Up @@ -176,7 +176,7 @@ public boolean isConnected() {
&& !socket.isOutputShutdown();
}

protected String getStatusCodeReply() {
public String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
Expand Down Expand Up @@ -286,4 +286,5 @@ protected Object readProtocolWithCheckingBroken() {
throw exc;
}
}

}
70 changes: 32 additions & 38 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,67 +127,61 @@ private static String[] parseTargetHostAndSlot(
}

private static Object process(final RedisInputStream is) {
try {
byte b = is.readByte();
if (b == MINUS_BYTE) {
processError(is);
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);
} else if (b == COLON_BYTE) {
return processInteger(is);
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);
} else if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
} catch (IOException e) {
throw new JedisConnectionException(e);

final byte b = is.readByte();
if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);
} else if (b == COLON_BYTE) {
return processInteger(is);
} else if (b == MINUS_BYTE) {
processError(is);
return null;
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
return null;
}

private static byte[] processStatusCodeReply(final RedisInputStream is) {
return SafeEncoder.encode(is.readLine());
return is.readLineBytes();
}

private static byte[] processBulkReply(final RedisInputStream is) {
int len = Integer.parseInt(is.readLine());
final int len = is.readIntCrLf();
if (len == -1) {
return null;
}
byte[] read = new byte[len];

final byte[] read = new byte[len];
int offset = 0;
try {
while (offset < len) {
int size = is.read(read, offset, (len - offset));
if (size == -1)
throw new JedisConnectionException(
"It seems like server has closed the connection.");
offset += size;
}
// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();
} catch (IOException e) {
throw new JedisConnectionException(e);
while (offset < len) {
final int size = is.read(read, offset, (len - offset));
if (size == -1)
throw new JedisConnectionException(
"It seems like server has closed the connection.");
offset += size;
}

// read 2 more bytes for the command delimiter
is.readByte();
is.readByte();

return read;
}

private static Long processInteger(final RedisInputStream is) {
String num = is.readLine();
return Long.valueOf(num);
return is.readLongCrLf();
}

private static List<Object> processMultiBulkReply(final RedisInputStream is) {
int num = Integer.parseInt(is.readLine());
final int num = is.readIntCrLf();
if (num == -1) {
return null;
}
List<Object> ret = new ArrayList<Object>(num);
final List<Object> ret = new ArrayList<Object>(num);
for (int i = 0; i < num; i++) {
try {
ret.add(process(is));
Expand Down
204 changes: 153 additions & 51 deletions src/main/java/redis/clients/util/RedisInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package redis.clients.util;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;

import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* This class assumes (to some degree) that we are reading a RESP stream. As such it assumes
* certain conventions regarding CRLF line termination. It also assumes that if the Protocol
* layer requires a byte that if that byte is not there it is a stream error.
*/
public class RedisInputStream extends FilterInputStream {

protected final byte buf[];
Expand All @@ -40,73 +43,172 @@ public RedisInputStream(InputStream in) {
this(in, 8192);
}

public byte readByte() throws IOException {
if (count == limit) {
fill();
}

public byte readByte() throws JedisConnectionException {
ensureFill();
return buf[count++];
}

public String readLine() {
int b;
byte c;
StringBuilder sb = new StringBuilder();

try {
while (true) {
if (count == limit) {
fill();
}
if (limit == -1)
break;
final StringBuilder sb = new StringBuilder();
while (true) {
ensureFill();

byte b = buf[count++];
if (b == '\r') {
ensureFill(); // Must be one more byte

b = buf[count++];
if (b == '\r') {
if (count == limit) {
fill();
}

if (limit == -1) {
sb.append((char) b);
break;
}

c = buf[count++];
if (c == '\n') {
break;
}
sb.append((char) b);
sb.append((char) c);
} else {
sb.append((char) b);
byte c = buf[count++];
if (c == '\n') {
break;
}
sb.append((char) b);
sb.append((char) c);
} else {
sb.append((char) b);
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
String reply = sb.toString();

final String reply = sb.toString();
if (reply.length() == 0) {
throw new JedisConnectionException(
"It seems like server has closed the connection.");
throw new JedisConnectionException("It seems like server has closed the connection.");
}

return reply;
}

public int read(byte[] b, int off, int len) throws IOException {
if (count == limit) {
fill();
if (limit == -1)
return -1;
public byte[] readLineBytes() {

/* This operation should only require one fill. In that typical
case we optimize allocation and copy of the byte array. In the
edge case where more than one fill is required then we take a
slower path and expand a byte array output stream as is
necessary. */

ensureFill();

int pos = count;
final byte[] buf = this.buf;
while (true) {
if (pos == limit) {
return readLineBytesSlowly();
}

if (buf[pos++] == '\r') {
if (pos == limit) {
return readLineBytesSlowly();
}

if (buf[pos++] == '\n') {
break;
}
}
}

final int N = (pos - count) - 2;
final byte[] line = new byte[N];
System.arraycopy(buf, count, line, 0, N);
count = pos;
return line;
}

/**
* Slow path in case a line of bytes cannot be read in one #fill() operation. This is still faster
* than creating the StrinbBuilder, String, then encoding as byte[] in Protocol, then decoding back
* into a String.
*/
private byte[] readLineBytesSlowly() {
ByteArrayOutputStream bout = null;
while (true) {
ensureFill();

byte b = buf[count++];
if (b == '\r') {
ensureFill(); // Must be one more byte

byte c = buf[count++];
if (c == '\n') {
break;
}

if (bout == null) {
bout = new ByteArrayOutputStream(16);
}

bout.write(b);
bout.write(c);
} else {
if (bout == null) {
bout = new ByteArrayOutputStream(16);
}

bout.write(b);
}
}

return bout == null ? new byte[0] : bout.toByteArray();
}

public int readIntCrLf() {
return (int)readLongCrLf();
}

public long readLongCrLf() {
final byte[] buf = this.buf;

ensureFill();

final boolean isNeg = buf[count] == '-';
if (isNeg) {
++count;
}

long value = 0;
while (true) {
ensureFill();

final int b = buf[count++];
if (b == '\r') {
ensureFill();

if (buf[count++] != '\n') {
throw new JedisConnectionException("Unexpected character!");
}

break;
}
else {
value = value * 10 + b - '0';
}
}

return (isNeg ? -value : value);
}

public int read(byte[] b, int off, int len) throws JedisConnectionException {
ensureFill();

final int length = Math.min(limit - count, len);
System.arraycopy(buf, count, b, off, length);
count += length;
return length;
}

private void fill() throws IOException {
limit = in.read(buf);
count = 0;
/**
* This methods assumes there are required bytes to be read. If we cannot read
* anymore bytes an exception is thrown to quickly ascertain that the stream
* was smaller than expected.
*/
private void ensureFill() throws JedisConnectionException {
if (count >= limit) {
try {
limit = in.read(buf);
count = 0;
if (limit == -1) {
throw new JedisConnectionException("Unexpected end of stream.");
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
}
}
Loading

0 comments on commit be79ce7

Please sign in to comment.