Try to make it work #18

Merged
merged 6 commits on May 9, 2020
Changes from 4 commits
pom.xml — 1 addition & 1 deletion
@@ -8,7 +8,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<version>0.3.3</version>
<version>0.3.4</version>
<build>
<plugins>
<plugin>
FileDescriptor.java → FileProperties.java
@@ -2,7 +2,7 @@

import java.io.File;

public class FileDescriptor {
public class FileProperties {
long rawBytes = 0;
long zippedBytes = 0;
long numRecords = 0;
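The rename is not cosmetic: FileWriter now also keeps a java.io.FileDescriptor (the new currentFd = fos.getFD() below), so the old class name would have clashed confusingly with the JDK type. For orientation, a minimal sketch of the renamed holder, assuming only the members visible in this diff:

package com.microsoft.azure.kusto.kafka.connect.sink;

import java.io.File;

// Plain state holder for the file currently being written; FileWriter fills it in.
public class FileProperties {
    long rawBytes = 0;    // uncompressed bytes written so far
    long zippedBytes = 0; // compressed size when gzip is enabled
    long numRecords = 0;  // records appended since the file was opened
    String path;          // full path, handed to the roll callback for ingestion
    File file;            // handle used to delete the file after a successful roll
}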
FileWriter.java
@@ -10,7 +10,6 @@
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;


/**
* This class is used to write gzipped rolling files.
* Currently supports size based rolling, where size is for *uncompressed* size,
@@ -19,16 +18,17 @@
public class FileWriter implements Closeable {

private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
FileDescriptor currentFile;
FileProperties currentFile;
private Timer timer;
private Consumer<FileDescriptor> onRollCallback;
private Consumer<FileProperties> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Supplier<String> getFilePath;
private OutputStream outputStream;
private String basePath;
private CountingOutputStream countingStream;
private long fileThreshold;
private FileDescriptor currentFd;

/**
* @param basePath - The path to which the files are written.
@@ -39,7 +39,7 @@ public class FileWriter {
*/
public FileWriter(String basePath,
long fileThreshold,
Consumer<FileDescriptor> onRollCallback,
Consumer<FileProperties> onRollCallback,
Supplier<String> getFilePath,
long flushInterval,
boolean shouldCompressData) {
@@ -76,27 +76,28 @@ public synchronized void write(byte[] data) throws IOException {
}

public void openFile() throws IOException {
FileDescriptor fileDescriptor = new FileDescriptor();
FileProperties fileProps = new FileProperties();

File folder = new File(basePath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}

String filePath = getFilePath.get();
fileDescriptor.path = filePath;
fileProps.path = filePath;

File file = new File(filePath);

file.createNewFile();

FileOutputStream fos = new FileOutputStream(file);
currentFd = fos.getFD();
fos.getChannel().truncate(0);

countingStream = new CountingOutputStream(fos);
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
fileDescriptor.file = file;
currentFile = fileDescriptor;
fileProps.file = file;
currentFile = fileProps;
}

void rotate() throws IOException {
@@ -105,6 +106,10 @@ void rotate() throws IOException {
}

void finishFile() throws IOException {
finishFile(true);
}

void finishFile(Boolean delete) throws IOException {
if(isDirty()){
if(shouldCompressData){
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
@@ -114,17 +119,28 @@ void finishFile() throws IOException {
}

onRollCallback.accept(currentFile);
if (delete){
dumpFile();
}
} else {
outputStream.close();
}
}

// Close late so that the success callback has a chance to use the file before it is deleted.
private void dumpFile() throws IOException {
outputStream.close();
currentFd = null;
boolean deleted = currentFile.file.delete();
if (!deleted) {
log.warn("couldn't delete temporary file. File exists: " + currentFile.file.exists());
}
}

public void rollback() throws IOException {
if (outputStream != null) {
outputStream.close();
if (currentFile != null && currentFile.file != null) {
currentFile.file.delete();
dumpFile();
}
}
}
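The net effect of the FileWriter changes: the rolled file is deleted (dumpFile) only after the roll callback has run, and the new finishFile(false) lets callers keep it, e.g. so a test can read it back. A hedged usage sketch of the updated API, assuming it sits in the same package as FileWriter; the paths, sizes, and the milliseconds reading of flushInterval are illustrative, inferred from the tests:

import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class FileWriterUsageSketch {
    public static void main(String[] args) throws Exception {
        // Roll callback: receives the finished file's properties once it rotates.
        Consumer<FileProperties> onRoll = (FileProperties f) ->
                System.out.println("rolled " + f.path + " (" + f.rawBytes + " raw bytes)");

        // Supplies a fresh target path for every rotation.
        Supplier<String> nextPath = () ->
                Paths.get("/tmp/kusto-sink", UUID.randomUUID() + ".csv.gz").toString();

        FileWriter writer = new FileWriter(
                "/tmp/kusto-sink",   // basePath: directory created on demand by openFile()
                128 * 1024,          // fileThreshold: uncompressed bytes before rolling
                onRoll,
                nextPath,
                30000,               // flushInterval (presumably milliseconds, as in the tests)
                true);               // shouldCompressData: wraps output in GZIPOutputStream
        writer.write("some,record\n".getBytes(StandardCharsets.UTF_8));
        writer.close();
    }
}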
TopicPartitionWriter.java
@@ -1,8 +1,6 @@
package com.microsoft.azure.kusto.kafka.connect.sink;

import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
@@ -42,7 +40,7 @@ public class TopicPartitionWriter {
this.eventDataCompression = ingestionProps.eventDataCompression;
}

public void handleRollFile(FileDescriptor fileDescriptor) {
public void handleRollFile(FileProperties fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

try {
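Note that handleRollFile forwards the *uncompressed* size (rawBytes), which is what FileSourceInfo's raw-data-size parameter expects. The rest of the body is collapsed above; judging from the visible lines and the azure-kusto-ingest SDK, it presumably looks roughly like this (the names client and ingestionProperties are assumptions, not confirmed by the diff):

// Sketch of the collapsed remainder of handleRollFile.
try {
    // 'client' (IngestClient) and 'ingestionProperties' are assumed names.
    client.ingestFromFile(fileSourceInfo, ingestionProperties);
} catch (Exception e) {
    log.error("Ingestion of file " + fileDescriptor.path + " failed", e);
}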
FileWriterTest.java
@@ -52,7 +52,7 @@ public void testOpen() throws IOException {
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {};
Consumer<FileProperties> trackFiles = (FileProperties f) -> {};

Supplier<String> generateFileName = () -> FILE_PATH;

@@ -82,9 +82,9 @@ public void testGzipFileWriter() throws IOException {

final int MAX_FILE_SIZE = 100;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Consumer<FileProperties> trackFiles = (FileProperties f) -> files.put(f.path, f.rawBytes);

Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";

FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);

@@ -121,9 +121,9 @@ public void testGzipFileWriterFlush() throws IOException, InterruptedException {

final int MAX_FILE_SIZE = 128 * 2;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Consumer<FileProperties> trackFiles = (FileProperties f) -> files.put(f.path, f.rawBytes);

Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";

// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
@@ -177,7 +177,7 @@ public void testFileWriterCompressed() throws IOException {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";

Consumer<FileDescriptor> trackFiles = getAssertFileConsumer(msg);
Consumer<FileProperties> trackFiles = getAssertFileConsumer(msg);

Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";

@@ -189,11 +189,11 @@
fileWriter.write(byteArrayOutputStream.toByteArray());

fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}

static Consumer<FileDescriptor> getAssertFileConsumer(String msg) {
return (FileDescriptor f) -> {
static Consumer<FileProperties> getAssertFileConsumer(String msg) {
return (FileProperties f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
byte[] bytes = IOUtils.toByteArray(fileInputStream);
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
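The body of getAssertFileConsumer is collapsed after the FileInputStream lines above; since the tests hand it an expected payload, the verification pattern is presumably: gunzip the rolled file and compare. A self-contained sketch of that pattern (the helper name is hypothetical):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;

class GzipAssertSketch {
    // Hypothetical helper mirroring getAssertFileConsumer: gunzip and compare.
    static void assertGzippedContent(File f, String expected) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new FileInputStream(f))) {
            Assert.assertEquals(expected,
                    new String(IOUtils.toByteArray(gz), StandardCharsets.UTF_8));
        }
    }
}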
TopicPartitionWriterTest.java
@@ -2,7 +2,6 @@

import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.TopicPartition;
@@ -19,11 +18,9 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPOutputStream;

import static org.mockito.Mockito.*;

@@ -64,7 +61,7 @@ public void testHandleRollFile() {
props.ingestionProperties = ingestionProperties;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);

FileDescriptor descriptor = new FileDescriptor();
FileProperties descriptor = new FileProperties();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
descriptor.file = new File ("C://myfile.txt");
@@ -237,8 +234,8 @@ public void testWriteStringValuesAndOffset() throws IOException {
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());

// Read
writer.fileWriter.finishFile();
Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
writer.fileWriter.finishFile(false);
Consumer<FileProperties> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
assertFileConsumer.accept(writer.fileWriter.currentFile);
}
