Skip to content

Commit

Permalink
Issue #1919 scalable scheduler
Browse files Browse the repository at this point in the history
Avoid race between compare and set by allowing reschedule to always set the timer

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Nov 3, 2017
1 parent 5408218 commit 33bd7de
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ private void scheduleTimeout(long nanoTime, long expiresInMs)
// timeout is set.
long expiresAtMs = NANOSECONDS.toMillis(nanoTime) + expiresInMs;
long lastExpiresAtMs = nextTimeout.getAndUpdate(e->Math.min(e,expiresAtMs));
if (lastExpiresAtMs==Long.MAX_VALUE)
timeout.schedule(expiresInMs,MILLISECONDS);
else if (lastExpiresAtMs!=expiresAtMs)
if (lastExpiresAtMs!=expiresAtMs)
timeout.reschedule(expiresInMs,MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public boolean schedule(HttpRequest request)
LOG.debug("Scheduled timeout {} ms for {}", timeoutInMs, request);
if (timeoutInMs>=0 && this.request.compareAndSet(null,request))
{
schedule(timeoutInMs, TimeUnit.MILLISECONDS);
reschedule(timeoutInMs, TimeUnit.MILLISECONDS);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public ContentResponse get(long timeout, TimeUnit unit) throws InterruptedExcept
boolean expired = !latch.await(timeout, unit);
if (expired)
throw new TimeoutException();
if (failure instanceof TimeoutException)
throw (TimeoutException)failure;
return getResult();
}

Expand Down
19 changes: 14 additions & 5 deletions jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeoutTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ public Scheduler getScheduler()
return _scheduler;
}

public void schedule(long delay, TimeUnit units)
/**
* Schedule a timer.
* @param delay The period to delay before the timer expires.
* @param units The units of the delay period.
* @throws IllegalStateException Thrown if the timer is already set.
*/
public void schedule(long delay, TimeUnit units) throws IllegalStateException
{
long now = System.nanoTime();
long expireAtNanos = now + units.toNanos(delay);
Expand All @@ -66,6 +72,12 @@ public void schedule(long delay, TimeUnit units)
_scheduled.compareAndSet(schedule,new Schedule(now,expireAtNanos,schedule));
}

/**
* Reschedule a timer, even if already set, cancelled or expired
* @param delay The period to delay before the timer expires.
* @param units The units of the delay period.
* @return True if the timer was already set.
*/
public boolean reschedule(long delay, TimeUnit units)
{
long now = System.nanoTime();
Expand All @@ -74,15 +86,12 @@ public boolean reschedule(long delay, TimeUnit units)
while(true)
{
long expireAt = _expireAtNanos.get();
if (expireAt==MAX_VALUE)
return false;

if (_expireAtNanos.compareAndSet(expireAt,expireAtNanos))
{
Schedule schedule = _scheduled.get();
if (schedule==null || schedule._scheduledAt>expireAtNanos)
_scheduled.compareAndSet(schedule,new Schedule(now,expireAtNanos,schedule));
return true;
return expireAt!=MAX_VALUE;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ public void testMultiple() throws Exception
{
Thread.sleep(1500);
Assert.assertTrue(_expired);
Assert.assertFalse(_timeout.reschedule(1000,TimeUnit.MILLISECONDS));
_expired=false;
Assert.assertFalse(_timeout.reschedule(500,TimeUnit.MILLISECONDS));
Thread.sleep(1000);
Assert.assertTrue(_expired);
_expired=false;
_timeout.schedule(500,TimeUnit.MILLISECONDS);
Thread.sleep(1000);
Expand Down

0 comments on commit 33bd7de

Please sign in to comment.