From c87cbb4be9a4a4df6ab9f7ca4e1a296f7c32a58d Mon Sep 17 00:00:00 2001
From: ohadbitt <32278684+ohadbitt@users.noreply.github.com>
Date: Tue, 10 Sep 2019 11:15:01 +0300
Subject: [PATCH] Parquet Ingestion and Mapping (#98)
* parquet mapping
* naming
---
.../azure/kusto/ingest/IngestionMapping.java | 8 ++++++--
.../azure/kusto/ingest/IngestionProperties.java | 15 +++++----------
.../azure/kusto/ingest/StreamingIngestClient.java | 4 +---
3 files changed, 12 insertions(+), 15 deletions(-)
diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java
index fbeb2eb4..ab33d775 100644
--- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java
+++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionMapping.java
@@ -1,14 +1,18 @@
package com.microsoft.azure.kusto.ingest;
+import java.util.Arrays;
+import java.util.List;
+
///
// This class describes the ingestion mapping to use for an ingestion request.
-// When a CSV data source schema and the target schema doesn't match or when using JSON or AVRO formats,
+// When a CSV data source schema and the target schema doesn't match or when using JSON, AVRO or PARQUET formats,
// there is a need to define an ingestion mapping to map the source schema to the table schema.
// This class describes a pre-define ingestion mapping by its name- mapping reference and its kind.
///
public class IngestionMapping {
private IngestionMappingKind ingestionMappingKind;
private String ingestionMappingReference;
+ public final static List mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro", "parquet");
/**
* Creates a default ingestion mapping with kind unknown and empty mapping reference.
@@ -48,5 +52,5 @@ public String getIngestionMappingReference() {
}
/// Represents an ingestion mapping kind - the format of the source data to map from.
- public enum IngestionMappingKind {unknown, csv, json, avro}
+ public enum IngestionMappingKind {unknown, csv, json, parquet, avro}
}
\ No newline at end of file
diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java
index c75a81d5..88d47187 100644
--- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java
+++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestionProperties.java
@@ -1,5 +1,6 @@
package com.microsoft.azure.kusto.ingest;
+import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
@@ -171,17 +172,11 @@ Map getIngestionProperties() throws IOException {
}
fullAdditionalProperties.putAll(additionalProperties);
- switch (ingestionMapping.getIngestionMappingKind()) {
- case csv:
- fullAdditionalProperties.put("csvMappingReference", ingestionMapping.getIngestionMappingReference());
- break;
- case json:
- fullAdditionalProperties.put("jsonMappingReference", ingestionMapping.getIngestionMappingReference());
- break;
- case avro:
- fullAdditionalProperties.put("avroMappingReference", ingestionMapping.getIngestionMappingReference());
- break;
+ String mappingReference = ingestionMapping.getIngestionMappingReference();
+ if(StringUtils.isNotBlank(mappingReference)) {
+ fullAdditionalProperties.put("ingestionMappingReference", mappingReference);
}
+
return fullAdditionalProperties;
}
diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java
index e86b4875..e4f7c438 100644
--- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java
+++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java
@@ -18,7 +18,6 @@
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.*;
import java.util.zip.GZIPOutputStream;
public class StreamingIngestClient implements IngestClient {
@@ -26,7 +25,6 @@ public class StreamingIngestClient implements IngestClient {
private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private AzureStorageClient azureStorageClient;
private StreamingClient streamingClient;
- private final List mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro");
private static final int STREAM_COMPRESS_BUFFER_SIZE = 16 * 1024;
StreamingIngestClient(ConnectionStringBuilder csb) throws URISyntaxException {
@@ -165,7 +163,7 @@ private String getMappingReference(IngestionProperties ingestionProperties, Stri
IngestionMapping ingestionMapping = ingestionProperties.getIngestionMapping();
String mappingReference = ingestionMapping.getIngestionMappingReference();
IngestionMapping.IngestionMappingKind ingestionMappingKind = ingestionMapping.getIngestionMappingKind();
- if (mappingRequiredFormats.contains(format)) {
+ if (IngestionMapping.mappingRequiredFormats.contains(format)) {
String message = null;
if (!format.equals(ingestionMappingKind.name())) {
message = String.format("Wrong ingestion mapping for format %s, found %s mapping kind.", format, ingestionMappingKind.name());