Skip to content

Commit

Permalink
Fixes for Spring core integration (#11288)
Browse files Browse the repository at this point in the history
* Cleanup of HttpFields to improve spring integration

* Fixed Subscriber for spring integration

* Fixed implementation of ContentSinkSubscriber.
Added comment to clarify behavior of ContentSinkPublisher.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Apply suggestions from code review

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>

* updates from review

* updates from review

* updates from review

* updates from review

* fix canRetain

* updates from review

---------

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
  • Loading branch information
3 people authored Jan 26, 2024
1 parent 985b290 commit c6c5f07
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,47 @@ public long getLongValue()
return _long;
}
}

static class MultiHttpField extends HttpField
{
private final List<String> _list;

public MultiHttpField(String name, List<String> list)
{
super(name, buildValue(list));
_list = list;
}

private static String buildValue(List<String> list)
{
StringBuilder builder = null;
for (String v : list)
{
if (StringUtil.isBlank(v))
throw new IllegalArgumentException("blank element");
if (builder == null)
builder = new StringBuilder(list.size() * (v == null ? 5 : v.length()) * 2);
else
builder.append(", ");
builder.append(v);
}

return builder == null ? null : builder.toString();
}

@Override
public List<String> getValueList()
{
return _list;
}

@Override
public boolean contains(String search)
{
for (String v : _list)
if (StringUtil.asciiEqualsIgnoreCase(v, search))
return true;
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -989,6 +990,29 @@ default Mutable add(HttpFields fields)
return this;
}

/**
* <p>Adds a field associated with a list of values.</p>
*
* @param name the name of the field
* @param list the List value of the field.
* @return this builder
*/
default Mutable add(String name, List<String> list)
{
Objects.requireNonNull(name);
if (list == null)
throw new IllegalArgumentException("null list");
if (list.isEmpty())
return this;
if (list.size() == 1)
{
String v = list.get(0);
return add(name, v == null ? "" : v);
}
HttpField field = new HttpField.MultiHttpField(name, list);
return add(field);
}

/**
* <p>Adds the given value(s) to the {@link HttpField} with the given name,
* encoding them as comma-separated if necessary,
Expand Down Expand Up @@ -1186,27 +1210,25 @@ default Mutable put(HttpHeader header, String value)
}

/**
* Set a field.
* <p>Puts a field associated with a list of values.</p>
*
* @param name the name of the field
* @param list the List value of the field. If null the field is cleared.
* @return this builder
*/
default Mutable put(String name, List<String> list)
{
// TODO: this implementation should not add
// multiple headers, see RFC 9110 section 5.3.
boolean first = true;
for (String s : list)
Objects.requireNonNull(name);
if (list == null || list.isEmpty())
return remove(name);
if (list.size() == 1)
{
HttpField field = new HttpField(name, s);
if (first)
put(field);
else
add(field);
first = false;
String value = list.get(0);
return put(name, value == null ? "" : value);
}
return this;

HttpField field = new HttpField.MultiHttpField(name, list);
return put(field);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -358,20 +357,6 @@ public Mutable put(HttpHeader header, String value)
: put(new HttpField(header, value));
}

@Override
public Mutable put(String name, List<String> list)
{
Objects.requireNonNull(name);
Objects.requireNonNull(list);
remove(name);
for (String v : list)
{
if (v != null)
add(name, v);
}
return this;
}

@Override
public Mutable computeField(HttpHeader header, BiFunction<HttpHeader, List<HttpField>, HttpField> computeFn)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,13 +1000,89 @@ public void testPutNullName()
assertThat(fields.size(), is(0));
}

@Test
public void testAddNullValueList()
{
HttpFields.Mutable fields = HttpFields.build();
assertThrows(IllegalArgumentException.class, () -> fields.add("name", (List<String>)null));
assertThat(fields.size(), is(0));
List<String> list = new ArrayList<>();
fields.add("name", list);
assertThat(fields.size(), is(0));

list.add("Foo");
list.add(null);
list.add("Bar");
assertThrows(IllegalArgumentException.class, () -> fields.add("name", list));

list.set(1, "");
assertThrows(IllegalArgumentException.class, () -> fields.add("name", list));

list.set(1, " ");
assertThrows(IllegalArgumentException.class, () -> fields.add("name", list));

list.set(1, " ");
assertThrows(IllegalArgumentException.class, () -> fields.add("name", list));

assertThat(fields.size(), is(0));
}

@Test
public void testAddValueList()
{
HttpFields.Mutable fields = HttpFields.build();

fields.add("name", "0, 1, 2");
fields.add("name", List.of("A", "B", "C"));
assertThat(fields.size(), is(2));
assertThat(fields.getValuesList("name"), contains("0, 1, 2", "A, B, C"));
assertThat(fields.getCSV("name", false), contains("0", "1", "2", "A", "B", "C"));
assertThat(fields.getField("name").getValueList(), contains("0", "1", "2"));
assertThat(fields.getField(1).getValueList(), contains("A", "B", "C"));
}

@Test
public void testPutNullValueList()
{
HttpFields.Mutable fields = HttpFields.build();

assertThrows(NullPointerException.class, () -> fields.put("name", (List<String>)null));
fields.add("name", "x");
fields.put("name", (List<String>)null);
assertThat(fields.size(), is(0));

List<String> list = new ArrayList<>();
fields.add("name", "x");
fields.put("name", list);
assertThat(fields.size(), is(0));

fields.add("name", "x");
list.add("Foo");
list.add(null);
list.add("Bar");
assertThrows(IllegalArgumentException.class, () -> fields.put("name", list));

list.set(1, "");
assertThrows(IllegalArgumentException.class, () -> fields.put("name", list));

list.set(1, " ");
assertThrows(IllegalArgumentException.class, () -> fields.put("name", list));

list.set(1, " ");
assertThrows(IllegalArgumentException.class, () -> fields.put("name", list));

assertThat(fields.size(), is(1));
assertThat(fields.get("name"), is("x"));
}

@Test
public void testPutValueList()
{
HttpFields.Mutable fields = HttpFields.build();

fields.put("name", List.of("A", "B", "C"));
assertThat(fields.size(), is(1));
assertThat(fields.get("name"), is("A, B, C"));
assertThat(fields.getField("name").getValueList(), contains("A", "B", "C"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.http;

import java.nio.ByteBuffer;
import java.util.List;

import org.eclipse.jetty.util.BufferUtil;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -98,7 +99,8 @@ public void testEmptyHeaders() throws Exception

HttpFields.Mutable fields = HttpFields.build();
fields.add("Host", "something");
assertThrows(IllegalArgumentException.class, () -> fields.add("Null", null));
assertThrows(IllegalArgumentException.class, () -> fields.add("Null", (String)null));
assertThrows(IllegalArgumentException.class, () -> fields.add("Null", (List<String>)null));
fields.add("Empty", "");
RequestInfo info = new RequestInfo("GET", "/index.html", fields);
assertFalse(gen.isChunking());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@

import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;

/**
* <p>A {@link Flow.Subscriber} that wraps a {@link Content.Sink}.
* Content delivered to the {@link #onNext(Content.Chunk)} method is
* written to {@link Content.Sink#write(boolean, ByteBuffer, Callback)} and the chunk
* is released once the write collback is succeeded or failed.</p>
* <p>A {@link Flow.Subscriber} that wraps a {@link Content.Sink}.</p>
* <p>Content delivered to the {@link #onNext(Content.Chunk)} method is
* written to {@link Content.Sink#write(boolean, ByteBuffer, Callback)}
* and the chunk is released once the write callback is succeeded or failed.</p>
*/
public class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk>
{
private final AtomicInteger lastAndComplete = new AtomicInteger(2);
private final AtomicBoolean callbackComplete = new AtomicBoolean();
private final Content.Sink sink;
private final Callback callback;
private Flow.Subscription subscription;
Expand All @@ -49,33 +54,58 @@ public void onNext(Content.Chunk chunk)
{
// Retain the chunk because the write may not complete immediately.
chunk.retain();
// Always set last=false because we do the last write from onComplete().
sink.write(false, chunk.getByteBuffer(), Callback.from(() -> succeeded(chunk), x -> failed(chunk, x)));
sink.write(chunk.isLast(), chunk.getByteBuffer(), new Callback()
{
public void succeeded()
{
chunk.release();
if (chunk.isLast())
complete();
else
subscription.request(1);
}

public void failed(Throwable failure)
{
chunk.release();
subscription.cancel();
error(failure);
}

@Override
public InvocationType getInvocationType()
{
return Invocable.getInvocationType(callback);
}
});
}

private void succeeded(Content.Chunk chunk)
@Override
public void onError(Throwable failure)
{
chunk.release();
if (!chunk.isLast())
subscription.request(1);
error(failure);
}

private void failed(Content.Chunk chunk, Throwable failure)
private void error(Throwable failure)
{
chunk.release();
subscription.cancel();
onError(failure);
if (callbackComplete.compareAndSet(false, true))
callback.failed(failure);
}

@Override
public void onError(Throwable failure)
public void onComplete()
{
callback.failed(failure);
complete();
}

@Override
public void onComplete()
private void complete()
{
sink.write(true, null, callback);
// Success the callback only when called twice:
// once from last write success and once from the publisher.
if (lastAndComplete.decrementAndGet() == 0)
{
if (callbackComplete.compareAndSet(false, true))
callback.succeeded();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ private void process()
if (chunk.isLast())
{
terminate();
// Reactive Stream specification rule 2.9 allows Publishers to call onComplete()
// even without demand, and Subscribers must be prepared to handle this case.
subscriber.onComplete();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public WithRetainable(ByteBuffer byteBuffer, boolean last, Retainable retainable
@Override
public boolean canRetain()
{
return true;
return retainable.canRetain();
}

@Override
Expand Down
Loading

0 comments on commit c6c5f07

Please sign in to comment.