-
Notifications
You must be signed in to change notification settings - Fork 6
/
WatchableInputStream.java
152 lines (129 loc) · 4.65 KB
/
WatchableInputStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/
package sirius.biz.storage.util;
import sirius.kernel.async.Future;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* Wraps an {@link InputStream} and provides a {@link Future completion future}.
* <p>
* The future is fulfilled once the stream is closed.
*/
public class WatchableInputStream extends InputStream {

    /**
     * The wrapped stream which receives every delegated call.
     */
    private final InputStream delegate;

    /**
     * Executed after the delegate has been closed without an exception, <tt>null</tt> if none was installed.
     */
    private Runnable successHandler;

    /**
     * Supplied with the exception thrown while closing the delegate, <tt>null</tt> if none was installed.
     */
    private Consumer<Throwable> failureHandler;

    /**
     * Becomes <tt>true</tt> as soon as a handler has been selected for execution.
     * <p>
     * The flag is claimed via an atomic compare-and-set, so that overlapping invocations of {@link #close()}
     * cannot both pass the check and run a handler twice.
     */
    private final AtomicBoolean closeHandled = new AtomicBoolean(false);

    /**
     * Creates a new stream which wraps and delegates all calls to the given one.
     *
     * @param delegate the stream to wrap
     * @throws NullPointerException if the given stream is <tt>null</tt>
     */
    public WatchableInputStream(@Nonnull InputStream delegate) {
        Objects.requireNonNull(delegate, "null was passed into a WatchableInputStream");
        this.delegate = delegate;
    }

    /**
     * Reads the next byte by delegating to the wrapped stream.
     */
    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    /**
     * Reads into the given buffer by delegating to the wrapped stream.
     */
    @Override
    public int read(byte[] b) throws IOException {
        return delegate.read(b);
    }

    /**
     * Reads into the given region of the buffer by delegating to the wrapped stream.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return delegate.read(b, off, len);
    }

    /**
     * Skips bytes by delegating to the wrapped stream.
     */
    @Override
    public long skip(long n) throws IOException {
        return delegate.skip(n);
    }

    /**
     * Reports the available bytes by delegating to the wrapped stream.
     */
    @Override
    public int available() throws IOException {
        return delegate.available();
    }

    /**
     * Marks the current position by delegating to the wrapped stream.
     */
    @Override
    public synchronized void mark(int readlimit) {
        delegate.mark(readlimit);
    }

    /**
     * Resets to the last mark by delegating to the wrapped stream.
     */
    @Override
    public synchronized void reset() throws IOException {
        delegate.reset();
    }

    /**
     * Reports whether the wrapped stream supports {@link #mark(int)} and {@link #reset()}.
     */
    @Override
    public boolean markSupported() {
        return delegate.markSupported();
    }

    /**
     * Closes the wrapped stream and notifies the matching handler.
     * <p>
     * If closing the delegate fails, the failure handler (if present) is supplied with the exception, which is
     * re-thrown afterwards. Otherwise, the success handler (if present) is executed. Once a handler has been
     * executed, further invocations still close the delegate, but no handler is executed again.
     *
     * @throws IOException the exception thrown by the wrapped stream while being closed
     */
    @Override
    public void close() throws IOException {
        try {
            delegate.close();
        } catch (IOException exception) {
            // Close might be invoked several times (e.g. by some ZIP implementations), possibly even
            // concurrently. Therefore, the flag is claimed atomically so that only one caller runs a handler.
            if (failureHandler != null && closeHandled.compareAndSet(false, true)) {
                failureHandler.accept(exception);
            }

            throw exception;
        }

        // Filter duplicate executions (s.a.)...
        if (successHandler != null && closeHandled.compareAndSet(false, true)) {
            successHandler.run();
        }
    }

    /**
     * Sets the completion handler to this stream which is executed only if closing the stream is successful.
     *
     * @param successHandler the handler to be executed once the stream is successfully closed
     * @return <tt>this</tt> for fluent method chaining
     * @throws NullPointerException          if the given handler is <tt>null</tt>
     * @throws UnsupportedOperationException if a success handler is already present
     */
    public WatchableInputStream onSuccess(@Nonnull final Runnable successHandler) {
        // Fail fast, just like the constructor does, instead of silently accepting a missing handler.
        Objects.requireNonNull(successHandler, "null was passed as success handler into a WatchableInputStream");
        if (this.successHandler != null) {
            throw new UnsupportedOperationException("Only one success handler can be specified");
        }

        this.successHandler = successHandler;
        return this;
    }

    /**
     * Sets the completion handler to this stream which is executed only if closing the stream fails.
     * <p>
     * The original exception will be thrown, if this handler doesn't throw its own exception.
     *
     * @param failureHandler the handler to be executed once closing the stream failed
     * @return <tt>this</tt> for fluent method chaining
     * @throws NullPointerException          if the given handler is <tt>null</tt>
     * @throws UnsupportedOperationException if a failure handler is already present
     */
    public WatchableInputStream onFailure(@Nonnull final Consumer<Throwable> failureHandler) {
        Objects.requireNonNull(failureHandler, "null was passed as failure handler into a WatchableInputStream");
        if (this.failureHandler != null) {
            throw new UnsupportedOperationException("Only one failure handler can be specified");
        }

        this.failureHandler = failureHandler;
        return this;
    }

    /**
     * Installs a completion handler to this stream which is executed when the stream is closed.
     * <p>
     * In case of a failure, the original exception will be thrown, if this handler doesn't throw its own exception.
     *
     * @param completionHandler the handler to be executed once the stream is closed
     * @return <tt>this</tt> for fluent method chaining
     * @throws NullPointerException          if the given handler is <tt>null</tt>
     * @throws UnsupportedOperationException if a success or failure handler is already present
     */
    public WatchableInputStream onCompletion(@Nonnull final Runnable completionHandler) {
        // Without this check, a null handler would only surface as a NullPointerException while closing,
        // where it would replace the IOException which actually caused the failure.
        Objects.requireNonNull(completionHandler,
                               "null was passed as completion handler into a WatchableInputStream");
        if (this.successHandler != null || this.failureHandler != null) {
            throw new UnsupportedOperationException("Only one completion handler can be specified");
        }

        this.successHandler = completionHandler;
        this.failureHandler = ignored -> completionHandler.run();
        return this;
    }
}