Skip to content

Commit

Permalink
Error handling and DLQ support for IO operation failures. (#32)
Browse files Browse the repository at this point in the history
* added implementaion of Miscellaneous DLQ for TempDir

* resolved Pr changes

Co-authored-by: Fahad Sheikh <44108345+fahadhasher@users.noreply.github.com>

* resolved pr comments and updated relevant documentation

Co-authored-by: SanchayGupta1197 <sanchay.gupta@hashedin.com>
Co-authored-by: Sanchay Gupta <58803136+SanchayGupta1197@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 21, 2020
1 parent 6a979e9 commit 1ca54a6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ public static String[] getNames() {

static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
+ "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster. "
+ "to which the Connector should write records failed due to restrictions while writing to the file in `tempdir.path`, network interruptions or unavailability of Kusto cluster. "
+ "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers";

static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name";
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the Kafka topic's name "
+ "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster.";
+ "to which the Connector should write records failed due to restrictions while writing to the file in `tempdir.path`, network interruptions or unavailability of Kusto cluster.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name";

static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class TopicPartitionWriter {

private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
private static final String COMPRESSION_EXTENSION = ".gz";

private static final String FILE_EXCEPTION_MESSAGE = "Failed to create file or write record into file for ingestion.";

private final TopicPartition tp;
private final IngestClient client;
private final IngestionProperties ingestionProps;
Expand Down Expand Up @@ -158,13 +159,25 @@ void writeRecord(SinkRecord record) throws ConnectException {
this.currentOffset = record.kafkaOffset();
fileWriter.writeData(record);
} catch (IOException | DataException ex) {
throw new ConnectException("Failed to create file or write records into file for ingestion.", ex);
handleErrors(record, ex);
} finally {
reentrantReadWriteLock.readLock().unlock();
}
}
}

private void handleErrors(SinkRecord record, Exception ex) {
if (BehaviorOnError.FAIL == behaviorOnError) {
throw new ConnectException(FILE_EXCEPTION_MESSAGE, ex);
} else if (BehaviorOnError.LOG == behaviorOnError) {
log.error(FILE_EXCEPTION_MESSAGE + " {}", ex);
sendFailedRecordToDlq(record);
} else {
log.debug(FILE_EXCEPTION_MESSAGE + " {}", ex);
sendFailedRecordToDlq(record);
}
}

void open() {
// Should compress binary files
fileWriter = new FileWriter(
Expand Down

0 comments on commit 1ca54a6

Please sign in to comment.