Skip to content

Commit

Permalink
Improve PumpReader surrogate char handling (#720)
Browse files Browse the repository at this point in the history
Additionally changes the reading logic to consume pending data even if the
reader has been closed.
  • Loading branch information
Marcono1234 committed Oct 17, 2021
1 parent e2313df commit eac455e
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 10 deletions.
80 changes: 70 additions & 10 deletions terminal/src/main/java/org/jline/utils/PumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,69 @@ public java.io.InputStream createInputStream(Charset charset) {
return new InputStream(this, charset);
}

private boolean wait(CharBuffer buffer) throws InterruptedIOException {
if (closed) {
return false;
/**
* Blocks until more input is available, even if {@link #readBuffer} already
* contains some chars; or until the reader is closed.
*
* @return true if more input is available, false if no additional input is
* available and the reader is closed
* @throws InterruptedIOException If {@link #wait()} is interrupted
*/
private boolean waitForMoreInput() throws InterruptedIOException {
if (!writeBuffer.hasRemaining()) {
throw new AssertionError("No space in write buffer");
}

while (!buffer.hasRemaining()) {
// Wake up waiting readers/writers
int oldRemaining = readBuffer.remaining();

do {
if (closed) {
return false;
}

// Wake up waiting writers
notifyAll();

try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} while (readBuffer.remaining() <= oldRemaining);

return true;
}

/**
* Waits until {@code buffer.hasRemaining() == true}, or it is false and
* the reader is {@link #closed}.
*
* @return true if {@code buffer.hasRemaining() == true}; false otherwise
* when reader is closed
*/
private boolean wait(CharBuffer buffer) throws InterruptedIOException {
while (!buffer.hasRemaining()) {
if (closed) {
return false;
}

// Wake up waiting readers/writers
notifyAll();

try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

return true;
}

/**
* Blocks until more input is available or the reader is closed.
* Blocks until input is available or the reader is closed.
*
* @return true if more input is available, false if the reader is closed
* @return true if input is available, false if no input is available and the reader is closed
* @throws InterruptedIOException If {@link #wait()} is interrupted
*/
private boolean waitForInput() throws InterruptedIOException {
Expand All @@ -94,7 +130,8 @@ private boolean waitForInput() throws InterruptedIOException {
* @throws ClosedException If the reader was closed
*/
private void waitForBufferSpace() throws InterruptedIOException, ClosedException {
if (!wait(writeBuffer)) {
// Check `closed` to throw even if writer buffer has space available
if (!wait(writeBuffer) || closed) {
throw new ClosedException();
}
}
Expand Down Expand Up @@ -205,10 +242,33 @@ public synchronized int read(CharBuffer target) throws IOException {
}

private void encodeBytes(CharsetEncoder encoder, ByteBuffer output) throws IOException {
int oldPos = output.position();
CoderResult result = encoder.encode(readBuffer, output, false);
if (rewindReadBuffer() && result.isUnderflow()) {
encoder.encode(readBuffer, output, false);
int encodedCount = output.position() - oldPos;

if (result.isUnderflow()) {
boolean hasMoreInput = rewindReadBuffer();
boolean reachedEndOfInput = false;

// If encoding did not make any progress must block for more input
if (encodedCount == 0 && !hasMoreInput) {
reachedEndOfInput = !waitForMoreInput();
}

result = encoder.encode(readBuffer, output, reachedEndOfInput);
if (result.isError()) {
result.throwException();
}
if (!reachedEndOfInput && output.position() - oldPos == 0) {
throw new AssertionError("Failed to encode any chars");
}
rewindReadBuffer();
} else if (result.isOverflow()) {
if (encodedCount == 0) {
throw new AssertionError("Output buffer has not enough space");
}
} else {
result.throwException();
}
}

Expand Down
49 changes: 49 additions & 0 deletions terminal/src/test/java/org/jline/utils/PumpReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.lang.Thread.State;
import java.nio.charset.StandardCharsets;

public class PumpReaderTest {
Expand Down Expand Up @@ -68,4 +71,50 @@ public void testSmallBuffer() throws IOException {
assertEquals("\uD83D\uDE0A㐀", reader.readLine());
}

@Test
public void testSplitSurrogatePair() throws IOException {
PumpReader pump = new PumpReader();
Writer writer = pump.getWriter();
// Only provide high surrogate
writer.write('\uD83D');
Thread thread = Thread.currentThread();

new Thread(() -> {
// Busy wait until InputStream blocks for more chars to encode
// (rather brittle, but cannot be easily implemented in a different way)
while (thread.getState() != State.WAITING && thread.getState() != State.TIMED_WAITING) {
Thread.yield();
}
try {
// Complete the surrogate pair
writer.write('\uDE0A');
writer.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).start();

InputStream inputStream = pump.createInputStream(StandardCharsets.UTF_8);
byte[] expectedEncoded = "\uD83D\uDE0A".getBytes(StandardCharsets.UTF_8);
assertEquals(4, expectedEncoded.length); // verify that test is correctly implemented
assertEquals(expectedEncoded[0], inputStream.read());
assertEquals(expectedEncoded[1], inputStream.read());
assertEquals(expectedEncoded[2], inputStream.read());
assertEquals(expectedEncoded[3], inputStream.read());
assertEquals(-1, inputStream.read());
}

@Test
public void testTrailingHighSurrogate() throws IOException {
PumpReader pump = new PumpReader();
Writer writer = pump.getWriter();
writer.write('\uD83D');
writer.close();

InputStream inputStream = pump.createInputStream(StandardCharsets.UTF_8);
// Encoder should have replaced incomplete trailing high surrogate
assertEquals('?', inputStream.read());
assertEquals(-1, inputStream.read());
}

}

0 comments on commit eac455e

Please sign in to comment.