Skip to content

Commit

Permalink
Revert "IPROTO-265 Remove additional byte[] allocations for nested re…
Browse files Browse the repository at this point in the history
…aders/writers"

This reverts commit 16ba83f.
  • Loading branch information
pruivo authored and wburns committed Sep 1, 2023
1 parent a42ddf3 commit 5720b13
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 481 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Objec
}

public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t, int bufferSize) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStreamEx(bufferSize);
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
WrappedMessage.write(ctx, TagWriterImpl.newInstanceNoBuffer(ctx, baos), t);
return baos.toByteArray();
}
Expand All @@ -155,7 +155,7 @@ public static ByteBuffer toWrappedByteBuffer(ImmutableSerializationContext ctx,
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out), t);
toWrappedStream(ctx, out, t, DEFAULT_STREAM_BUFFER_SIZE);
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t, int bufferSize) throws IOException {
Expand Down
35 changes: 7 additions & 28 deletions core/src/main/java/org/infinispan/protostream/TagReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ public interface TagReader extends RawProtoStreamReader {

boolean readBool() throws IOException;

default int readEnum() throws IOException {
return readInt32();
}
int readEnum() throws IOException;

/**
* Reads a {@code string} value.
Expand All @@ -52,48 +50,29 @@ default int readEnum() throws IOException {
*/
ByteBuffer readByteBuffer() throws IOException;

/**
* Similar to {@link #readByteArray()} except that the reader impl may optimize creation of a sub TagReader from
* itself, possibly avoiding byte[] allocations
* @return a new TagReader
*/
TagReader subReaderFromArray() throws IOException;

default double readDouble() throws IOException {
return Double.longBitsToDouble(readFixed64());
}
double readDouble() throws IOException;

default float readFloat() throws IOException {
return Float.intBitsToFloat(readFixed32());
}
float readFloat() throws IOException;

long readInt64() throws IOException;

default long readUInt64() throws IOException {
return readInt64();
}
long readUInt64() throws IOException;

long readSInt64() throws IOException;

long readFixed64() throws IOException;

default long readSFixed64() throws IOException {
return readFixed64();
}
long readSFixed64() throws IOException;

int readInt32() throws IOException;

default int readUInt32() throws IOException {
return readInt32();
}
int readUInt32() throws IOException;

int readSInt32() throws IOException;

int readFixed32() throws IOException;

default int readSFixed32() throws IOException {
return readFixed32();
}
int readSFixed32() throws IOException;

/**
* Sets a limit (based on the length of the length delimited value) when entering an embedded message.
Expand Down
64 changes: 11 additions & 53 deletions core/src/main/java/org/infinispan/protostream/TagWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,9 @@ public interface TagWriter extends RawProtoStreamWriter {
// start low level ops
void flush() throws IOException;

/**
* Invoke after done with writer, this implies a flush if necessary
* It is necessary to invoke this on a writer returned from {@link #subWriter(int)} to actually push the data
*/
void close() throws IOException;
void writeTag(int number, int wireType) throws IOException;

default void writeTag(int number, int wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}

default void writeTag(int number, WireType wireType) throws IOException {
writeVarint32(WireType.makeTag(number, wireType));
}
void writeTag(int number, WireType wireType) throws IOException;

void writeVarint32(int value) throws IOException;

Expand All @@ -38,70 +28,38 @@ default void writeTag(int number, WireType wireType) throws IOException {
// start high level ops
void writeString(int number, String value) throws IOException;

default void writeInt32(int number, int value) throws IOException {
if (value >= 0) {
writeUInt32(number, value);
} else {
writeUInt64(number, value);
}
}
void writeInt32(int number, int value) throws IOException;

void writeUInt32(int number, int value) throws IOException;

default void writeSInt32(int number, int value) throws IOException {
// Roll the bits in order to move the sign bit from position 31 to position 0, to reduce the wire length of negative numbers.
writeUInt32(number, (value << 1) ^ (value >> 31));
}
void writeSInt32(int number, int value) throws IOException;

void writeFixed32(int number, int value) throws IOException;

default void writeSFixed32(int number, int value) throws IOException {
writeFixed32(number, value);
}
void writeSFixed32(int number, int value) throws IOException;

void writeInt64(int number, long value) throws IOException;

void writeUInt64(int number, long value) throws IOException;

default void writeSInt64(int number, long value) throws IOException {
// Roll the bits in order to move the sign bit from position 63 to position 0, to reduce the wire length of negative numbers.
writeUInt64(number, (value << 1) ^ (value >> 63));
}
void writeSInt64(int number, long value) throws IOException;

void writeFixed64(int number, long value) throws IOException;

default void writeSFixed64(int number, long value) throws IOException {
writeFixed64(number, value);
}
void writeSFixed64(int number, long value) throws IOException;

default void writeEnum(int number, int value) throws IOException {
writeInt32(number, value);
}
void writeEnum(int number, int value) throws IOException;

void writeBool(int number, boolean value) throws IOException;

default void writeDouble(int number, double value) throws IOException {
writeFixed64(number, Double.doubleToRawLongBits(value));
}
void writeDouble(int number, double value) throws IOException;

default void writeFloat(int number, float value) throws IOException {
writeFixed32(number, Float.floatToRawIntBits(value));
}
void writeFloat(int number, float value) throws IOException;

void writeBytes(int number, ByteBuffer value) throws IOException;

default void writeBytes(int number, byte[] value) throws IOException {
writeBytes(number, value, 0, value.length);
}
void writeBytes(int number, byte[] value) throws IOException;

void writeBytes(int number, byte[] value, int offset, int length) throws IOException;
// end high level ops

/**
* Used to write a sub message that can be optimized by implementation. When the sub writer is complete, flush
* should be invoked to ensure
* @return
* @throws IOException
*/
TagWriter subWriter(int number, boolean nested) throws IOException;
}
21 changes: 12 additions & 9 deletions core/src/main/java/org/infinispan/protostream/WrappedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,15 @@ private static void writeMessage(ImmutableSerializationContext ctx, TagWriter ou
if (t.getClass().isEnum()) {
((EnumMarshallerDelegate) marshallerDelegate).encode(WRAPPED_ENUM, (Enum) t, out);
} else {
TagWriter nestedWriter = out.subWriter(WRAPPED_MESSAGE, false);
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, t);
nestedWriter.close();
ByteArrayOutputStreamEx buffer = new ByteArrayOutputStreamEx();
TagWriterImpl nestedCtx = TagWriterImpl.newInstanceNoBuffer(ctx, buffer);
marshallerDelegate.marshall(nestedCtx, null, t);
nestedCtx.flush();
out.writeBytes(WRAPPED_MESSAGE, buffer.getByteBuffer());
}
}
}
out.close();
out.flush();
}

private static void writeContainer(ImmutableSerializationContext ctx, TagWriter out, BaseMarshallerDelegate marshallerDelegate, Object container) throws IOException {
Expand Down Expand Up @@ -353,7 +355,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
String typeName = null;
Integer typeId = null;
int enumValue = -1;
TagReader messageReader = null;
byte[] messageBytes = null;
Object value = null;
int fieldCount = 0;
int expectedFieldCount = 1;
Expand Down Expand Up @@ -396,7 +398,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
}
case WRAPPED_MESSAGE << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
expectedFieldCount = 2;
messageReader = in.subReaderFromArray();
messageBytes = in.readByteArray();
break;
}
case WRAPPED_STRING << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
Expand Down Expand Up @@ -512,7 +514,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
}
}

if (value == null && typeName == null && typeId == null && messageReader == null) {
if (value == null && typeName == null && typeId == null && messageBytes == null) {
return null;
}

Expand All @@ -531,9 +533,10 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
typeName = ctx.getDescriptorByTypeId(typeId).getFullName();
}
BaseMarshallerDelegate marshallerDelegate = ((SerializationContextImpl) ctx).getMarshallerDelegate(typeName);
if (messageReader != null) {
if (messageBytes != null) {
// it's a Message type
return (T) marshallerDelegate.unmarshall((ProtobufTagMarshaller.ReadContext) messageReader, null);
TagReaderImpl nestedInput = TagReaderImpl.newInstance(ctx, messageBytes);
return (T) marshallerDelegate.unmarshall(nestedInput, null);
} else {
// it's an Enum
EnumMarshaller marshaller = (EnumMarshaller) marshallerDelegate.getMarshaller();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;

import org.infinispan.protostream.ProtobufTagMarshaller;
import org.infinispan.protostream.TagWriter;
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
import org.infinispan.protostream.impl.Log;
Expand Down Expand Up @@ -47,17 +46,6 @@ protected final <T> void writeNestedMessage(BaseMarshallerDelegate<T> marshaller
throw log.maxNestedMessageDepth(maxNestedMessageDepth, message.getClass());
}

if (ctx instanceof TagWriter) {
TagWriter nestedWriter = ((TagWriter) ctx).subWriter(fieldNumber, true);
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, message);
nestedWriter.close();
} else {
handleNonTagWriter(marshallerDelegate, ctx, fieldNumber, message);
}
}

private <T> void handleNonTagWriter(BaseMarshallerDelegate<T> marshallerDelegate, ProtobufTagMarshaller.WriteContext ctx,
int fieldNumber, T message) throws IOException {
ByteArrayOutputStreamEx baos = new ByteArrayOutputStreamEx();
TagWriterImpl nested = TagWriterImpl.newNestedInstance(ctx, baos);
writeMessage(marshallerDelegate, nested, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,4 @@ public ByteArrayOutputStreamEx(int size) {
public synchronized ByteBuffer getByteBuffer() {
return ByteBuffer.wrap(buf, 0, count);
}

public int skipFixedVarint() {
int prev = count;
count += 5;
return prev;
}

public void writePositiveFixedVarint(int pos) {
TagWriterImpl.writePositiveFixedVarint(buf, pos, count - pos - 5);
}
}
Loading

0 comments on commit 5720b13

Please sign in to comment.