Skip to content

Commit

Permalink
[INLONG-11100][Sort] The buffer queue is not released after sending m…
Browse files Browse the repository at this point in the history
…essages to elasticsearch
  • Loading branch information
vernedeng committed Sep 13, 2024
1 parent e33a5e8 commit 560bcf3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.atomic.AtomicLong;

/**
*
*
* BufferQueueChannel
*/
public class BufferQueueChannel extends AbstractChannel {
Expand All @@ -45,6 +45,7 @@ public class BufferQueueChannel extends AbstractChannel {

public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
public static final String KEY_RELOADINTERVAL = "reloadInterval";
public static final String KEY_TASK_NAME = "taskName";
public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;

// global buffer size
Expand All @@ -54,6 +55,7 @@ public class BufferQueueChannel extends AbstractChannel {
protected Timer channelTimer;
private AtomicLong takeCounter = new AtomicLong(0);
private AtomicLong putCounter = new AtomicLong(0);
private String taskName;

/**
* Constructor
Expand All @@ -66,7 +68,7 @@ public BufferQueueChannel() {

/**
* put
*
*
* @param event
* @throws ChannelException
*/
Expand All @@ -88,7 +90,7 @@ public void put(Event event) throws ChannelException {

/**
* take
*
*
* @return Event
* @throws ChannelException
*/
Expand All @@ -106,7 +108,7 @@ public Event take() throws ChannelException {

/**
* getTransaction
*
*
* @return
*/
@Override
Expand Down Expand Up @@ -138,7 +140,8 @@ protected void setReloadTimer() {
TimerTask channelTask = new TimerTask() {

public void run() {
LOG.info("queueSize:{},availablePermits:{},put:{},take:{}",
LOG.info("taskName:{},queueSize:{},availablePermits:{},put:{},take:{}",
taskName,
bufferQueue.size(),
bufferQueue.availablePermits(),
putCounter.getAndSet(0),
Expand All @@ -152,11 +155,12 @@ public void run() {

/**
* configure
*
*
* @param context
*/
@Override
public void configure(Context context) {
this.taskName = context.getString(KEY_TASK_NAME);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public void doRun() {
context.addSendFailMetric();
profileEvent.ack();
}
tx.commit();
} else {
List<EsIndexRequest> indexRequestList = handler.parse(
context, profileEvent, context.getTransformProcessor(profileEvent.getUid()));
Expand All @@ -117,6 +116,7 @@ public void doRun() {
profileEvent.ack();
}
}
tx.commit();

} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
Expand Down

0 comments on commit 560bcf3

Please sign in to comment.