java - ThreadPoolExecutor With PriorityBlockong Queque do not create(dynamicly) non core Thread -


here qu see priority threadpoolexecutor - work good, problem not create new thread if number of сorepool еhread achieved.

import java.util.comparator; import java.util.concurrent.callable; import java.util.concurrent.future; import java.util.concurrent.futuretask; import java.util.concurrent.priorityblockingqueue; import java.util.concurrent.rejectedexecutionhandler; import java.util.concurrent.runnablefuture; import java.util.concurrent.threadfactory; import java.util.concurrent.threadpoolexecutor; import java.util.concurrent.timeunit;  /**  * created ngrigoriev on 4/24/14.  */ public class priorityexecutor extends threadpoolexecutor {      public priorityexecutor(int corepoolsize, int maxpoolsize, int quequsize) {         super(corepoolsize, maxpoolsize, 60l, timeunit.seconds, new priorityblockingqueue<>(quequsize, new prioritytaskcomparator()));     }      public priorityexecutor(int corepoolsize, int maxpoolsize, long time, timeunit unit, int quequsize) {         super(corepoolsize, maxpoolsize, time, unit, new priorityblockingqueue<>(quequsize, new prioritytaskcomparator()));     }      public priorityexecutor() {         super(0, integer.max_value, 60l, timeunit.seconds, new priorityblockingqueue<>(11, new prioritytaskcomparator()));     }      public priorityexecutor(final threadfactory threadfactory, int quequsize) {         super(0, integer.max_value, 60l, timeunit.seconds, new priorityblockingqueue<>(quequsize, new prioritytaskcomparator()), threadfactory);     }      public priorityexecutor(final rejectedexecutionhandler handler, int quequsize) {         super(0, integer.max_value, 60l, timeunit.seconds, new priorityblockingqueue<>(quequsize, new prioritytaskcomparator()), handler);     }      public priorityexecutor(final threadfactory threadfactory, final rejectedexecutionhandler handler, int quequsize) {         super(0, integer.max_value, 60l, timeunit.seconds, new priorityblockingqueue<>(quequsize, new prioritytaskcomparator()), threadfactory,                 handler);     }      @override     public future<?> submit(final runnable task) {         if (task == null)             throw new nullpointerexception();         final runnablefuture<object> ftask = newtaskfor(task, null);         execute(ftask);         return ftask;     }      @override     public <t> future<t> submit(callable<t> task) {         if (task == null)             throw new nullpointerexception();         final runnablefuture<t> ftask = newtaskfor(task);         execute(ftask);         return ftask;     }      @override     public <t> future<t> submit(runnable task, t result) {         if (task == null)             throw new nullpointerexception();         final runnablefuture<t> ftask = newtaskfor(task, result);         execute(ftask);         return ftask;     }      @override     protected <t> runnablefuture<t> newtaskfor(final callable<t> callable) {         if (callable instanceof important)             return new prioritytask<>(((important) callable).getpriority(), callable);         else             return new prioritytask<>(0, callable);     }      @override     protected <t> runnablefuture<t> newtaskfor(final runnable runnable, final t value) {         if (runnable instanceof important)             return new prioritytask<>(((important) runnable).getpriority(), runnable, value);         else             return new prioritytask<>(0, runnable, value);     }      public interface important {         int getpriority();     }      private static final class prioritytask<t> extends futuretask<t> implements comparable<prioritytask<t>> {         private final int priority;          public prioritytask(final int priority, final callable<t> tcallable) {             super(tcallable);              this.priority = priority;         }          public prioritytask(final int priority, final runnable runnable, final t result) {             super(runnable, result);              this.priority = priority;         }          @override         public int compareto(final prioritytask<t> o) {             final long diff = o.priority - priority;             return 0 == diff ? 0 : 0 > diff ? -1 : 1;         }     }      private static class prioritytaskcomparator implements comparator<runnable> {         @override         public int compare(final runnable left, final runnable right) {             return ((prioritytask) left).compareto((prioritytask) right);         }     } } 

during debug have found in method execute in line 1368 gave condition create non core worker, condition never true method(workercountof(recheck)) can't debug without debug byte code

    if (isrunning(c) && workqueue.offer(command)) {         int recheck = ctl.get();         if (! isrunning(recheck) && remove(command))             reject(command);        1368 **else if (workercountof(recheck) == 0)**             addworker(null, false);     } 

it's answer

the pool creates non-core threads when queue cannot accept tasks. queue accepts task not create more threads.

thanks, can change thread pool policy add new non core thread condition - example core thread state waiting

you need provide custom blockingqueue , rejectedexecutionhandler implementation wraps priority queue. custom queue need reference threadpoolexecutor , choose based on current active count of threadpoolexecutor whether or not accept "offered" runnable instances. must rejected execution handler threadpoolexecutor if multiple objects being offered concurrently, causing max size exceeded, jobs still added priority queue.

the alternative (which /much/ simpler) set "core" size max number wish allow , allow core threads time out.


Comments

Popular posts from this blog

java - Intellij Synchronizing output directories .. -

git - Initial Commit: "fatal: could not create leading directories of ..." -