Browse Source

Fixed bugs with 'resume', 'cancel', 'pause', etc.

tags/1.0.2
Yuk-Ming Lam 4 years ago
parent
commit
f70c901424
3 changed files with 186 additions and 154 deletions
  1. +166
    -145
      src/main/java/altk/comm/engine/Broadcast.java
  2. +3
    -3
      src/main/java/altk/comm/engine/CommEngine.java
  3. +17
    -6
      src/main/java/altk/comm/engine/Postback.java

+ 166
- 145
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -76,6 +76,8 @@ public abstract class Broadcast
private int jobsTotal; private int jobsTotal;
private int scheduledJobs; private int scheduledJobs;
private Integer serviceActivityCount;
public static enum BroadcastState public static enum BroadcastState
{ {
ACCEPTED, ACCEPTED,
@@ -238,136 +240,106 @@ public abstract class Broadcast
myLogger.info("Thread starting..."); myLogger.info("Thread starting...");
for (;;) for (;;)
{ {
if (serviceThreadsShouldStop())
{
closeServiceProvider(serviceProviderPeer);
myLogger.info("Thread terminating");
return;
}
synchronized (resumeFlag)
if (serviceThreadsShouldStop())
{
// Exit thread
myLogger.info("Thread terminating");
System.out.println(getName() + " terminating");
closeServiceProvider(serviceProviderPeer);
return;
}
if (serviceThreadsShouldPause())
{ {
if (threadsShouldPause())
synchronized (resumeFlag)
{ {
try try
{ {
myLogger.debug("Paused");
resumeFlag.wait(); resumeFlag.wait();
myLogger.debug("Pause ended");
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {
myLogger.warn("Dispatcher thread interrupted while waiting to resume"); myLogger.warn("Dispatcher thread interrupted while waiting to resume");
return;
} }
} }
} }
List<Job> batch = null;
/**
* Includes allocation from capacity. Only returns when the required allocation
* is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
*/
ServicePrerequisites prerequisites = null;
synchronized(readyQueue)
{
// get a batch of jobs
Job job = readyQueue.peek();
if (job == null)
try
{
readyQueue.wait();
continue;
}
catch (InterruptedException e)
{
return;
}
myLogger.debug("Woke up from wait");
// Check if expired
if (System.currentTimeMillis() >= expireTime)
{
setState(BroadcastState.EXPIRED);
continue;
}
/**
* Includes allocation from capacity. Only returns when the required allocation
* is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
*/
prerequisites = secureServicePrerequisites();
if (serviceThreadsShouldStop() || threadsShouldPause())
{
returnPrerequisites(prerequisites);
continue;
}
// Check again if expired
if (System.currentTimeMillis() >= expireTime)
{
returnPrerequisites(prerequisites);
setState(BroadcastState.EXPIRED);
continue;
}

// Now that we can go ahead with this job, let us remove this from queue
readyQueue.poll();
batch = new ArrayList<Job>();
batch.add(job);
// Get a batch of jobs, if available
myLogger.debug("Looking for jobs");
List<Job> batch = new ArrayList<Job>();
synchronized(readyQueue)
{
// We we are to get a batch of more than one, let us fill in the rest. // We we are to get a batch of more than one, let us fill in the rest.
for (int i = 1; i < getJobBatchSize(); i++)
for (int i = 0; i < getJobBatchSize(); i++)
{ {
job = readyQueue.poll();
Job job = readyQueue.poll();
if (job == null) break; if (job == null) break;
batch.add(job); batch.add(job);
} }
} }
if (batch == null || batch.size()== 0)
{
// Exit thread
myLogger.info("Thread terminating");
return;
}
// Process jobs.
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
job.startTime = now;
}
// Service the jobs
try
{
processJobs(batch, serviceProviderPeer, prerequisites);
}
catch (EngineException e)
if (batch.size()== 0)
{ {
terminate(BroadcastState.ABORTED, e.getMessage());
// wait for jobs
synchronized(readyQueue)
{
try
{
myLogger.debug("Waiting for jobs");
readyQueue.wait();
}
catch (Exception e)
{
// Do nothing
}
}
} }
catch (Throwable t)
else
{ {
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
if (sleepBetweenJobs > 0)
{
try
{
Thread.sleep(sleepBetweenJobs);
}
catch (InterruptedException e1)
{
// Do nothing?
}
// Process jobs.
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
job.startTime = now;
}
// Service the jobs
// But first get dependent resource
// which includes allocation from capacity. Only returns when the required allocation
// is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
ServicePrerequisites prerequisites = secureServicePrerequisites();
try
{
updateServiceActivityCount(1);
processJobs(batch, serviceProviderPeer, prerequisites);
updateServiceActivityCount(-1);
}
catch (EngineException e)
{
terminate(BroadcastState.ABORTED, e.getMessage());
}
catch (Throwable t)
{
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
if (sleepBetweenJobs > 0)
{
try
{
Thread.sleep(sleepBetweenJobs);
}
catch (InterruptedException e1)
{
// Do nothing?
}
}
} }
}
}
} }
} }


@@ -394,6 +366,7 @@ public abstract class Broadcast
resumeFlag = new Object(); resumeFlag = new Object();
receiveTime = System.currentTimeMillis(); receiveTime = System.currentTimeMillis();
serviceActivityCount = Integer.valueOf(0);
} }
public int pendingJobs() public int pendingJobs()
@@ -573,7 +546,7 @@ public abstract class Broadcast


/** /**
* Makes a state transition to the given newState if the transition from * Makes a state transition to the given newState if the transition from
* the current state is legal.
* the current state is legal. Also posts back a state change notification.
* @param newState * @param newState
* @return StateChangeResult * @return StateChangeResult
*/ */
@@ -607,7 +580,7 @@ public abstract class Broadcast
{ {
//synchronized(postback.postQueue) //synchronized(postback.postQueue)
{ {
postback.queueReport(mkStatusReport(newState));
postback.queueReportFirst(mkStatusReport(newState));
state = newState; state = newState;
} }
} }
@@ -924,25 +897,23 @@ public abstract class Broadcast
switch (result.stateChangeStatus) switch (result.stateChangeStatus)
{ {
case SUCCESS: case SUCCESS:
responseContent = "OK";
responseContent = "Broadcast canceled";
break; break;
case NO_CHANGE: case NO_CHANGE:
responseContent = "Not canceled: Already cancelled";
responseContent = "Already canceled";
break; break;
case FORBIDDEN: case FORBIDDEN:
responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state"; responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state";
} }
out.write(responseContent); out.write(responseContent);


synchronized(resumeFlag)
{
resumeFlag.notifyAll();
}
wakeUpServiceThreads();
} }


protected void pause() protected void pause()
{ {
// Sets state to PAUSING, which is monitored by Broadcast.Service threads. // Sets state to PAUSING, which is monitored by Broadcast.Service threads.
// EVentually, when all service activity ends, the state transitions to PAUSED
setState(BroadcastState.PAUSING); setState(BroadcastState.PAUSING);
} }
@@ -950,7 +921,7 @@ public abstract class Broadcast
{ {
synchronized (resumeFlag) synchronized (resumeFlag)
{ {
if (threadsShouldPause())
if (serviceThreadsShouldPause())
{ {
setState(BroadcastState.RUNNING); setState(BroadcastState.RUNNING);
resumeFlag.notifyAll(); resumeFlag.notifyAll();
@@ -984,26 +955,30 @@ public abstract class Broadcast
public void doBroadcast() throws BroadcastException public void doBroadcast() throws BroadcastException
{ {
changeStateTime = System.currentTimeMillis(); changeStateTime = System.currentTimeMillis();
setState(BroadcastState.RUNNING);

// Start the dispatcher threads
for (Service thread : serviceThreadPool)
{
thread.start();
}
for (Service thread : serviceThreadPool)
{
try
{
thread.join();
}
catch (InterruptedException e)
{
myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e);
}
}
setState(BroadcastState.COMPLETED);
if (!serviceThreadsShouldStop())
{
setState(BroadcastState.RUNNING);
// Start the dispatcher threads
for (Service thread : serviceThreadPool)
{
thread.start();
}
// Wait for them to finish
for (Service thread : serviceThreadPool)
{
try
{
thread.join();
}
catch (InterruptedException e)
{
myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e);
}
}
setState(BroadcastState.COMPLETED);
}
destroyResources(); destroyResources();
postback.wrapup(); postback.wrapup();
postback = null; postback = null;
@@ -1043,21 +1018,55 @@ public abstract class Broadcast
// Do nothing in base class // Do nothing in base class
} }
/**
* Examines if service threads should stop running, or even start
*
* @return
*/
private boolean serviceThreadsShouldStop() private boolean serviceThreadsShouldStop()
{ {
return state == BroadcastState.CANCELING ||
state == BroadcastState.CANCELED || state.isFinal
|| pendingJobs() == 0;
if (System.currentTimeMillis() >= expireTime)
{
setState(BroadcastState.EXPIRED);
wakeUpServiceThreads();
return true;
}
if (state == BroadcastState.CANCELING ||
state == BroadcastState.EXPIRED ||
state == BroadcastState.CANCELED || state.isFinal)
{
return true;
}
if (pendingJobs() == 0)
{
wakeUpServiceThreads();
return true;
}
return false;
}
private void wakeUpServiceThreads()
{
synchronized (readyQueue)
{
readyQueue.notifyAll();
}
synchronized (resumeFlag)
{
resumeFlag.notifyAll();
}
} }


private boolean threadsShouldPause()
private boolean serviceThreadsShouldPause()
{ {
return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING;
} }
/** /**
* job status is reported back to this broadcast, via the logAndQueueForPostBack method. * job status is reported back to this broadcast, via the logAndQueueForPostBack method.
* This method should use the updateServiceActivityCount(+-1) method to allow Broadcast
* to keep track of overall service progress. The serviceActvityCount is used to determine
* if all service threads are idle or terminated.
* @param batch * @param batch
* @param prerequisites * @param prerequisites
*/ */
@@ -1073,6 +1082,19 @@ public abstract class Broadcast
{ {
return 1; return 1;
} }
protected void updateServiceActivityCount(int increment)
{
synchronized (serviceActivityCount)
{
serviceActivityCount += increment;
if (increment < 0 && serviceActivityCount <= 0)
{
if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
}
}
}


/** /**
* Sets jobStatus in job, and post job report. * Sets jobStatus in job, and post job report.
@@ -1096,8 +1118,7 @@ public abstract class Broadcast
*/ */
protected void postJobStatus(Job job, long rescheduleTimeMS) protected void postJobStatus(Job job, long rescheduleTimeMS)
{ {
logJobCount("Entering postJobStatus");
myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS);
//logJobCount("Entering postJobStatus");
if (postback != null) if (postback != null)
{ {
JobReport report = mkJobReport(); JobReport report = mkJobReport();
@@ -1115,12 +1136,12 @@ public abstract class Broadcast
{ {
// No more rescheduling on cancel, expire, or pause // No more rescheduling on cancel, expire, or pause
completedJobCount++; completedJobCount++;
logJobCount("Completed a job");
//logJobCount("Completed a job");
} }
else if (rescheduleTimeMS == 0) else if (rescheduleTimeMS == 0)
{ {
addJob(job); addJob(job);
logJobCount("Added a job to queue");
//logJobCount("Added a job to queue");
} }
else if (rescheduleTimeMS > 0) else if (rescheduleTimeMS > 0)
{ {


+ 3
- 3
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -227,7 +227,7 @@ public abstract class CommEngine extends HttpServlet
{ {
for (String id : broadcasts.keySet()) for (String id : broadcasts.keySet())
{ {
if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000)
if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000)
{ {
Broadcast broadcast = broadcasts.get(id); Broadcast broadcast = broadcasts.get(id);
completedJobCount += broadcast.getCompletedJobCount(); completedJobCount += broadcast.getCompletedJobCount();
@@ -333,7 +333,7 @@ public abstract class CommEngine extends HttpServlet
return; return;
} }
broadcast.pause(); broadcast.pause();
out.print("OK");
out.write("OK");
} }




@@ -348,7 +348,7 @@ public abstract class CommEngine extends HttpServlet
return; return;
} }
broadcast.resume(); broadcast.resume();
out.print("OK");
out.write("OK");
} }


/** /**


+ 17
- 6
src/main/java/altk/comm/engine/Postback.java View File

@@ -67,7 +67,7 @@ public class Postback
private final String postBackURL; private final String postBackURL;
private final String xmlTopElement; private final String xmlTopElement;
final Queue<String> postQueue;
final LinkedList<String> postQueue;
private final int maxQueueSize; private final int maxQueueSize;
private List<Sender> senderPool; private List<Sender> senderPool;
private final String myName; private final String myName;
@@ -366,6 +366,21 @@ public class Postback
postedTransactions += size; postedTransactions += size;
} }


/**
* Puts report at the head of post queue, disregarding size limit.
* This is suitable for posting broadcast status ahead of
* a possibly long line of transaction status.
* @param report
* @return
*/
public void queueReportFirst(String report)
{
synchronized(postQueue)
{
postQueue.offerFirst(report);
postQueue.notify();
}
}
/** /**
* Queues report to postQueue only if the queue size has not reached the * Queues report to postQueue only if the queue size has not reached the
* maxQueueSize. * maxQueueSize.
@@ -374,9 +389,6 @@ public class Postback
*/ */
public boolean queueReport(String report) public boolean queueReport(String report)
{ {
// Log for recovery in case of problem in posting report.
CommonLogger.activity.info("Attempting to queue report");
synchronized(postQueue) synchronized(postQueue)
{ {
for (;;) for (;;)
@@ -385,8 +397,7 @@ public class Postback
{ {
myLogger.debug("Queueing report" + report); myLogger.debug("Queueing report" + report);
postQueue.add(report); postQueue.add(report);
myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
postQueue.notifyAll();
postQueue.notify();
return true; return true;
} }
else else


Loading…
Cancel
Save