Skip to content

Commit

Permalink
Issue #4824 - add configuration on RemoteEndpoint for maxOutgoingFrames
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Apr 29, 2020
1 parent ac97bc3 commit 71df3b5
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ public interface RemoteEndpoint
*/
void setBatchMode(BatchMode mode);

/**
* Set the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation.
*
* @param maxOutgoingFrames the max number of frames.
*/
void setMaxOutgoingFrames(int maxOutgoingFrames);

/**
* Get the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation.
*
* @return the max number of frames.
*/
int getMaxOutgoingFrames();

/**
* Get the InetSocketAddress for the established connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void writeFailed(Throwable x)
private final OutgoingFrames outgoing;
private final AtomicInteger msgState = new AtomicInteger();
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
private final AtomicInteger numOutgoingFrames = new AtomicInteger();
private volatile BatchMode batchMode;
private int maxNumOutgoingFrames = -1;

public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
Expand Down Expand Up @@ -303,6 +305,19 @@ public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback)
BatchMode batchMode = BatchMode.OFF;
if (frame.isDataFrame())
batchMode = getBatchMode();

if (maxNumOutgoingFrames > 0 && frame.isDataFrame())
{
// Increase the number of outgoing frames, will be decremented when callback is completed.
int outgoingFrames = numOutgoingFrames.incrementAndGet();
callback = from(callback, numOutgoingFrames::decrementAndGet);
if (outgoingFrames > maxNumOutgoingFrames)
{
callback.writeFailed(new IOException("Exceeded max outgoing frames: " + outgoingFrames + ">" + maxNumOutgoingFrames));
return;
}
}

outgoing.outgoingFrame(frame, callback, batchMode);
}

Expand Down Expand Up @@ -439,6 +454,18 @@ public void setBatchMode(BatchMode batchMode)
this.batchMode = batchMode;
}

@Override
public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
this.maxNumOutgoingFrames = maxOutgoingFrames;
}

@Override
public int getMaxOutgoingFrames()
{
return maxNumOutgoingFrames;
}

@Override
public void flush() throws IOException
{
Expand All @@ -459,4 +486,36 @@ public String toString()
{
return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode());
}

private static WriteCallback from(WriteCallback callback, Runnable completed)
{
return new WriteCallback()
{
@Override
public void writeFailed(Throwable x)
{
try
{
callback.writeFailed(x);
}
finally
{
completed.run();
}
}

@Override
public void writeSuccess()
{
try
{
callback.writeSuccess();
}
finally
{
completed.run();
}
}
};
}
}

2 comments on commit 71df3b5

@joakime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lachlan-roberts do you want to make this branch into a PR?

@lachlan-roberts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think I like this approach very much anymore, but I'll open a PR for further discussion.

Please sign in to comment.