-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[JENKINS-50568] make receive queue max threads configurable #79
Conversation
use a ThreadPoolExecutor to process the received events. This allows to easily reconfigure the number of threads.
For the Jenkins gerrit-trigger plugin, a change is also required to provide a ThreadPoolFactory that is jenkins aware (runs with Jenkins system ACL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for not responding until now, I've been quite busy on other things.
@@ -84,6 +84,10 @@ private GerritDefaultValues() { | |||
* The default nr of event worker threads. | |||
*/ | |||
public static final int DEFAULT_NR_OF_RECEIVING_WORKER_THREADS = 3; | |||
/** | |||
* The default nr of event worker threads. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copy paste error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
this.numberOfWorkerThreads = numberOfWorkerThreads; | ||
this.threadKeepAliveTime = threadKeepAliveTime; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since executor.allowCoreThreadTimeOut(true);
is specified, I think we need to specify some minimum safe number here. According to the documentation setting for example the timeout to 0 will cause new threads to be created all the time and thrown away upon completion. And I think that might not be desirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would be a reasonable minimum value? 30 seconds? Maybe we could also use a fixed value (not allowing it to be changed but that makes testing harder and when you reduce the number of threads it might take quite long until they die.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can go lower, even 10 seconds. IIUC as long as it's not 0
the core thread count should stay the same.
* Returns the largest number of threads that have ever simultaneously been in the pool. | ||
* @return number of threads | ||
*/ | ||
int getLargestPoolSize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why package private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have any value for users to know what is the max poolsize that was encountered? I need this method for testing to be at least package private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, in those cases a comment explaining it's there for testability is common :)
pool.shutdown(); // Disable new tasks from being submitted | ||
try { | ||
// Wait a while for existing tasks to terminate | ||
if (!pool.awaitTermination(WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should only awaitTermination
if join == true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
handler.addListener(listener); | ||
postEventsToQueue(5); | ||
waitForEventsProcessed(); | ||
assertThat(handler.getLargestPoolSize(), equalTo(3)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use the constant here, so an onlooker understands it tests the default number
URL url = Thread.currentThread().getContextClassLoader().getResource( | ||
"com/sonymobile/tools/gerrit/gerritevents/id_rsa"); | ||
File file = new File(url.getPath()); | ||
File file = new File("src/test/resources/com/sonymobile/tools/gerrit/gerritevents/id_rsa"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you change this?
this won't work when running the tests from the published test jar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, but with the url it doesn't work with maven (at least on windows)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I would guess this is full of test code not working on windows :) Though still a bit strange why this wouldn't work.
@@ -77,6 +78,7 @@ public static void beforeClass() { | |||
* @throws Exception if so. | |||
*/ | |||
@Test(timeout = 1000) | |||
@Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to just remove these tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gone
@@ -183,6 +276,7 @@ private void post(Work work) { | |||
try { | |||
logger.trace("putting work on queue."); | |||
workQueue.put(work); | |||
queueWork(work); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why double storage of the events? You seem to have removed most of the workQueue
usage already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove this completely, but then the getWorkQueue method should also be removed which is an API change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executor.getQueue()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executor.getQueue() returns BlockingQueue<Runnable>
while the interface defines BlockingQueue<Work>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could then keep the workQueue
reference to the queue you give the executor. They would then be the same instance but with the correct type. Or just cast it since we control what goes in it I guess, both ways are slightly ugly in their own ways :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I got a bit confused there thinking that Work
was a the Runnable
that got added to the queue, but you actually introduced a new class for that. So getWorkQueue
needs to stay for binary compatibility reasons, but it would be deprecated and since a lot of the information about the internal queue is exposed via other methods something half drastic could be done to keep it functioning a bit as before. For example iterate over the internal queue and creating a new Queue every time containing the Work
objects or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can make Work implement also Runnable.
Then we could cast executor.getQueue()
to BlockingQueue<Work>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very easy without breaking more compatibility stuff, the way it's designed with the 'perform(Coordinator)' method.
But the simple fix to keep it would be something like
List<Work> l = new ArrayList<>();
for (Runnable r : executor.getQueue()) {
if (r instanceof EventWorker) {
l.add(((EventWorker)l).work);
}
}
I think that would iterate normally without blocking, but needs testing.
There might be some callers that are expecting to get the reference to the actual queue, so the javadoc needs to be updated as well. But at least we keep binary compatibility and hopefully the deprecation warning will make it so no user uses it any more.
set minimum value for keep alive time remove EventThread class
*/ | ||
public GerritHandler(int numberOfWorkerThreads, int threadKeepAliveTime) { | ||
this.numberOfWorkerThreads = numberOfWorkerThreads; | ||
if (threadKeepAliveTime < MIN_RECEIVE_THREAD_KEEP_ALIVE_TIME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Math.max
* Retrieves the work queue for workers to poll. | ||
* @return the queue | ||
*/ | ||
BlockingQueue<Work> getWorkQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, unfortunately we can't change binary signature like this. Something else needs to be done.
* The idea is to split up as much work as possible to be able to quickly handle the next event from Gerrit | ||
* @author Robert Sandell <robert.sandell@sonyericsson.com> | ||
*/ | ||
public class EventThread extends Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Speaking of binary compatibility, this needs to stay but be deprecated.
update javadoc
* | ||
* @param threadKeepAliveTime number of seconds | ||
*/ | ||
public void setThreadKeepAliveTime(int threadKeepAliveTime) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs same MIN_RECEIVE_THREAD_KEEP_ALIVE_TIME
guard as in the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also check for null
return workQueue; | ||
|
||
BlockingQueue<Work> queue = new LinkedBlockingQueue<Work>(); | ||
BlockingQueue<Runnable> workQueue = executor.getQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for null
* @return number of threads | ||
*/ | ||
int getLargestPoolSize() { | ||
return executor.getLargestPoolSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for null
worker.shutdown(); | ||
if (executor != null) { | ||
ThreadPoolExecutor pool = executor; | ||
executor = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are places throughout in the code that are assuming a non null executor. See above comments but I have most likely not found them all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually executor can't be null. It is initialized in the constructor.
* @author Robert Sandell <robert.sandell@sonyericsson.com> | ||
*/ | ||
public interface Coordinator { | ||
/** | ||
* Retrieves the work queue for workers to poll. | ||
* @return the queue | ||
*/ | ||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a walkthrough of gerrit-trigger-plugin to figure out what it needed this for and probably add whatever it actually needs. A bit iffy in terms of binary compatibility. It might be time to go to java 8 and add default methods, or introduce a Coordinator2
interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gerrit trigger plugin is not using the getWorkQueue()
method.
*/ | ||
protected EventThread createEventThread(String threadName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, was this removed as well, sorry I missed it. Same binary compatibility problems as the other removed methods I think.
recover protected method for binary compatibility
use a ThreadPoolExecutor to process the received events. This allows to
easily reconfigure the number of threads similar to the GerritSendQueue