Skip to content

Commit

Permalink
[JENKINS-8592] let the writer end know if the reader end of the pipe
Browse files Browse the repository at this point in the history
failed.
  • Loading branch information
kohsuke committed Feb 5, 2011
1 parent c628bdd commit 73a8007
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 73 deletions.
9 changes: 5 additions & 4 deletions remoting/src/main/java/hudson/remoting/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,6 @@ public boolean preloadJar(ClassLoader local, URL... jars) throws IOException, In
}

/*package*/ PipeWindow getPipeWindow(int oid) {
if (!remoteCapability.supportsPipeThrottling())
return PipeWindow.FAKE;

synchronized (pipeWindows) {
Key k = new Key(oid);
WeakReference<PipeWindow> v = pipeWindows.get(k);
Expand All @@ -611,7 +608,11 @@ public boolean preloadJar(ClassLoader local, URL... jars) throws IOException, In
return w;
}

Real w = new Real(k, PIPE_WINDOW_SIZE);
PipeWindow w;
if (!remoteCapability.supportsPipeThrottling())
w = new Real(k, PIPE_WINDOW_SIZE);
else
w = new PipeWindow.Fake();
pipeWindows.put(k,new WeakReference<PipeWindow>(w));
return w;
}
Expand Down
8 changes: 8 additions & 0 deletions remoting/src/main/java/hudson/remoting/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ protected Command() {
this(true);
}

protected Command(Throwable cause) {
this.createdAt = new Source(cause);
}

/**
* @param recordCreatedAt
* If false, skip the recording of where the command is created. This makes the trouble-shooting
Expand Down Expand Up @@ -74,6 +78,10 @@ private final class Source extends Exception {
public Source() {
}

private Source(Throwable cause) {
super(cause);
}

public String toString() {
return "Command "+Command.this.toString()+" created at";
}
Expand Down
14 changes: 1 addition & 13 deletions remoting/src/main/java/hudson/remoting/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,7 @@ private Pipe(InputStream in, OutputStream out) {
* Gets the reading end of the pipe.
*/
public InputStream getIn() {
return new FilterInputStream(in) {
@Override
public void close() throws IOException {
try {
// Since closing the reading side does not stop the writing side. We read till the stream is done.
final byte[] buffer = new byte[4096];
while(read(buffer) != -1) {
}
} finally {
super.close();
}
}
};
return in;
}

/**
Expand Down
39 changes: 34 additions & 5 deletions remoting/src/main/java/hudson/remoting/PipeWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
package hudson.remoting;

import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Logger;

Expand All @@ -47,35 +48,61 @@
* @author Kohsuke Kawaguchi
*/
abstract class PipeWindow {
protected Throwable dead;

abstract void increase(int delta);

abstract int peek();

/**
* Blocks until some space becomes available.
*
* @throws IOException
* If we learned that there is an irrecoverable problem on the remote side that prevents us from writing.
* @throws InterruptedException
* If a thread was interrupted while blocking.
*/
abstract int get() throws InterruptedException;
abstract int get() throws InterruptedException, IOException;

abstract void decrease(int delta);

/**
* Indicates that the remote end has died and all the further send attempt should fail.
*/
void dead(Throwable cause) {
this.dead = cause;
}

/**
* If we already know that the remote end had developed a problem, throw an exception.
* Otherwise no-op.
*/
protected void checkDeath() throws IOException {
if (dead!=null)
// the remote end failed to write.
throw (IOException)new IOException("Pipe is already closed").initCause(dead);
}


/**
* Fake implementation used when the receiver side doesn't support throttling.
*/
static final PipeWindow FAKE = new PipeWindow() {
static class Fake extends PipeWindow {
void increase(int delta) {
}

int peek() {
return Integer.MAX_VALUE;
}

int get() throws InterruptedException {
int get() throws InterruptedException, IOException {
checkDeath();
return Integer.MAX_VALUE;
}

void decrease(int delta) {
}
};
}

static final class Key {
public final int oid;
Expand Down Expand Up @@ -134,13 +161,15 @@ public synchronized int peek() {
* to avoid fragmenting the window size. That is, if a bunch of small ACKs come in a sequence,
* bundle them up into a bigger size before making a call.
*/
public int get() throws InterruptedException {
public int get() throws InterruptedException, IOException {
checkDeath();
synchronized (this) {
if (available>0)
return available;

while (available==0) {
wait();
checkDeath();
}
}

Expand Down
39 changes: 37 additions & 2 deletions remoting/src/main/java/hudson/remoting/ProxyOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,23 @@ public void run() {
try {
os.write(buf);
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to write to stream",e);
try {
channel.send(new NotifyDeadWriter(e,oid));
} catch (ChannelClosedException x) {
// the other direction can be already closed if the connection
// shut down is initiated from this side. In that case, remain silent.
} catch (IOException x) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to notify the sender that the write end is dead",x);
LOGGER.log(Level.WARNING, "... the failed write was:",e);
}
} finally {
if (channel.remoteCapability.supportsPipeThrottling()) {
try {
channel.send(new Ack(oid,buf.length));
} catch (ChannelClosedException x) {
// the other direction can be already closed if the connection
// shut down is initiated from this side. In that case, remain silent.
} catch (IOException e) {
// ignore errors
LOGGER.log(Level.WARNING, "Failed to ack the stream",e);
Expand Down Expand Up @@ -329,5 +340,29 @@ public String toString() {
private static final long serialVersionUID = 1L;
}

/**
* {@link Command} to notify the sender that the receiver is dead.
*/
private static final class NotifyDeadWriter extends Command {
private final int oid;

private NotifyDeadWriter(Throwable cause, int oid) {
super(cause);
this.oid = oid;
}

@Override
protected void execute(Channel channel) {
PipeWindow w = channel.getPipeWindow(oid);
w.dead(createdAt.getCause());
}

public String toString() {
return "Pipe.Dead("+oid+")";
}

private static final long serialVersionUID = 1L;
}

private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());
}
83 changes: 34 additions & 49 deletions remoting/src/test/java/hudson/remoting/PipeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
package hudson.remoting;

import hudson.remoting.ChannelRunner.InProcessCompatibilityMode;
import hudson.remoting.FastPipedInputStream.ClosedBy;
import junit.framework.Test;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.jvnet.hudson.test.Bug;
import org.jvnet.hudson.test.For;
import org.jvnet.hudson.test.Url;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
Expand All @@ -39,9 +37,7 @@
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.concurrent.ExecutionException;

/**
* Test {@link Pipe}.
Expand All @@ -64,57 +60,46 @@ public void testRemoteWrite() throws Exception {
}

/**
* Helper class for testPartialReadQuietlyConsumesPipeOnClose.
*/
private static class EventCounterHandler extends Handler {
int count = 0;

@Override
public void publish(final LogRecord record) {
if (ProxyOutputStream.class.getName().equals(record.getLoggerName())) {
Throwable thrown = record.getThrown();
while (thrown != null) {
if (thrown instanceof ClosedBy) {
count += 1;
break;
}
thrown = thrown.getCause();
}
}
}

@Override
public void flush() {
}

@Override
public void close() throws SecurityException {
}

public int getCount() {
return count;
}
}

/**
* This test should be reproducing the initial bug as reported in JENKINS-8592. The assert in this test is not the
* best and is fragile, but it is the best I can come up with.
* Have the reader close the read end of the pipe while the writer is still writing.
* The writer should pick up a failure.
*/
@Bug(8592)
@For(Pipe.class)
@Url("http://issues.jenkins-ci.org/browse/JENKINS-8592")
public void testPartialReadQuietlyConsumesPipeOnClose() throws Exception {
final EventCounterHandler handler = new EventCounterHandler();
final Logger logger = Logger.getLogger(ProxyOutputStream.class.getName());
logger.addHandler(handler);
public void testReaderCloseWhileWriterIsStillWriting() throws Exception {
final Pipe p = Pipe.createRemoteToLocal();
final Future<Integer> f = channel.callAsync(new WritingCallable(p));
final Future<Void> f = channel.callAsync(new InfiniteWriter(p));
final InputStream in = p.getIn();
assertEquals(in.read(), 0);
in.close();
f.get();
logger.removeHandler(handler);
assertEquals(0, handler.getCount());

try {
f.get();
fail();
} catch (ExecutionException e) {
// should have resulted in an IOException
if (!(e.getCause() instanceof IOException)) {
e.printStackTrace();
fail();
}
}
}

/**
* Just writes forever to the pipe
*/
private static class InfiniteWriter implements Callable<Void, Exception> {
private final Pipe pipe;

public InfiniteWriter(Pipe pipe) {
this.pipe = pipe;
}

public Void call() throws Exception {
while (true) {
pipe.getOut().write(0);
Thread.sleep(10);
}
}
}

private static class WritingCallable implements Callable<Integer, IOException> {
Expand Down

0 comments on commit 73a8007

Please sign in to comment.