Browse Source

Fix bug that service thread may get stuck on cancel.

This would lead to memory leak when SMSEngine is shutdown.
tags/1.0.2
Yuk-Ming Lam 4 years ago
parent
commit
2cf2164548
1 changed files with 19 additions and 10 deletions
  1. +19
    -10
      src/main/java/altk/comm/engine/Broadcast.java

+ 19
- 10
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -263,6 +263,14 @@ public abstract class Broadcast
myLogger.warn("Dispatcher thread interrupted while waiting to resume"); myLogger.warn("Dispatcher thread interrupted while waiting to resume");
} }
} }
if (serviceThreadsShouldStop())
{
// Exit thread
myLogger.info("Thread terminating");
System.out.println(getName() + " terminating");
closeServiceProvider(serviceProviderPeer);
return;
}
} }
// Get a batch of jobs, if available // Get a batch of jobs, if available
@@ -277,12 +285,10 @@ public abstract class Broadcast
if (job == null) break; if (job == null) break;
batch.add(job); batch.add(job);
} }
}
if (batch.size()== 0)
{
// wait for jobs
synchronized(readyQueue)
{
if (batch.size()== 0)
{
// wait for jobs
try try
{ {
myLogger.debug("Waiting for jobs"); myLogger.debug("Waiting for jobs");
@@ -294,7 +300,8 @@ public abstract class Broadcast
} }
} }
} }
else

if (batch.size() > 0)
{ {
// Process jobs. // Process jobs.
// Mark start time // Mark start time
@@ -846,7 +853,7 @@ public abstract class Broadcast
*/ */
protected int getActiveJobCount() protected int getActiveJobCount()
{ {
return jobsTotal - completedJobCount - readyQueue.size();
return serviceActivityCount;
} }
@@ -910,14 +917,15 @@ public abstract class Broadcast
wakeUpServiceThreads(); wakeUpServiceThreads();
} }


protected void pause()
protected void pause(PrintWriter out)
{ {
// Sets state to PAUSING, which is monitored by Broadcast.Service threads. // Sets state to PAUSING, which is monitored by Broadcast.Service threads.
// EVentually, when all service activity ends, the state transitions to PAUSED // EVentually, when all service activity ends, the state transitions to PAUSED
setState(BroadcastState.PAUSING); setState(BroadcastState.PAUSING);
out.write("Broadcast paused");
} }
protected void resume()
protected void resume(PrintWriter out)
{ {
synchronized (resumeFlag) synchronized (resumeFlag)
{ {
@@ -927,6 +935,7 @@ public abstract class Broadcast
resumeFlag.notifyAll(); resumeFlag.notifyAll();
} }
} }
out.write("Broadcast resumed");
} }
/** /**
* Derived class may make its own Implementation of JobReport * Derived class may make its own Implementation of JobReport


Loading…
Cancel
Save