java - Why doesn't awaitTermination reliably return when all tasks on an ExecutorService have completed or been cancelled


I have code that submits tasks to an ExecutorService and then waits for them to complete using shutdown() and awaitTermination(). If a task takes longer than a certain period to complete, I want it cancelled without affecting the other tasks. I use an amended version of code for an ExecutorService that interrupts tasks after a timeout, as follows:

    package com.jthink.jaikoz.memory;

    import com.jthink.jaikoz.MainWindow;

    import java.util.List;
    import java.util.concurrent.*;

    public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
        private final long timeout;
        private final TimeUnit timeoutUnit;

        private boolean isShutdown = false;

        private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

        // Map each task to the timeout task that could be used to interrupt it
        private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

        public long getTimeout()
        {
            return timeout;
        }

        public TimeUnit getTimeoutUnit()
        {
            return timeoutUnit;
        }

        public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
        {
            super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        @Override
        public void shutdown() {
            isShutdown = true;
            super.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            timeoutExecutor.shutdownNow();
            return super.shutdownNow();
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            if(timeout > 0) {
                // Schedule a task to interrupt the thread that is running the task after time timeout
                final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

                // Add the mapping
                runningTasks.put(r, scheduled);
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {

            // Remove the mapping and cancel the timeout task
            ScheduledFuture timeoutTask = runningTasks.remove(r);
            if(timeoutTask != null) {
                timeoutTask.cancel(false);
            }

            if (isShutdown)
            {
                if(getQueue().isEmpty())
                {
                    // Queue is empty, so all tasks are either finished or running
                    MainWindow.logger.severe("---thread pool queue empty");
                    timeoutExecutor.shutdown();
                }
            }
        }

        /**
         * Interrupt the thread
         *
         */
        class TimeoutTask implements Runnable {
            private final Thread thread;

            public TimeoutTask(Thread thread) {
                this.thread = thread;
            }

            @Override
            public void run() {
                MainWindow.logger.severe("cancelling task because taking long");
                thread.interrupt();
            }
        }
    }

And a test case for when tasks have time to complete and for when they don't; both work as expected:

    package com.jthink.jaikoz;

    import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
    import junit.framework.TestCase;

    import java.util.concurrent.Callable;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by Paul on 08/12/2014.
     */
    public class TestThreadPool extends TestCase {
        public void testThreadPoolTasksComplete() throws Exception
        {
            final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

            for (int i = 0; i < 10; i++)
            {
                executorService.submit(new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        Thread.sleep(5000);
                        System.out.println("done");
                        return null;
                    }

                });
            }
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("program done");
        }

        public void testThreadPoolTasksCancelled() throws Exception
        {
            final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

            for (int i = 0; i < 10; i++)
            {
                executorService.submit(new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        Thread.sleep(5000);
                        System.out.println("done");
                        return null;
                    }

                });
            }
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("program done");
        }
    }

And in my own code it appears to work:

    private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("analyser stopped detected in matchtorelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

However, for one customer, even though

    ---thread pool queue empty

is output, awaitTermination() doesn't return, only returning when the user cancels the task two hours later. The full log extract is here:

    14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.correctfrommusicbrainzworker:getsongsnotmatched:severe: /volumes/2tb external/new itunes library/itunes media/music/xtc:albummetadatamatchingcounts11:alreadymatched:2:tomatch:11
    14/12/2014 20.44.19:com.jthink.jaikoz.memory.timeoutthreadpoolexecutor:afterexecute:severe: ---thread pool queue empty
    14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.executorserviceenabledanalyser:canceltask:warning: cancelling class com.jthink.jaikoz.manipulate.correctfrommusicbrainzanalyser task
    14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.correctfrommusicbrainzanalyser:matchtorelease:warning: class com.jthink.jaikoz.manipulate.correctfrommusicbrainzanalyser has been interrupted

So how can awaitTermination() not be returning even though the logs show that the queue is empty, and therefore shutdown() has been called on both the executor itself and the embedded timeoutExecutor?

I have had a few thoughts about this myself, but I don't know the answer.

  1. Firstly, why is it necessary to shut down timeoutExecutor for awaitTermination() to return anyway? In my subclass awaitTermination() is not overridden, so if all tasks have completed, what does it matter whether the timeoutExecutor (which awaitTermination() knows nothing about) is shut down or not?

  2. Secondly, why is ---thread pool queue empty sometimes output more than once?

  3. timeoutExecutor is single-threaded; is this correct/necessary?

Update based on Holger's answer

So the problem is that you are shutting down the timeoutExecutor way too early, hence it might miss one or more of its tasks to interrupt pending tasks of your thread pool executor.

Right, I see now that an empty queue just means that all tasks have been completed or started. (Sorry, my example test was misleading; it was running more than 10 tasks before a temporary edit, and in production code the number of workers is based on the number of CPUs on the user's machine.)

So you are saying that I shutdown() the timeoutExecutor too early (there could be up to workerSize - 1 tasks still running), and this means that the timeoutExecutor's tasks for those still-running tasks that haven't completed yet are interrupted. Therefore, if any of the remaining tasks do not complete of their own accord for some reason, the timeout tasks for them no longer exist and therefore cannot be used to interrupt them. That is the reason awaitTermination() wouldn't return if one of these last (workerSize - 1) tasks didn't complete.

Of my own accord I had already changed afterExecute() to

    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                if(runningTasks.size()==0)
                {
                    this.shutdownNow();
                }
            }
        }
    }

to ensure the executor finishes, I used shutdownNow(), but not until everything had finished; based on your comment, though, this may still not work.

And I should do

    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

and

    protected void terminated() {
        timeoutExecutor.shutdown();
    }

And since terminated() is only called once all submitted tasks have finished (either naturally or via being cancelled by the corresponding timeout task), it doesn't matter that the timeoutExecutor still exists at that point?

For completeness, modifying the test case so that each task takes a very long time unless the timeout task is working shows the original solution failing (hanging) and the revised solution working:

    public void testThreadPoolTasksCancelled() throws Exception
    {
        Instant t1, t2;
        t1 = Instant.now();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(500000000);
                    System.out.println("done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        t2 = Instant.now();
        System.out.println("program done:"+(Duration.between(t1, t2).toMillis()/ 1000+ " seconds"));
    }

The queue only contains jobs which have not started yet. Having an empty queue does not imply that there are no pending jobs; they might just have been removed in order to be executed. Especially in your example code, the assumption that an empty queue implies no running jobs is deadly wrong; since you configured the executor to have ten core threads and submit ten jobs, the queue will always be empty throughout the entire execution of your example code.
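The empty-queue point can be checked with a small stand-alone sketch. It assumes a plain ThreadPoolExecutor with ten core threads and ten jobs, with no timeout logic at all; the class name EmptyQueueDemo and the two latches are made up for illustration and are not part of the code in the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class EmptyQueueDemo {

    /** Returns {queue size, active thread count}, sampled while every job is still running. */
    static int[] observe() {
        final int workers = 10;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        final CountDownLatch started = new CountDownLatch(workers);
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < workers; i++) {
                pool.execute(() -> {
                    started.countDown();          // signal that this job is running
                    try {
                        release.await();          // keep the job running until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();                      // wait until all jobs are running
            return new int[] {pool.getQueue().size(), pool.getActiveCount()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                  // let the jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = observe();
        System.out.println("queued=" + result[0] + ", active=" + result[1]);
    }
}
```

With as many core threads as jobs, each job is handed straight to a newly started worker, so the queue stays empty while the jobs are very much still running.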

So the problem is that you are shutting down the timeoutExecutor way too early, hence it might miss one or more of its tasks to interrupt pending tasks of your thread pool executor.

Note that in principle, jobs might even be in a state where they have been removed from the queue (if ever added) but beforeExecute has not been called yet. So even having an empty queue and an empty runningTasks map does not guarantee that there are no pending jobs.


To answer your other question, you have to shut down the timeoutExecutor because it has an associated alive thread which will always keep the executor alive. So not shutting it down will create a memory leak and further keep a thread alive, hence always prevent automatic JVM shutdown.
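As a rough illustration of why that thread matters, the sketch below asks the worker thread of a scheduler created with Executors.newSingleThreadScheduledExecutor() whether it is a daemon thread. The class and method names are invented for this example; the only assumption is that the default thread factory is used, as in the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimeoutExecutorThreadDemo {

    /** Returns true if the scheduler's worker thread is a daemon thread. */
    static boolean workerIsDaemon() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Runs on the scheduler's own worker thread and reports its daemon flag.
            Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
            ScheduledFuture<Boolean> result = scheduler.schedule(probe, 10, TimeUnit.MILLISECONDS);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            // Without this call the worker thread would stay alive after the probe.
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("daemon=" + workerIsDaemon());
    }
}
```

A non-daemon worker that is never shut down is enough to stop the JVM from exiting on its own once main returns.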

But the right place to do the shutdown of the timeoutExecutor is an override of the method protected void terminated(), which is intended exactly for cleanup.
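A minimal sketch of that arrangement follows. It is a stripped-down stand-in for the executor in the question, not the original class: TimeoutPool, runDemo, the pool size of 2 and the 200 ms timeout are all assumptions chosen to keep the example short.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TerminatedHookDemo {

    /** Hypothetical, simplified pool that interrupts each task after a timeout. */
    static class TimeoutPool extends ThreadPoolExecutor {
        private final long timeout;
        private final TimeUnit timeoutUnit;
        final ScheduledExecutorService timeoutExecutor =
                Executors.newSingleThreadScheduledExecutor();
        private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks =
                new ConcurrentHashMap<>();

        TimeoutPool(int workers, long timeout, TimeUnit timeoutUnit) {
            super(workers, workers, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            // Schedule an interrupt of the worker thread that is about to run r.
            Runnable interrupter = t::interrupt;
            runningTasks.put(r, timeoutExecutor.schedule(interrupter, timeout, timeoutUnit));
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            // Only per-task cleanup here; no guessing about whether the pool is done.
            ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
            if (timeoutTask != null) {
                timeoutTask.cancel(false);
            }
        }

        @Override
        protected void terminated() {
            // Invoked by the pool itself once it has terminated.
            timeoutExecutor.shutdown();
            super.terminated();
        }
    }

    /** Returns {pool terminated within the wait, timeoutExecutor shut down}. */
    static boolean[] runDemo() {
        TimeoutPool pool = new TimeoutPool(2, 200, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 6; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(60_000);   // far longer than the timeout
                } catch (InterruptedException e) {
                    // Interrupted by the timeout task: abandon this job.
                }
            });
        }
        pool.shutdown();
        boolean terminated;
        try {
            terminated = pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new boolean[] {terminated, pool.timeoutExecutor.isShutdown()};
    }

    public static void main(String[] args) {
        boolean[] result = runDemo();
        System.out.println("terminated=" + result[0] + ", timeoutExecutorShutdown=" + result[1]);
    }
}
```

Because the scheduler stays available until the pool reports termination, every job that starts can still get its interrupt scheduled, including the ones that begin after shutdown() was called.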


To the last bullet: it doesn't matter how many threads your timeoutExecutor has, but given how simple the tasks are, there is no benefit in having multiple threads, and a single-threaded executor is the simplest and probably most efficient solution.

