
Commit

Merge pull request #18 from Azure/tryToMakeItWork
Try to make it work
ohadbitt authored May 9, 2020
2 parents 260eaa8 + 2a8e336 commit 59ef7a8
Showing 7 changed files with 83 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<version>0.3.3</version>
<version>0.3.4</version>
<build>
<plugins>
<plugin>
@@ -10,7 +10,6 @@
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;


/**
* This class is used to write gzipped rolling files.
* Currently supports size based rolling, where size is for *uncompressed* size,
@@ -19,9 +18,9 @@
public class FileWriter implements Closeable {

private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
FileDescriptor currentFile;
SourceFile currentFile;
private Timer timer;
private Consumer<FileDescriptor> onRollCallback;
private Consumer<SourceFile> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Supplier<String> getFilePath;
@@ -30,6 +29,9 @@ public class FileWriter implements Closeable {
private CountingOutputStream countingStream;
private long fileThreshold;

// Don't remove! File descriptor is kept so that the file is not deleted when stream is closed
private FileDescriptor currentFileDescriptor;

/**
* @param basePath - This is path to which to write the files to.
* @param fileThreshold - Max size, uncompressed bytes.
@@ -39,7 +41,7 @@
*/
public FileWriter(String basePath,
long fileThreshold,
Consumer<FileDescriptor> onRollCallback,
Consumer<SourceFile> onRollCallback,
Supplier<String> getFilePath,
long flushInterval,
boolean shouldCompressData) {
@@ -76,35 +78,36 @@ public synchronized void write(byte[] data) throws IOException {
}

public void openFile() throws IOException {
FileDescriptor fileDescriptor = new FileDescriptor();
SourceFile fileProps = new SourceFile();

File folder = new File(basePath);
if (!folder.exists() && !folder.mkdirs()) {
throw new IOException(String.format("Failed to create new directory %s", folder.getPath()));
}

String filePath = getFilePath.get();
fileDescriptor.path = filePath;
fileProps.path = filePath;

File file = new File(filePath);

file.createNewFile();

FileOutputStream fos = new FileOutputStream(file);
currentFileDescriptor = fos.getFD();
fos.getChannel().truncate(0);

countingStream = new CountingOutputStream(fos);
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
fileDescriptor.file = file;
currentFile = fileDescriptor;
fileProps.file = file;
currentFile = fileProps;
}

void rotate() throws IOException {
finishFile();
finishFile(true);
openFile();
}

void finishFile() throws IOException {
void finishFile(Boolean delete) throws IOException {
if(isDirty()){
if(shouldCompressData){
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
@@ -114,17 +117,28 @@ void finishFile() throws IOException {
}

onRollCallback.accept(currentFile);
if (delete){
dumpFile();
}
} else {
outputStream.close();
}
}

// Close the stream only here so that the success callback has a chance to use the file before it is deleted.
private void dumpFile() throws IOException {
outputStream.close();
currentFileDescriptor = null;
boolean deleted = currentFile.file.delete();
if (!deleted) {
log.warn("couldn't delete temporary file. File exists: " + currentFile.file.exists());
}
}

public void rollback() throws IOException {
if (outputStream != null) {
outputStream.close();
if (currentFile != null && currentFile.file != null) {
currentFile.file.delete();
dumpFile();
}
}
}
@@ -136,7 +150,7 @@ public void close() throws IOException {
}

// Flush last file, updating index
finishFile();
finishFile(true);

// Setting to null so subsequent calls to close won't write it again
currentFile = null;
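
The FileWriter changes above keep the rolling-file flow described in the class comment: write() rolls once the *uncompressed* size crosses the threshold, finishFile(delete) hands the finished file to the roll callback and only then optionally deletes it, and, per the "Don't remove!" comment, a java.io.FileDescriptor reference is kept so the file is not lost while the callback may still read it. Below is a minimal sketch of that rolling idea, not the connector's actual class; RollingGzipWriter, onRoll and nextPath are illustrative names.

import java.io.*;
import java.nio.file.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;

class RollingGzipWriter implements Closeable {
    private final long threshold;            // max uncompressed bytes per file
    private final Consumer<File> onRoll;     // receives each finished file (e.g. queues it for ingestion)
    private final Supplier<Path> nextPath;   // produces the next file path
    private long written;                    // uncompressed bytes in the current file
    private File current;
    private OutputStream out;

    RollingGzipWriter(long threshold, Consumer<File> onRoll, Supplier<Path> nextPath) throws IOException {
        this.threshold = threshold;
        this.onRoll = onRoll;
        this.nextPath = nextPath;
        open();
    }

    private void open() throws IOException {
        Path p = nextPath.get();
        if (p.getParent() != null) {
            Files.createDirectories(p.getParent());   // mirrors FileWriter's mkdirs() check
        }
        current = p.toFile();
        out = new GZIPOutputStream(new FileOutputStream(current));
        written = 0;
    }

    synchronized void write(byte[] data) throws IOException {
        if (written > 0 && written + data.length > threshold) {   // size-based rolling on *uncompressed* bytes
            roll(true);
            open();
        }
        out.write(data);
        written += data.length;
    }

    private void roll(boolean delete) throws IOException {
        out.close();               // close first so the gzip trailer is flushed before the callback reads the file
        onRoll.accept(current);
        if (delete && !current.delete()) {
            System.err.println("couldn't delete temporary file " + current);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        roll(true);                // flush the last file, as FileWriter.close() does with finishFile(true)
    }
}

In the connector, the roll callback is the handleRollFile method shown further down in this diff.
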
@@ -230,8 +230,8 @@ public void put(Collection<SinkRecord> records) throws ConnectException {
}
}

// this is a neat trick, since our rolling files commit whenever they like, offsets may drift
// from what kafka expects. so basically this is to re-sync topic-partition offsets with our sink.
// This is a neat trick: since our rolling files commit whenever they like, offsets may drift
// from what Kafka expects, so this re-syncs topic-partition offsets with our sink.
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets
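
The re-sync described in the comment above is what preCommit is for: the task reports back only offsets that have actually been rolled and ingested, so Kafka never commits past records still sitting in an open file. Below is a minimal sketch of that pattern against the Kafka Connect SinkTask API; lastCommittedOffsets is an assumed bookkeeping map, not the connector's exact field.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class OffsetSyncingSinkTask extends SinkTask {

    // Hypothetical bookkeeping: last offset per partition that was durably handed to Kusto.
    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();
        for (TopicPartition tp : currentOffsets.keySet()) {
            Long committed = lastCommittedOffsets.get(tp);
            if (committed != null) {
                // Committed offsets are "next offset to read", hence the +1.
                committable.put(tp, new OffsetAndMetadata(committed + 1));
            }
        }
        // Partitions left out of the map keep their previously committed offsets.
        return committable;
    }
}
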
@@ -2,7 +2,7 @@

import java.io.File;

public class FileDescriptor {
public class SourceFile {
long rawBytes = 0;
long zippedBytes = 0;
long numRecords = 0;
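
The rename from FileDescriptor to SourceFile frees that name for java.io.FileDescriptor, which FileWriter.openFile() now keeps via fos.getFD(). For reference, a sketch of the holder as it is used across this diff; the field comments are assumptions inferred from those usages.

import java.io.File;

public class SourceFile {
    long rawBytes = 0;     // uncompressed bytes written, passed to FileSourceInfo
    long zippedBytes = 0;  // compressed size on disk
    long numRecords = 0;   // records appended since the file was opened
    String path;           // full path handed to the ingest client
    File file;             // handle used to delete the file after a successful roll
}
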
@@ -1,8 +1,6 @@
package com.microsoft.azure.kusto.kafka.connect.sink;

import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
@@ -42,7 +40,7 @@ public class TopicPartitionWriter {
this.eventDataCompression = ingestionProps.eventDataCompression;
}

public void handleRollFile(FileDescriptor fileDescriptor) {
public void handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

try {
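
handleRollFile(SourceFile) now takes the renamed SourceFile, wraps it in a FileSourceInfo, and hands it to the ingest client. A minimal sketch of that callback using the azure-kusto-ingest API imported above; the client and ingestionProperties fields stand in for the writer's actual members.

import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;

class RollCallbackSketch {
    private final IngestClient client;
    private final IngestionProperties ingestionProperties;

    RollCallbackSketch(IngestClient client, IngestionProperties ingestionProperties) {
        this.client = client;
        this.ingestionProperties = ingestionProperties;
    }

    void handleRollFile(SourceFile fileDescriptor) {
        // rawBytes gives the service an uncompressed-size hint for the upload.
        FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
        try {
            client.ingestFromFile(fileSourceInfo, ingestionProperties);
        } catch (Exception e) {
            // Surfacing the failure lets the task avoid committing this file's offsets.
            throw new RuntimeException(String.format("Failed to ingest %s", fileDescriptor.path), e);
        }
    }
}
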
@@ -52,7 +52,7 @@ public void testOpen() throws IOException {
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> {};
Consumer<SourceFile> trackFiles = (SourceFile f) -> {};

Supplier<String> generateFileName = () -> FILE_PATH;

@@ -82,9 +82,9 @@ public void testGzipFileWriter() throws IOException {

final int MAX_FILE_SIZE = 100;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);

Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString();
Supplier<String> generateFileName = () -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";

FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);

@@ -121,9 +121,9 @@ public void testGzipFileWriterFlush() throws IOException, InterruptedException {

final int MAX_FILE_SIZE = 128 * 2;

Consumer<FileDescriptor> trackFiles = (FileDescriptor f) -> files.put(f.path, f.rawBytes);
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);

Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString();
Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";

// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false);
@@ -177,7 +177,7 @@ public void testFileWriterCompressed() throws IOException {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";

Consumer<FileDescriptor> trackFiles = getAssertFileConsumer(msg);
Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);

Supplier<String> generateFileName = () -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";

@@ -189,11 +189,11 @@ public void testFileWriterCompressed() throws IOException {
fileWriter.write(byteArrayOutputStream.toByteArray());

fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}

static Consumer<FileDescriptor> getAssertFileConsumer(String msg) {
return (FileDescriptor f) -> {
static Consumer<SourceFile> getAssertFileConsumer(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {
byte[] bytes = IOUtils.toByteArray(fileInputStream);
try (ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
@@ -2,7 +2,6 @@

import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.TopicPartition;
@@ -23,7 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPOutputStream;

import static org.mockito.Mockito.*;

@@ -33,12 +31,11 @@ public class TopicPartitionWriterTest {

@Before
public final void before() {
// currentDirectory = new File(Paths.get(
// System.getProperty("java.io.tmpdir"),
// FileWriter.class.getSimpleName(),
// String.valueOf(Instant.now().toEpochMilli())
// ).toString());
currentDirectory = new File("C:\\Users\\ohbitton\\Desktop");
currentDirectory = new File(Paths.get(
System.getProperty("java.io.tmpdir"),
FileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
}

@After
@@ -64,13 +61,12 @@ public void testHandleRollFile() {
props.ingestionProperties = ingestionProperties;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, basePath, fileThreshold, flushInterval);

FileDescriptor descriptor = new FileDescriptor();
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
descriptor.file = new File ("C://myfile.txt");
writer.handleRollFile(descriptor);

FileSourceInfo fileSourceInfo = new FileSourceInfo(descriptor.path, descriptor.rawBytes);
ArgumentCaptor<FileSourceInfo> fileSourceInfoArgument = ArgumentCaptor.forClass(FileSourceInfo.class);
ArgumentCaptor<IngestionProperties> ingestionPropertiesArgumentCaptor = ArgumentCaptor.forClass(IngestionProperties.class);
try {
@@ -149,7 +145,7 @@ public void testOpenClose() {
@Test
public void testWriteNonStringAndOffset() throws Exception {
// TopicPartition tp = new TopicPartition("testPartition", 11);
// KustoIngestClient mockClient = mock(KustoIngestClient.class);
// IngestClient mockClient = mock(IngestClient.class);
// String db = "testdb1";
// String table = "testtable1";
// String basePath = "somepath";
@@ -172,34 +168,35 @@ public void testWriteNonStringAndOffset() throws Exception {
// Assert.assertEquals(writer.getFilePath(), "kafka_testPartition_11_0");
}

// @Test
// public void testWriteStringyValuesAndOffset() throws Exception {
// TopicPartition tp = new TopicPartition("testTopic", 2);
// IngestClient mockClient = mock(IngestClient.class);
// String db = "testdb1";
// String table = "testtable1";
// String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
// long fileThreshold = 100;
// long flushInterval = 300000;
// TopicIngestionProperties props = new TopicIngestionProperties();
//
// props.ingestionProperties = new IngestionProperties(db, table);
// props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
// TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);
//
//
// writer.open();
// List<SinkRecord> records = new ArrayList<SinkRecord>();
//
// records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
// records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));
//
// for (SinkRecord record : records) {
// writer.writeRecord(record);
// }
//
// Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 3, IngestionProperties.DATA_FORMAT.csv.name())).toString());
// }
@Test
public void testWriteStringyValuesAndOffset() throws Exception {
TopicPartition tp = new TopicPartition("testTopic", 2);
IngestClient mockClient = mock(IngestClient.class);
String db = "testdb1";
String table = "testtable1";
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
long fileThreshold = 100;
long flushInterval = 300000;
TopicIngestionProperties props = new TopicIngestionProperties();

props.ingestionProperties = new IngestionProperties(db, table);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, basePath, fileThreshold, flushInterval);


writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();

records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "another,stringy,message", 3));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "{'also':'stringy','sortof':'message'}", 4));

for (SinkRecord record : records) {
writer.writeRecord(record);
}

Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 3, IngestionProperties.DATA_FORMAT.csv.name())).toString());
writer.close();
}

@Test
public void testWriteStringValuesAndOffset() throws IOException {
@@ -237,9 +234,10 @@ public void testWriteStringValuesAndOffset() throws IOException {
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 16, IngestionProperties.DATA_FORMAT.csv.name())).toString());

// Read
writer.fileWriter.finishFile();
Consumer<FileDescriptor> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
writer.fileWriter.finishFile(false);
Consumer<SourceFile> assertFileConsumer = FileWriterTest.getAssertFileConsumer(messages[2] + "\n");
assertFileConsumer.accept(writer.fileWriter.currentFile);
writer.close();
}

@Test
@@ -250,7 +248,7 @@ public void testWriteBytesValuesAndOffset() throws IOException {
String db = "testdb1";
String table = "testtable1";
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
FileInputStream fis = new FileInputStream("C:\\Users\\ohbitton\\source\\Workspaces\\Kusto\\Main\\Test\\UT\\Kusto.Engine.UT\\Common\\Kusto.Common.Svc\\Stream\\dataset3.avro");
FileInputStream fis = new FileInputStream("data.avro");
ByteArrayOutputStream o = new ByteArrayOutputStream();
int content;
while ((content = fis.read()) != -1) {
@@ -277,6 +275,7 @@
Assert.assertEquals(writer.currentOffset, 10);

String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 11, IngestionProperties.DATA_FORMAT.csv.name())).toString());
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s", tp.topic(), tp.partition(), 11, IngestionProperties.DATA_FORMAT.avro.name())).toString());
writer.close();
}
}
