Skip to content

Commit

Permalink
Parquet Ingestion and Mapping (#98)
Browse files Browse the repository at this point in the history
* parquet mapping

* naming
  • Loading branch information
ohadbitt authored Sep 10, 2019
1 parent 39c6b37 commit c87cbb4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.microsoft.azure.kusto.ingest;

import java.util.Arrays;
import java.util.List;

/// <summary>
// This class describes the ingestion mapping to use for an ingestion request.
// When a CSV data source schema and the target schema doesn't match or when using JSON or AVRO formats,
// When a CSV data source schema and the target schema doesn't match or when using JSON, AVRO or PARQUET formats,
// there is a need to define an ingestion mapping to map the source schema to the table schema.
// This class describes a pre-define ingestion mapping by its name- mapping reference and its kind.
/// </summary>
public class IngestionMapping {
private IngestionMappingKind ingestionMappingKind;
private String ingestionMappingReference;
public final static List<String> mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro", "parquet");

/**
* Creates a default ingestion mapping with kind unknown and empty mapping reference.
Expand Down Expand Up @@ -48,5 +52,5 @@ public String getIngestionMappingReference() {
}

/// Represents an ingestion mapping kind - the format of the source data to map from.
public enum IngestionMappingKind {unknown, csv, json, avro}
public enum IngestionMappingKind {unknown, csv, json, parquet, avro}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.microsoft.azure.kusto.ingest;

import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
Expand Down Expand Up @@ -171,17 +172,11 @@ Map<String, String> getIngestionProperties() throws IOException {
}
fullAdditionalProperties.putAll(additionalProperties);

switch (ingestionMapping.getIngestionMappingKind()) {
case csv:
fullAdditionalProperties.put("csvMappingReference", ingestionMapping.getIngestionMappingReference());
break;
case json:
fullAdditionalProperties.put("jsonMappingReference", ingestionMapping.getIngestionMappingReference());
break;
case avro:
fullAdditionalProperties.put("avroMappingReference", ingestionMapping.getIngestionMappingReference());
break;
String mappingReference = ingestionMapping.getIngestionMappingReference();
if(StringUtils.isNotBlank(mappingReference)) {
fullAdditionalProperties.put("ingestionMappingReference", mappingReference);
}

return fullAdditionalProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.zip.GZIPOutputStream;

public class StreamingIngestClient implements IngestClient {

private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private AzureStorageClient azureStorageClient;
private StreamingClient streamingClient;
private final List<String> mappingRequiredFormats = Arrays.asList("json", "singlejson", "avro");
private static final int STREAM_COMPRESS_BUFFER_SIZE = 16 * 1024;

StreamingIngestClient(ConnectionStringBuilder csb) throws URISyntaxException {
Expand Down Expand Up @@ -165,7 +163,7 @@ private String getMappingReference(IngestionProperties ingestionProperties, Stri
IngestionMapping ingestionMapping = ingestionProperties.getIngestionMapping();
String mappingReference = ingestionMapping.getIngestionMappingReference();
IngestionMapping.IngestionMappingKind ingestionMappingKind = ingestionMapping.getIngestionMappingKind();
if (mappingRequiredFormats.contains(format)) {
if (IngestionMapping.mappingRequiredFormats.contains(format)) {
String message = null;
if (!format.equals(ingestionMappingKind.name())) {
message = String.format("Wrong ingestion mapping for format %s, found %s mapping kind.", format, ingestionMappingKind.name());
Expand Down

0 comments on commit c87cbb4

Please sign in to comment.