Skip to content

Commit

Permalink
Content.Source from methods (#11949)
Browse files Browse the repository at this point in the history
Introduce Content.Source.from methods
These isolate code from specific implementations (which could even be made internal)
  • Loading branch information
gregw committed Jun 25, 2024
1 parent 9a9ef89 commit 718c6fc
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ChunksContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
Expand Down Expand Up @@ -476,7 +475,7 @@ public Path getPath()
public Content.Source newContentSource()
{
// TODO: use a ByteBuffer pool and direct ByteBuffers?
return new ByteChannelContentSource.PathContentSource(getPath());
return Content.Source.from(getPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.IOResources;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.thread.AutoLock;
Expand Down Expand Up @@ -164,14 +163,55 @@ protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer)
}

/**
* <p>A specialized {@link org.eclipse.jetty.io.content.ByteChannelContentSource.PathContentSource}
* whose content is sliced by a byte range.</p>
* <p>A specialized {@link Content.Source}
* whose {@link Path} content is sliced by a byte range.</p>
*
* @deprecated use {@link Content.Source#from(ByteBufferPool.Sized, Path, long, long)}
*/
public static class PathContentSource extends ByteChannelContentSource.PathContentSource
@Deprecated(forRemoval = true, since = "12.0.11")
public static class PathContentSource implements Content.Source
{
private final Content.Source contentSource;

public PathContentSource(Path path, ByteRange byteRange)
{
super(new ByteBufferPool.Sized(null), path, byteRange.first(), byteRange.getLength());
contentSource = Content.Source.from(null, path, byteRange.first(), byteRange.getLength());
}

@Override
public void demand(Runnable demandCallback)
{
contentSource.demand(demandCallback);
}

@Override
public void fail(Throwable failure)
{
contentSource.fail(failure);
}

@Override
public void fail(Throwable failure, boolean last)
{
contentSource.fail(failure, last);
}

@Override
public long getLength()
{
return contentSource.getLength();
}

@Override
public Content.Chunk read()
{
return contentSource.read();
}

@Override
public boolean rewind()
{
return contentSource.rewind();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
public interface ByteBufferPool
{
ByteBufferPool NON_POOLING = new NonPooling();
ByteBufferPool.Sized SIZED_NON_POOLING = new Sized(ByteBufferPool.NON_POOLING);

/**
* <p>Acquires a {@link RetainableByteBuffer} from this pool.</p>
Expand Down
137 changes: 137 additions & 0 deletions jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

import org.eclipse.jetty.io.content.BufferedContentSink;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
import org.eclipse.jetty.io.content.ContentSinkSubscriber;
import org.eclipse.jetty.io.content.ContentSourceInputStream;
import org.eclipse.jetty.io.content.ContentSourcePublisher;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.io.internal.ByteBufferChunk;
import org.eclipse.jetty.io.internal.ByteChannelContentSource;
import org.eclipse.jetty.io.internal.ContentCopier;
import org.eclipse.jetty.io.internal.ContentSourceByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
Expand Down Expand Up @@ -151,6 +157,137 @@ public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor
*/
public interface Source
{
/**
* Create a {@code Content.Source} from zero or more {@link ByteBuffer}s
* @param byteBuffers The {@link ByteBuffer}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBuffer... byteBuffers)
{
return new ByteBufferContentSource(byteBuffers);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param path The {@link Path}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(Path path)
{
return from(null, path, 0, -1);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param path The {@link Path}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(Path path, long offset, long length)
{
return from(null, path, offset, length);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param path The {@link Path}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, Path path)
{
return from(byteBufferPool, path, 0, -1);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param path The {@link Path}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, Path path, long offset, long length)
{
return new ByteChannelContentSource.PathContentSource(byteBufferPool, path, offset, length);
}

/**
* Create a {@code Content.Source} from a {@link ByteChannel}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param byteChannel The {@link ByteChannel}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel)
{
return new ByteChannelContentSource(byteBufferPool, byteChannel);
}

/**
* Create a {@code Content.Source} from a {@link ByteChannel}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param seekableByteChannel The {@link ByteChannel}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length)
{
return new ByteChannelContentSource(byteBufferPool, seekableByteChannel, offset, length);
}

static Content.Source from(InputStream inputStream)
{
return from(null, inputStream);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param inputStream The {@link InputStream}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, InputStream inputStream)
{
return new InputStreamContentSource(inputStream, byteBufferPool);
}

/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param inputStream The {@link InputStream}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, InputStream inputStream, long offset, long length)
{
return new InputStreamContentSource(inputStream, byteBufferPool)
{
private long skip = offset;
private long toRead = length;

@Override
protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer) throws IOException
{
if (skip > 0)
{
inputStream.skipNBytes(skip);
skip = 0;
}

if (toRead == 0)
return -1;
int toReadInt = (int)Math.min(Integer.MAX_VALUE, toRead);
int len = toReadInt > -1 ? Math.min(toReadInt, buffer.length) : buffer.length;
int read = inputStream.read(buffer, 0, len);
toRead -= read;
return read;
}
};
}

/**
* <p>Reads, non-blocking, the whole content source into a {@link ByteBuffer}.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.nio.file.Path;

import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -137,7 +136,7 @@ public static Content.Source asContentSource(Resource resource, ByteBufferPool b
Path path = resource.getPath();
if (path != null)
{
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, 0, -1);
}
if (resource instanceof MemoryResource memoryResource)
{
Expand Down Expand Up @@ -181,20 +180,20 @@ public static Content.Source asContentSource(Resource resource, ByteBufferPool b
Path path = resource.getPath();
if (path != null)
{
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length);
}

// Try an optimization for MemoryResource.
if (resource instanceof MemoryResource memoryResource)
return new ByteBufferContentSource(ByteBuffer.wrap(memoryResource.getBytes()));
return Content.Source.from(ByteBuffer.wrap(memoryResource.getBytes()));

// Fallback to InputStream.
try
{
InputStream inputStream = resource.newInputStream();
if (inputStream == null)
throw new IllegalArgumentException("Resource does not support InputStream: " + resource);
return new RangedInputStreamContentSource(inputStream, new ByteBufferPool.Sized(bufferPool, direct, bufferSize), first, length);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), inputStream, first, length);
}
catch (IOException e)
{
Expand Down Expand Up @@ -411,33 +410,4 @@ protected void onCompleteFailure(Throwable x)
super.onCompleteFailure(x);
}
}

/**
* <p>A specialized {@link InputStreamContentSource}
* whose content is sliced by a byte range.</p>
*/
private static class RangedInputStreamContentSource extends InputStreamContentSource
{
private long toRead;

public RangedInputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool, long first, long length) throws IOException
{
super(inputStream, bufferPool);
inputStream.skipNBytes(first);
// TODO perform sanity checks on length?
this.toRead = length;
}

@Override
protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer) throws IOException
{
if (toRead == 0)
return -1;
int toReadInt = (int)Math.min(Integer.MAX_VALUE, toRead);
int len = toReadInt > -1 ? Math.min(toReadInt, buffer.length) : buffer.length;
int read = inputStream.read(buffer, 0, len);
toRead -= read;
return read;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class InputStreamContentSource implements Content.Source

public InputStreamContentSource(InputStream inputStream)
{
this(inputStream, new ByteBufferPool.Sized(null));
this(inputStream, null);
}

public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool)
Expand All @@ -58,7 +58,7 @@ public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPo
public InputStreamContentSource(InputStream inputStream, ByteBufferPool.Sized bufferPool)
{
this.inputStream = Objects.requireNonNull(inputStream);
this.bufferPool = Objects.requireNonNull(bufferPool);
this.bufferPool = Objects.requireNonNullElse(bufferPool, ByteBufferPool.SIZED_NON_POOLING);
}

public int getBufferSize()
Expand Down
Loading

0 comments on commit 718c6fc

Please sign in to comment.