Skip to content

Latest commit

 

History

History
executable file
·
403 lines (339 loc) · 14.6 KB

CM_Tutorial_Bytes.adoc

File metadata and controls

executable file
·
403 lines (339 loc) · 14.6 KB

BytesWriter and BytesReader

Table of Contents

This pair of interfaces is configured using ChronicleMapBuilder.keyMarshallers() or valueMarshallers(), for the key, or value type of the map, respectively.

These interfaces are the most suitable, where the size of the serialized form is not known in advance. The easiest way to compute the size of the serialized form of some object of the type, is performing serialization itself, and then looking at the number of written bytes.

This interface-pair is the least efficient, and the simplest to implement, so it should also be used when efficiency is not the top priority, or when gains of using other pairs of interfaces (which are more complicated to implement) are marginal.

Basically you should implement two serialization methods:

  • void write(Bytes out, @NotNull T toWrite) from BytesWriter interface, which writes the given toWrite instance of the serialized type to the given out bytes sink.

  • T read(Bytes in, @Nullable T using) from BytesReader interface, which reads the serialized object into the given using instance (if the serialized type is reusable, the using object is not null, and suitable for reusing for this particular serialized object), or a newly created instance. The returned object contains the serialized data; it may be identical, or not identical, to the passed using instance.

For example, here is the implementation of BytesWriter and BytesReader for CharSequence[] value type (array of CharSequences):

public final class CharSequenceArrayBytesMarshaller
        implements BytesWriter<CharSequence[]>, BytesReader<CharSequence[]>,
        ReadResolvable<CharSequenceArrayBytesMarshaller> {

    static final CharSequenceArrayBytesMarshaller INSTANCE = new CharSequenceArrayBytesMarshaller();

    private CharSequenceArrayBytesMarshaller() {}

    @Override
    public void write(Bytes out, @NotNull CharSequence[] toWrite) {
        out.writeInt(toWrite.length);
        for (CharSequence cs : toWrite) {
            // Assume elements non-null for simplicity
            Objects.requireNonNull(cs);
            out.writeUtf8(cs);
        }
    }

    @NotNull
    @Override
    public CharSequence[] read(Bytes in, @Nullable CharSequence[] using) {
        int len = in.readInt();
        if (using == null)
            using = new CharSequence[len];
        if (using.length != len)
            using = Arrays.copyOf(using, len);
        for (int i = 0; i < len; i++) {
            CharSequence cs = using[i];
            if (cs instanceof StringBuilder) {
                in.readUtf8((StringBuilder) cs);
            } else {
                StringBuilder sb = new StringBuilder(0);
                in.readUtf8(sb);
                using[i] = sb;
            }
        }
        return using;
    }

    @Override
    public void writeMarshallable(@NotNull WireOut wireOut) {
        // no fields to write
    }

    @Override
    public void readMarshallable(@NotNull WireIn wireIn) {
        // no fields to read
    }

    @Override
    public CharSequenceArrayBytesMarshaller readResolve() {
        return INSTANCE;
    }
}

Usage example:

try (ChronicleMap<String, CharSequence[]> map = ChronicleMap
        .of(String.class, CharSequence[].class)
        .averageKey("fruits")
        .valueMarshaller(CharSequenceArrayBytesMarshaller.INSTANCE)
        .averageValue(new CharSequence[]{"banana", "pineapple"})
        .entries(2)
        .create()) {
    map.put("fruits", new CharSequence[]{"banana", "pineapple"});
    map.put("vegetables", new CharSequence[] {"carrot", "potato"});
    Assert.assertEquals(2, map.get("fruits").length);
    Assert.assertEquals(2, map.get("vegetables").length);
}

The total size of serialization-form for a CharSequence[] array is 4 bytes for storing the array length, plus the sum of sizes of all CharSequences, in UTF-8 encoding. Computing this size without actual encoding has comparable computational cost with performing actual encoding. That makes CharSequence[] type to meet the second criteria (see above). This makes BytesWriter and BytesReader the most suitable pair of serialization interfaces to implement for the type.

Note how read() implementation attempts to reuse, not only the array object, but also the elements, minimizing the amount of produced garbage. This is a recommended practice.

Some additional notes:

  • If the reader, or writer, interface implementation is not configurable, and doesn’t have per-instance cache or state fields (it doesn’t have instance fields at all), there is a convention to make such implementation classes final, give them a private constructor, and then expose a single INSTANCE constant; a sole instance of this implementation in the JVM.

  • Do not make marshaller class enum, because there are some issues with enum serialization/deserialization.

  • For such no-state implementations, do not forget to implement the ReadResolvable interface and return INSTANCE. Otherwise you have no guarantee that INSTANCE constant is the only alive instance of this implementation in the JVM.

  • If both the writer and reader interface implementations have no fields, it might be a good idea to merge them into a single type, in order to keep writing and reading logic together.

Custom CharSequence encoding

Another example shows how to serialize `CharSequence`s using custom encoding (rather than UTF-8):

Writer:

public final class CharSequenceCustomEncodingBytesWriter
        implements BytesWriter<CharSequence>,
        StatefulCopyable<CharSequenceCustomEncodingBytesWriter> {

    // config fields, non-final because read in readMarshallable()
    private Charset charset;
    private int inputBufferSize;

    // cache fields
    private transient CharsetEncoder charsetEncoder;
    private transient CharBuffer inputBuffer;
    private transient ByteBuffer outputBuffer;

    public CharSequenceCustomEncodingBytesWriter(Charset charset, int inputBufferSize) {
        this.charset = charset;
        this.inputBufferSize = inputBufferSize;
        initTransients();
    }

    private void initTransients() {
        charsetEncoder = charset.newEncoder();
        inputBuffer = CharBuffer.allocate(inputBufferSize);
        int outputBufferSize = (int) (inputBufferSize * charsetEncoder.averageBytesPerChar());
        outputBuffer = ByteBuffer.allocate(outputBufferSize);
    }

    @Override
    public void write(Bytes out, @NotNull CharSequence cs) {
        // Write the actual cs length for accurate StringBuilder.ensureCapacity() while reading
        out.writeStopBit(cs.length());
        long encodedSizePos = out.writePosition();
        out.writeSkip(4);
        charsetEncoder.reset();
        inputBuffer.clear();
        outputBuffer.clear();
        int csPos = 0;
        boolean endOfInput = false;
        // this loop inspired by the CharsetEncoder.encode(CharBuffer) implementation
        while (true) {
            if (!endOfInput) {
                int nextCsPos = Math.min(csPos + inputBuffer.remaining(), cs.length());
                append(inputBuffer, cs, csPos, nextCsPos);
                inputBuffer.flip();
                endOfInput = nextCsPos == cs.length();
                csPos = nextCsPos;
            }

            CoderResult cr = inputBuffer.hasRemaining() ?
                    charsetEncoder.encode(inputBuffer, outputBuffer, endOfInput) :
                    CoderResult.UNDERFLOW;

            if (cr.isUnderflow() && endOfInput)
                cr = charsetEncoder.flush(outputBuffer);

            if (cr.isUnderflow()) {
                if (endOfInput) {
                    break;
                } else {
                    inputBuffer.compact();
                    continue;
                }
            }

            if (cr.isOverflow()) {
                outputBuffer.flip();
                writeOutputBuffer(out);
                outputBuffer.clear();
                continue;
            }

            try {
                cr.throwException();
            } catch (CharacterCodingException e) {
                throw new IORuntimeException(e);
            }
        }
        outputBuffer.flip();
        writeOutputBuffer(out);

        out.writeInt(encodedSizePos, (int) (out.writePosition() - encodedSizePos - 4));
    }

    private void writeOutputBuffer(Bytes out) {
        int remaining = outputBuffer.remaining();
        out.write(out.writePosition(), outputBuffer, 0, remaining);
        out.writeSkip(remaining);
    }

    /**
     * Need this method because {@link CharBuffer#append(CharSequence, int, int)} produces garbage
     */
    private static void append(CharBuffer charBuffer, CharSequence cs, int start, int end) {
        for (int i = start; i < end; i++) {
            charBuffer.put(cs.charAt(i));
        }
    }

    @Override
    public void readMarshallable(@NotNull WireIn wireIn) {
        charset = (Charset) wireIn.read(() -> "charset").object();
        inputBufferSize = wireIn.read(() -> "inputBufferSize").int32();
        initTransients();
    }

    @Override
    public void writeMarshallable(@NotNull WireOut wireOut) {
        wireOut.write(() -> "charset").object(charset);
        wireOut.write(() -> "inputBufferSize").int32(inputBufferSize);
    }

    @Override
    public CharSequenceCustomEncodingBytesWriter copy() {
        return new CharSequenceCustomEncodingBytesWriter(charset, inputBufferSize);
    }
}

Reader:

public final class CharSequenceCustomEncodingBytesReader
        implements BytesReader<CharSequence>,
        StatefulCopyable<CharSequenceCustomEncodingBytesReader> {

    // config fields, non-final because read in readMarshallable()
    private Charset charset;
    private int inputBufferSize;

    // cache fields
    private transient CharsetDecoder charsetDecoder;
    private transient ByteBuffer inputBuffer;
    private transient CharBuffer outputBuffer;

    public CharSequenceCustomEncodingBytesReader(Charset charset, int inputBufferSize) {
        this.charset = charset;
        this.inputBufferSize = inputBufferSize;
        initTransients();
    }

    private void initTransients() {
        charsetDecoder = charset.newDecoder();
        inputBuffer = ByteBuffer.allocate(inputBufferSize);
        int outputBufferSize = (int) (inputBufferSize * charsetDecoder.averageCharsPerByte());
        outputBuffer = CharBuffer.allocate(outputBufferSize);
    }

    @NotNull
    @Override
    public CharSequence read(Bytes in, @Nullable CharSequence using) {
        long csLengthAsLong = in.readStopBit();
        if (csLengthAsLong > Integer.MAX_VALUE) {
            throw new IORuntimeException("cs len shouldn't be more than " + Integer.MAX_VALUE +
                    ", " + csLengthAsLong + " read");
        }
        int csLength = (int) csLengthAsLong;
        StringBuilder sb;
        if (using instanceof StringBuilder) {
            sb = (StringBuilder) using;
            sb.setLength(0);
            sb.ensureCapacity(csLength);
        } else {
            sb = new StringBuilder(csLength);
        }

        int remainingBytes = in.readInt();
        charsetDecoder.reset();
        inputBuffer.clear();
        outputBuffer.clear();
        boolean endOfInput = false;
        // this loop inspired by the CharsetDecoder.decode(ByteBuffer) implementation
        while (true) {
            if (!endOfInput) {
                int inputChunkSize = Math.min(inputBuffer.remaining(), remainingBytes);
                inputBuffer.limit(inputBuffer.position() + inputChunkSize);
                in.read(inputBuffer);
                inputBuffer.flip();
                remainingBytes -= inputChunkSize;
                endOfInput = remainingBytes == 0;
            }

            CoderResult cr = inputBuffer.hasRemaining() ?
                    charsetDecoder.decode(inputBuffer, outputBuffer, endOfInput) :
                    CoderResult.UNDERFLOW;

            if (cr.isUnderflow() && endOfInput)
                cr = charsetDecoder.flush(outputBuffer);

            if (cr.isUnderflow()) {
                if (endOfInput) {
                    break;
                } else {
                    inputBuffer.compact();
                    continue;
                }
            }

            if (cr.isOverflow()) {
                outputBuffer.flip();
                sb.append(outputBuffer);
                outputBuffer.clear();
                continue;
            }

            try {
                cr.throwException();
            } catch (CharacterCodingException e) {
                throw new IORuntimeException(e);
            }
        }
        outputBuffer.flip();
        sb.append(outputBuffer);

        return sb;
    }

    @Override
    public void readMarshallable(@NotNull WireIn wireIn) throws IORuntimeException {
        charset = (Charset) wireIn.read(() -> "charset").object();
        inputBufferSize = wireIn.read(() -> "inputBufferSize").int32();
        initTransients();
    }

    @Override
    public void writeMarshallable(@NotNull WireOut wireOut) {
        wireOut.write(() -> "charset").object(charset);
        wireOut.write(() -> "inputBufferSize").int32(inputBufferSize);
    }

    @Override
    public CharSequenceCustomEncodingBytesReader copy() {
        return new CharSequenceCustomEncodingBytesReader(charset, inputBufferSize);
    }
}

Usage example:

Charset charset = Charset.forName("GBK");
int charBufferSize = 100;
int bytesBufferSize = 200;
CharSequenceCustomEncodingBytesWriter writer =
        new CharSequenceCustomEncodingBytesWriter(charset, charBufferSize);
CharSequenceCustomEncodingBytesReader reader =
        new CharSequenceCustomEncodingBytesReader(charset, bytesBufferSize);
try (ChronicleMap<String, CharSequence> englishToChinese = ChronicleMap
        .of(String.class, CharSequence.class)
        .valueMarshallers(reader, writer)
        .averageKey("hello")
        .averageValue("你好")
        .entries(10)
        .create()) {
    englishToChinese.put("hello", "你好");
    englishToChinese.put("bye", "再见");

    Assert.assertEquals("你好", englishToChinese.get("hello").toString());
    Assert.assertEquals("再见", englishToChinese.get("bye").toString());
}

Some notes on this form of custom serialization:

  • Both CharSequenceCustomEncodingBytesWriter and CharSequenceCustomEncodingBytesReader have configurations (charset and input buffer size). They are implemented as normal classes rather than classes with private constructors and a single INSTANCE.

  • Both writer and reader classes have some "cache" fields; their contents are mutated during writing and reading. That is why they have to implement the StatefulCopyable interface.