Skip to content

Commit

Permalink
thread PRIORITY
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Aug 29, 2024
1 parent 28061b1 commit f78d79e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
4 changes: 2 additions & 2 deletions rxlib/src/main/java/org/rx/core/CpuWatchman.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public String toString() {

static final OperatingSystemMXBean osMx = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
static final ThreadMXBean threadMx = (ThreadMXBean) ManagementFactory.getThreadMXBean();
// static final HotspotThreadMBean internalThreadMx = ManagementFactoryHelper.getHotspotThreadMBean();
static final HashedWheelTimer timer = new HashedWheelTimer(ThreadPool.newThreadFactory("timer"), 800L, TimeUnit.MILLISECONDS, 8);
// static final HotspotThreadMBean internalThreadMx = ManagementFactoryHelper.getHotspotThreadMBean();
static final HashedWheelTimer timer = new HashedWheelTimer(ThreadPool.newThreadFactory("timer", Thread.MAX_PRIORITY), 800L, TimeUnit.MILLISECONDS, 8);
//place after timer
static final CpuWatchman INSTANCE = new CpuWatchman();
static Timeout samplingCpuTimeout;
Expand Down
25 changes: 17 additions & 8 deletions rxlib/src/main/java/org/rx/core/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public T call() {
} finally {
Thread t = Thread.currentThread();
TraceHandler.INSTANCE.saveMethodTrace(t,
fn.getClass().getSimpleName(),
this.getClass().getSimpleName(),//fn.getClass().getSimpleName(),
stackTrace != null
? "[" + Linq.from(stackTrace).select(StackTraceElement::toString).toJoinString(Constants.STACK_TRACE_FLAG) + "]"
: "Unknown",
Expand Down Expand Up @@ -267,8 +267,19 @@ public FutureTaskAdapter(Runnable runnable, T result) {
static final FastThreadLocal<Boolean> CONTINUE_FLAG = new FastThreadLocal<>();
private static final FastThreadLocal<Object> COMPLETION_RETURNED_VALUE = new FastThreadLocal<>();
static final String POOL_NAME_PREFIX = "℞Threads-";
static final IntWaterMark DEFAULT_CPU_WATER_MARK = new IntWaterMark(RxConfig.INSTANCE.threadPool.lowCpuWaterMark,
RxConfig.INSTANCE.threadPool.highCpuWaterMark);
static final IntWaterMark DEFAULT_CPU_WATER_MARK = new IntWaterMark() {
private static final long serialVersionUID = 4308886582647381475L;

@Override
public int getLow() {
return RxConfig.INSTANCE.threadPool.lowCpuWaterMark;
}

@Override
public int getHigh() {
return RxConfig.INSTANCE.threadPool.highCpuWaterMark;
}
};
static final Map<Object, RefCounter<ReentrantLock>> taskLockMap = new ConcurrentHashMap<>(8);
static final Map<Object, CompletableFuture<?>> taskSerialMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -348,10 +359,8 @@ public static int computeThreads(double cpuUtilization, long waitTime, long cpuT
return (int) Math.max(Constants.CPU_THREADS, Math.floor(Constants.CPU_THREADS * cpuUtilization * (1 + (double) waitTime / cpuTime)));
}

static ThreadFactory newThreadFactory(String name) {
return new DefaultThreadFactory(String.format("%s%s", POOL_NAME_PREFIX, name), true
// , Thread.NORM_PRIORITY + 1
);
static ThreadFactory newThreadFactory(String name, int priority) {
return new DefaultThreadFactory(String.format("%s%s", POOL_NAME_PREFIX, name), true, priority);
}

static boolean continueFlag(boolean def) {
Expand Down Expand Up @@ -398,7 +407,7 @@ public ThreadPool(int initSize, int queueCapacity, String poolName) {
*/
public ThreadPool(int initSize, int queueCapacity, IntWaterMark cpuWaterMark, String poolName) {
super(checkSize(initSize), Integer.MAX_VALUE,
RxConfig.INSTANCE.threadPool.keepAliveSeconds, TimeUnit.SECONDS, new ThreadQueue(checkCapacity(queueCapacity)), newThreadFactory(poolName), (r, executor) -> {
RxConfig.INSTANCE.threadPool.keepAliveSeconds, TimeUnit.SECONDS, new ThreadQueue(checkCapacity(queueCapacity)), newThreadFactory(poolName, Thread.NORM_PRIORITY), (r, executor) -> {
if (executor.isShutdown()) {
log.warn("ThreadPool {} is shutdown", poolName);
return;
Expand Down
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/core/WheelTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void run(Timeout timeout) throws Exception {
static final long TICK_DURATION = 100;
static final Map<Object, TimeoutFuture> holder = new ConcurrentHashMap<>();
final ExecutorService executor;
final HashedWheelTimer timer = new HashedWheelTimer(ThreadPool.newThreadFactory("TIMER"), TICK_DURATION, TimeUnit.MILLISECONDS);
final HashedWheelTimer timer = new HashedWheelTimer(ThreadPool.newThreadFactory("TIMER", Thread.NORM_PRIORITY), TICK_DURATION, TimeUnit.MILLISECONDS);
final EmptyTimeout nonTask = new EmptyTimeout();

public TimeoutFuture<?> getFutureById(Object taskId) {
Expand Down

0 comments on commit f78d79e

Please sign in to comment.