diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java index 16b00e36053..5db46b11ead 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java @@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; /** - * + * * BufferQueueChannel */ public class BufferQueueChannel extends AbstractChannel { @@ -45,6 +45,7 @@ public class BufferQueueChannel extends AbstractChannel { public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; public static final String KEY_RELOADINTERVAL = "reloadInterval"; + public static final String KEY_TASK_NAME = "taskName"; public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; // global buffer size @@ -54,6 +55,7 @@ public class BufferQueueChannel extends AbstractChannel { protected Timer channelTimer; private AtomicLong takeCounter = new AtomicLong(0); private AtomicLong putCounter = new AtomicLong(0); + private String taskName; /** * Constructor @@ -66,7 +68,7 @@ public BufferQueueChannel() { /** * put - * + * * @param event * @throws ChannelException */ @@ -88,7 +90,7 @@ public void put(Event event) throws ChannelException { /** * take - * + * * @return Event * @throws ChannelException */ @@ -106,7 +108,7 @@ public Event take() throws ChannelException { /** * getTransaction - * + * * @return */ @Override @@ -138,7 +140,8 @@ protected void setReloadTimer() { TimerTask channelTask = new TimerTask() { public void run() { - LOG.info("queueSize:{},availablePermits:{},put:{},take:{}", + LOG.info("taskName:{},queueSize:{},availablePermits:{},put:{},take:{}", + taskName, bufferQueue.size(), bufferQueue.availablePermits(), putCounter.getAndSet(0), @@ -152,11 +155,12 @@ public void run() { /** * configure - * + * * @param context */ @Override public void configure(Context context) { + this.taskName = context.getString(KEY_TASK_NAME); } /** diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java index caa1fbbb861..93a29e0142a 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java @@ -106,7 +106,6 @@ public void doRun() { context.addSendFailMetric(); profileEvent.ack(); } - tx.commit(); } else { List indexRequestList = handler.parse( context, profileEvent, context.getTransformProcessor(profileEvent.getUid())); @@ -117,6 +116,7 @@ public void doRun() { profileEvent.ack(); } } + tx.commit(); } catch (Throwable t) { LOG.error("Process event failed!" + this.getName(), t);