diff --git a/src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java b/src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java index 451dbf4ba..04a1e48b2 100644 --- a/src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java +++ b/src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java @@ -104,10 +104,10 @@ public StandardClient( // ********************************************************************** // creating socket // ********************************************************************** - OutputStream out = new BufferedOutputStream(socket.getOutputStream(), 16384); + OutputStream out = socket.getOutputStream(); InputStream in = conf.useReadAheadInput() - ? new ReadAheadBufferedStream(socket.getInputStream(), lock) + ? new ReadAheadBufferedStream(socket.getInputStream()) : new BufferedInputStream(socket.getInputStream(), 16384); assignStream(out, in, conf, null); @@ -165,7 +165,7 @@ public StandardClient( out = new BufferedOutputStream(sslSocket.getOutputStream(), 16384); in = conf.useReadAheadInput() - ? new ReadAheadBufferedStream(sslSocket.getInputStream(), lock) + ? new ReadAheadBufferedStream(sslSocket.getInputStream()) : new BufferedInputStream(sslSocket.getInputStream(), 16384); assignStream(out, in, conf, handshake.getThreadId()); } @@ -199,7 +199,7 @@ public StandardClient( if ((clientCapabilities & Capabilities.COMPRESS) != 0) { assignStream( new CompressOutputStream(out, compressionSequence), - new CompressInputStream(in, compressionSequence, lock), + new CompressInputStream(in, compressionSequence), conf, handshake.getThreadId()); } diff --git a/src/main/java/org/mariadb/jdbc/client/socket/impl/CompressInputStream.java b/src/main/java/org/mariadb/jdbc/client/socket/impl/CompressInputStream.java index f07781583..ee39fc18f 100644 --- a/src/main/java/org/mariadb/jdbc/client/socket/impl/CompressInputStream.java +++ b/src/main/java/org/mariadb/jdbc/client/socket/impl/CompressInputStream.java @@ -24,18 +24,19 @@ public class CompressInputStream extends InputStream { private int end; private int pos; private volatile byte[] buf; - private final ReentrantLock lock; /** * Constructor. When this handler is used, driver expect packet with 7 byte compression header * + * Implementation doesn't use synchronized/semaphore because all used are already locked by + * Statement/PreparedStatement Reentrant lock + * * @param in socket input stream * @param compressionSequence compression sequence */ - public CompressInputStream(InputStream in, MutableByte compressionSequence, ReentrantLock lock) { + public CompressInputStream(InputStream in, MutableByte compressionSequence) { this.in = in; this.sequence = compressionSequence; - this.lock = lock; } /** @@ -90,23 +91,18 @@ public int read(byte[] b, int off, int len) throws IOException { } int totalReads = 0; - lock.lock(); - try { - do { - if (end - pos <= 0) { - retrieveBuffer(); - } - // copy internal value to buf. - int copyLength = Math.min(len - totalReads, end - pos); - System.arraycopy(buf, pos, b, off + totalReads, copyLength); - pos += copyLength; - totalReads += copyLength; - } while (totalReads < len && super.available() > 0); + do { + if (end - pos <= 0) { + retrieveBuffer(); + } + // copy internal value to buf. + int copyLength = Math.min(len - totalReads, end - pos); + System.arraycopy(buf, pos, b, off + totalReads, copyLength); + pos += copyLength; + totalReads += copyLength; + } while (totalReads < len && super.available() > 0); - return totalReads; - } finally { - lock.unlock(); - } + return totalReads; } private void retrieveBuffer() throws IOException { @@ -225,12 +221,7 @@ public long skip(long n) throws IOException { */ @Override public int available() throws IOException { - lock.lock(); - try { - return in.available(); - } finally { - lock.unlock(); - } + return in.available(); } /** @@ -270,12 +261,7 @@ public void close() throws IOException { */ @Override public void mark(int readlimit) { - lock.lock(); - try { - in.mark(readlimit); - } finally { - lock.unlock(); - } + in.mark(readlimit); } /** @@ -317,12 +303,7 @@ public void mark(int readlimit) { */ @Override public void reset() throws IOException { - lock.lock(); - try { - in.reset(); - } finally { - lock.unlock(); - } + in.reset(); } /** diff --git a/src/main/java/org/mariadb/jdbc/client/socket/impl/ReadAheadBufferedStream.java b/src/main/java/org/mariadb/jdbc/client/socket/impl/ReadAheadBufferedStream.java index c0cdc64f2..76b0bab93 100644 --- a/src/main/java/org/mariadb/jdbc/client/socket/impl/ReadAheadBufferedStream.java +++ b/src/main/java/org/mariadb/jdbc/client/socket/impl/ReadAheadBufferedStream.java @@ -16,25 +16,22 @@ public class ReadAheadBufferedStream extends FilterInputStream { private static final int BUF_SIZE = 16384; private final byte[] buf; - private final ReentrantLock lock; private int end; private int pos; /** * Constructor * + * Implementation doesn't use synchronized/semaphore because all used are already locked by + * Statement/PreparedStatement Reentrant lock + * * @param in socket input stream */ - public ReadAheadBufferedStream(InputStream in, ReentrantLock lock) { + public ReadAheadBufferedStream(InputStream in) { super(in); buf = new byte[BUF_SIZE]; end = 0; pos = 0; - this.lock = lock; - } - - public ReentrantLock getLock() { - return lock; } /** @@ -51,44 +48,39 @@ public int read(byte[] externalBuf, int off, int len) throws IOException { if (len == 0) { return 0; } - lock.lock(); - try { - int totalReads = 0; - while (true) { - - // read - if (end - pos <= 0) { - if (len - totalReads >= buf.length) { - // buf length is less than asked byte and buf is empty - // => filling directly into external buf - int reads = super.read(externalBuf, off + totalReads, len - totalReads); - if (reads <= 0) { - return (totalReads == 0) ? -1 : totalReads; - } - return totalReads + reads; - - } else { - - // filling internal buf - fillingBuffer(len - totalReads); - if (end <= 0) { - return (totalReads == 0) ? -1 : totalReads; - } + int totalReads = 0; + while (true) { + + // read + if (end - pos <= 0) { + if (len - totalReads >= buf.length) { + // buf length is less than asked byte and buf is empty + // => filling directly into external buf + int reads = super.read(externalBuf, off + totalReads, len - totalReads); + if (reads <= 0) { + return (totalReads == 0) ? -1 : totalReads; } - } + return totalReads + reads; - // copy internal value to buf. - int copyLength = Math.min(len - totalReads, end - pos); - System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength); - pos += copyLength; - totalReads += copyLength; + } else { - if (totalReads >= len || super.available() <= 0) { - return totalReads; + // filling internal buf + fillingBuffer(len - totalReads); + if (end <= 0) { + return (totalReads == 0) ? -1 : totalReads; + } } } - } finally{ - lock.unlock(); + + // copy internal value to buf. + int copyLength = Math.min(len - totalReads, end - pos); + System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength); + pos += copyLength; + totalReads += copyLength; + + if (totalReads >= len || super.available() <= 0) { + return totalReads; + } } } @@ -115,12 +107,7 @@ public void close() throws IOException { } public int available() throws IOException { - lock.lock(); - try { - return end - pos + super.available(); - } finally { - lock.unlock(); - } + return end - pos + super.available(); } public int read() throws IOException {