Java - ThreadPoolExecutor with a PriorityBlockingQueue does not dynamically create non-core threads
Here you can see a priority-aware ThreadPoolExecutor. It works well; the problem is that it does not create new threads once the number of core pool threads has been reached.
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ThreadPoolExecutor} whose pending tasks are ordered by priority instead of FIFO.
 *
 * <p>Tasks that implement {@link Important} are queued with the priority they report; every
 * other task gets priority {@code 0}. A task with a numerically higher priority is dequeued
 * before a task with a lower one. The relative order of tasks with equal priority is not
 * specified.
 *
 * <p><b>Pool sizing caveat:</b> {@code ThreadPoolExecutor} only starts threads beyond
 * {@code corePoolSize} when the work queue refuses an offered task. The
 * {@link PriorityBlockingQueue} used here is unbounded and never refuses, so the pool never
 * grows past {@code corePoolSize} (and with {@code corePoolSize == 0} a single worker drains the
 * queue). To allow up to N threads, pass N as {@code corePoolSize} and, if idle threads should
 * be reclaimed, call {@link #allowCoreThreadTimeOut(boolean) allowCoreThreadTimeOut(true)}.
 *
 * <p>Created by ngrigoriev on 4/24/14.
 */
public class PriorityExecutor extends ThreadPoolExecutor {

    /** Priority assigned to tasks that do not implement {@link Important}. */
    private static final int DEFAULT_PRIORITY = 0;

    /** Initial queue capacity used by the no-argument constructor. */
    private static final int DEFAULT_QUEUE_CAPACITY = 11;

    /** Keep-alive time, in seconds, used by constructors that do not take one. */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 60L;

    /**
     * Creates an executor with a 60 second keep-alive.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maxPoolSize  maximum number of threads (see the class-level sizing caveat)
     * @param queueSize    <em>initial</em> capacity of the priority queue; the queue is unbounded
     * @throws IllegalArgumentException if {@code queueSize < 1} or the pool sizes are invalid
     */
    public PriorityExecutor(int corePoolSize, int maxPoolSize, int queueSize) {
        super(corePoolSize, maxPoolSize, DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                newQueue(queueSize));
    }

    /**
     * Creates an executor with an explicit keep-alive time.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maxPoolSize  maximum number of threads (see the class-level sizing caveat)
     * @param time         keep-alive time for idle threads
     * @param unit         unit of {@code time}
     * @param queueSize    <em>initial</em> capacity of the priority queue; the queue is unbounded
     * @throws IllegalArgumentException if {@code queueSize < 1} or the pool arguments are invalid
     */
    public PriorityExecutor(int corePoolSize, int maxPoolSize, long time, TimeUnit unit,
            int queueSize) {
        super(corePoolSize, maxPoolSize, time, unit, newQueue(queueSize));
    }

    /**
     * Creates an executor with {@code corePoolSize 0}, {@code maximumPoolSize Integer.MAX_VALUE},
     * a 60 second keep-alive and an initial queue capacity of 11.
     */
    public PriorityExecutor() {
        super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                newQueue(DEFAULT_QUEUE_CAPACITY));
    }

    /**
     * Creates an executor with {@code corePoolSize 0}, {@code maximumPoolSize Integer.MAX_VALUE}
     * and a 60 second keep-alive, using the given thread factory.
     *
     * @param threadFactory factory used to create worker threads
     * @param queueSize     <em>initial</em> capacity of the priority queue
     */
    public PriorityExecutor(final ThreadFactory threadFactory, int queueSize) {
        super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                newQueue(queueSize), threadFactory);
    }

    /**
     * Creates an executor with {@code corePoolSize 0}, {@code maximumPoolSize Integer.MAX_VALUE}
     * and a 60 second keep-alive, using the given rejection handler.
     *
     * @param handler   handler invoked when a task cannot be accepted
     * @param queueSize <em>initial</em> capacity of the priority queue
     */
    public PriorityExecutor(final RejectedExecutionHandler handler, int queueSize) {
        super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                newQueue(queueSize), handler);
    }

    /**
     * Creates an executor with {@code corePoolSize 0}, {@code maximumPoolSize Integer.MAX_VALUE}
     * and a 60 second keep-alive, using the given thread factory and rejection handler.
     *
     * @param threadFactory factory used to create worker threads
     * @param handler       handler invoked when a task cannot be accepted
     * @param queueSize     <em>initial</em> capacity of the priority queue
     */
    public PriorityExecutor(final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler, int queueSize) {
        super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                newQueue(queueSize), threadFactory, handler);
    }

    /** Builds the priority-ordered work queue shared by all constructors. */
    private static PriorityBlockingQueue<Runnable> newQueue(final int initialCapacity) {
        return new PriorityBlockingQueue<Runnable>(initialCapacity, new PriorityTaskComparator());
    }

    /**
     * Returns the scheduling priority of {@code task}: the stored priority of a
     * {@link PriorityTask}, the reported priority of an {@link Important}, otherwise
     * {@link #DEFAULT_PRIORITY}.
     */
    private static int priorityOf(final Object task) {
        if (task instanceof PriorityTask) {
            return ((PriorityTask<?>) task).priority;
        }
        if (task instanceof Important) {
            return ((Important) task).getPriority();
        }
        return DEFAULT_PRIORITY;
    }

    /**
     * Submits a runnable for execution, queued according to its priority.
     *
     * @param task the task to run
     * @return a future whose {@code get()} returns {@code null} on completion
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public Future<?> submit(final Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a callable for execution, queued according to its priority.
     *
     * @param task the task to run
     * @return a future for the callable's result
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a runnable for execution, queued according to its priority.
     *
     * @param task   the task to run
     * @param result the value the returned future yields on successful completion
     * @return a future yielding {@code result}
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) {
            throw new NullPointerException();
        }
        final RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /** Wraps {@code callable} in a {@link PriorityTask} carrying the callable's priority. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        return new PriorityTask<T>(priorityOf(callable), callable);
    }

    /** Wraps {@code runnable} in a {@link PriorityTask} carrying the runnable's priority. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        return new PriorityTask<T>(priorityOf(runnable), runnable, value);
    }

    /** Implemented by tasks that want a priority other than the default {@code 0}. */
    public interface Important {

        /**
         * @return the task's priority; numerically higher values are dequeued first
         */
        int getPriority();
    }

    /** A {@link FutureTask} that remembers the priority it was submitted with. */
    private static final class PriorityTask<T> extends FutureTask<T>
            implements Comparable<PriorityTask<T>> {

        private final int priority;

        public PriorityTask(final int priority, final Callable<T> tCallable) {
            super(tCallable);
            this.priority = priority;
        }

        public PriorityTask(final int priority, final Runnable runnable, final T result) {
            super(runnable, result);
            this.priority = priority;
        }

        /**
         * Orders higher priorities first. Uses {@link Integer#compare(int, int)} rather than
         * subtraction, which overflows for priorities that are far apart.
         */
        @Override
        public int compareTo(final PriorityTask<T> o) {
            return Integer.compare(o.priority, priority);
        }
    }

    /**
     * Orders queued runnables so that the highest priority is at the head of the queue.
     *
     * <p>Accepts any {@code Runnable}, not only {@link PriorityTask}: tasks handed straight to
     * {@link #execute(Runnable)} reach the queue unwrapped and must not cause a
     * {@code ClassCastException}.
     */
    private static class PriorityTaskComparator implements Comparator<Runnable> {

        @Override
        public int compare(final Runnable left, final Runnable right) {
            // Reversed operands: a larger priority must sort as "smaller" to reach the head.
            return Integer.compare(priorityOf(right), priorityOf(left));
        }
    }
}
While debugging, I found that in the method execute, line 1368 holds the condition that creates a non-core worker. That condition (workerCountOf(recheck) == 0) is never true. I can't investigate further without debug bytecode.
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)   // <-- line 1368
        addWorker(null, false);
}
Answer:
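The pool creates non-core threads only when the queue cannot accept a task. If the queue accepts the task, no additional threads are created. A PriorityBlockingQueue is unbounded, so offer() always succeeds and the pool never grows beyond its core size.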
Thanks. Can I change the thread pool policy so that a new non-core thread is added under a different condition, for example depending on whether the core threads are in the WAITING state?
You would need to provide a custom BlockingQueue and a RejectedExecutionHandler implementation that wrap the priority queue. The custom queue needs a reference to the ThreadPoolExecutor and chooses, based on the executor's current active count, whether or not to accept "offered" Runnable instances. You must also install the rejected-execution handler on the ThreadPoolExecutor, so that if multiple objects are offered concurrently and the max size is exceeded, the jobs are still added to the priority queue.
The alternative (which is /much/ simpler) is to set the "core" size to the maximum number of threads you wish to allow, and to allow core threads to time out.
Comments
Post a Comment