diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java index 3468a49423dff..9410f4f67aedb 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.time.DateFormatter; @@ -28,7 +29,9 @@ import org.elasticsearch.xpack.monitoring.exporter.ExportBulk; import org.elasticsearch.xpack.monitoring.exporter.ExportException; +import java.io.FilterOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Map; @@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk { private final DateFormatter formatter; /** - * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}. + * The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}. */ private BytesReference payload = null; + /** + * Uncompressed length of {@link #payload} contents. + */ + private long payloadLength = -1L; + HttpExportBulk(final String name, final RestClient client, final Map parameters, final DateFormatter dateTimeFormatter, final ThreadContext threadContext) { super(name, threadContext); @@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk { public void doAdd(Collection docs) throws ExportException { try { if (docs != null && docs.isEmpty() == false) { - try (BytesStreamOutput payload = new BytesStreamOutput()) { + final BytesStreamOutput scratch = new BytesStreamOutput(); + final CountingOutputStream countingStream; + try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) { + countingStream = new CountingOutputStream(payload); for (MonitoringDoc monitoringDoc : docs) { - writeDocument(monitoringDoc, payload); + writeDocument(monitoringDoc, countingStream); } - - // store the payload until we flush - this.payload = payload.bytes(); } + payloadLength = countingStream.bytesWritten; + // store the payload until we flush + this.payload = scratch.bytes(); } } catch (Exception e) { throw new ExportException("failed to add documents to export bulk [{}]", e, name); @@ -97,7 +108,8 @@ public void doFlush(ActionListener listener) throws ExportException { request.addParameter(param.getKey(), param.getValue()); } try { - request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON)); + request.setEntity(new InputStreamEntity( + CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON)); } catch (IOException e) { listener.onFailure(e); return; @@ -127,7 +139,7 @@ public void onFailure(Exception exception) { } } - private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException { + private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException { final XContentType xContentType = XContentType.JSON; final XContent xContent = xContentType.xContent(); @@ -166,4 +178,39 @@ private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOExcepti name, index, id, doc.getType() ); } + + // Counting input stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream + private static final class CountingOutputStream extends FilterOutputStream { + private long bytesWritten = 0; + + CountingOutputStream(final OutputStream out) { + super(out); + } + + @Override + public void write(final int b) throws IOException { + out.write(b); + count(1); + } + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + out.write(b, off, len); + count(len); + } + + @Override + public void close() { + // don't close nested stream + } + + protected void count(final long written) { + if (written != -1) { + bytesWritten += written; + } + } + } }