Browse Source

Fixed bug in job managements and broadcast status

tags/1.0.2
Yuk-Ming Lam 4 years ago
parent
commit
7296a77eac
3 changed files with 125 additions and 57 deletions
  1. +65
    -23
      src/main/java/altk/comm/engine/Broadcast.java
  2. +3
    -3
      src/main/java/altk/comm/engine/CommEngine.java
  3. +57
    -31
      src/main/java/altk/comm/engine/Postback.java

+ 65
- 23
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -74,12 +74,16 @@ public abstract class Broadcast
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
private int serviceThreadPoolSize_default; private int serviceThreadPoolSize_default;
private int jobsTotal; private int jobsTotal;
private int scheduledJobs;
protected int transactions;
protected int completedTransactions;


public static enum BroadcastState public static enum BroadcastState
{ {
ACCEPTED, ACCEPTED,
RUNNING, RUNNING,
ALLDONE,
// ALLDONE,
PAUSING, PAUSING,
PAUSED, PAUSED,
CANCELING, CANCELING,
@@ -175,14 +179,15 @@ public abstract class Broadcast
// Transitions from RUNNING // Transitions from RUNNING
toStates.put(BroadcastState.RUNNING, Arrays.asList( toStates.put(BroadcastState.RUNNING, Arrays.asList(
BroadcastState.COMPLETED, // after completed all sends
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.PAUSING, // User action BroadcastState.PAUSING, // User action
BroadcastState.PAUSED, // User action BroadcastState.PAUSED, // User action
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.ABORTED, // Service provider irrecoverable error
BroadcastState.EXPIRED,
BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues.
BroadcastState.EXPIRED
// BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues.
)); ));
// Transitions from CANCELING // Transitions from CANCELING
@@ -207,9 +212,9 @@ public abstract class Broadcast
BroadcastState.PURGED // User action BroadcastState.PURGED // User action
)); ));
// Transitions from ALLDONE // Transitions from ALLDONE
toStates.put(BroadcastState.ALLDONE, Arrays.asList(
BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue
));
//toStates.put(BroadcastState.ALLDONE, Arrays.asList(
// BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue
// ));
} }
public static class StateChangeResult public static class StateChangeResult
@@ -242,7 +247,7 @@ public abstract class Broadcast
myLogger.info("Thread starting..."); myLogger.info("Thread starting...");
for (;;) for (;;)
{ {
if (threadsShouldStop())
if (serviceThreadsShouldStop())
{ {
closeServiceProvider(serviceProviderPeer); closeServiceProvider(serviceProviderPeer);
myLogger.info("Thread terminating"); myLogger.info("Thread terminating");
@@ -301,7 +306,7 @@ public abstract class Broadcast
*/ */
prerequisites = secureServicePrerequisites(); prerequisites = secureServicePrerequisites();
if (threadsShouldStop() || threadsShouldPause())
if (serviceThreadsShouldStop() || threadsShouldPause())
{ {
returnPrerequisites(prerequisites); returnPrerequisites(prerequisites);
continue; continue;
@@ -347,6 +352,7 @@ public abstract class Broadcast
// Service the jobs // Service the jobs
try try
{ {
incrementTransactions(batch.size());
processJobs(batch, serviceProviderPeer, prerequisites); processJobs(batch, serviceProviderPeer, prerequisites);
} }
catch (EngineException e) catch (EngineException e)
@@ -399,7 +405,22 @@ public abstract class Broadcast
receiveTime = System.currentTimeMillis(); receiveTime = System.currentTimeMillis();
} }
/**
public int pendingJobs()
{
return readyQueue.size() + scheduledJobs;
}
public void incrementTransactions(int size) {
transactions += size;
}
public void incrementCompletedTransactions(int size)
{
completedTransactions += size;
}
/**
* Experimental formulation where it takes over directing * Experimental formulation where it takes over directing
* the activity of a Broadcast, as it should, instead of relegating * the activity of a Broadcast, as it should, instead of relegating
* it to CommEngine. This is directly invoked by CommEngine.doPost method, * it to CommEngine. This is directly invoked by CommEngine.doPost method,
@@ -843,7 +864,7 @@ public abstract class Broadcast
} }
} }
statusBf.append("<job_summary completed='" + getCompletedJobCount() + statusBf.append("<job_summary completed='" + getCompletedJobCount() +
"' ready='" + getReadyJobCount() + "'");
"' ready='" + getPendingJobCount() + "'");
statusBf.append(" active='" + getActiveJobCount() + "'"); statusBf.append(" active='" + getActiveJobCount() + "'");
statusBf.append("></job_summary></" + topLevelTag + ">\r\n"); statusBf.append("></job_summary></" + topLevelTag + ">\r\n");
String statusReport = statusBf.toString(); String statusReport = statusBf.toString();
@@ -1001,16 +1022,25 @@ public abstract class Broadcast
myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e);
} }
} }
close();
setState(BroadcastState.COMPLETED);
destroyResources();
postback.wrapup();
postback = null;
myLogger.info("Broadcast " + getId() + " terminated"); myLogger.info("Broadcast " + getId() + " terminated");
} }
/**
* Derived class destroy resources needed for providing service
*/
protected void destroyResources() {}

/** /**
* Derived may release resources here. * Derived may release resources here.
*/ */
protected void close() protected void close()
{ {
postback.shutdownWhenDone();
myLogger.debug("In close()");;
postback.wrapup();
postback = null; postback = null;
} }
@@ -1033,10 +1063,11 @@ public abstract class Broadcast
} }
private boolean threadsShouldStop()
private boolean serviceThreadsShouldStop()
{ {
return state == BroadcastState.CANCELING || return state == BroadcastState.CANCELING ||
state == BroadcastState.CANCELED || state.isFinal;
state == BroadcastState.CANCELED || state.isFinal
|| pendingJobs() == 0;
} }


private boolean threadsShouldPause() private boolean threadsShouldPause()
@@ -1104,7 +1135,7 @@ public abstract class Broadcast
// No more rescheduling on cancel, expire, or pause // No more rescheduling on cancel, expire, or pause
completedJobCount++; completedJobCount++;
logJobCount("Completed a job"); logJobCount("Completed a job");
if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE);
//if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE);
} }
else if (rescheduleTimeMS == 0) else if (rescheduleTimeMS == 0)
{ {
@@ -1118,18 +1149,18 @@ public abstract class Broadcast
} }
/** /**
* Logs effectiveJobCount, completedJobCount, readyQueue.size(),
* Logs completedJobCount, readyQueue.size(),
* active job count, and total. * active job count, and total.
* Job statistics are collected by length of readyQueue, completedJobCount, * Job statistics are collected by length of readyQueue, completedJobCount,
* and effectiveJobCount.
*/ */
private void logJobCount(String title) private void logJobCount(String title)
{ {
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d",
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining %d",
title, title,
completedJobCount, completedJobCount,
getActiveJobCount(), getActiveJobCount(),
readyQueue.size(), readyQueue.size(),
scheduledJobs,
jobsTotal, jobsTotal,
getRemainingJobCount() getRemainingJobCount()
)); ));
@@ -1137,17 +1168,17 @@ public abstract class Broadcast


/** /**
* Number of jobs to be completed. * Number of jobs to be completed.
* Computed from effectiveJobCount and completedJobCount
* @return * @return
*/ */
private int getRemainingJobCount() private int getRemainingJobCount()
{ {
return jobsTotal - completedJobCount;
return readyQueue.size() + scheduledJobs;
} }


public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS) public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
{ {
Runnable r = new Runnable() { public void run() { addJob(job);}};
scheduledJobs++;
Runnable r = new Runnable() { public void run() { scheduledJobs--; addJob(job);}};
return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);
} }
@@ -1156,14 +1187,14 @@ public abstract class Broadcast
return state; return state;
} }


public int getReadyJobCount()
public int getPendingJobCount()
{ {
switch (state) switch (state)
{ {
case RUNNING: case RUNNING:
case PAUSING: case PAUSING:
case PAUSED: case PAUSED:
return readyQueue.size();
return pendingJobs();
default: default:
return 0; return 0;
@@ -1182,4 +1213,15 @@ public abstract class Broadcast
public boolean allDone() { public boolean allDone() {
return (completedJobCount == jobsTotal); return (completedJobCount == jobsTotal);
} }
/*
public boolean stopOnEmptyPostQueue()
{
if (state == state.ALLDONE) return true;
// Add the ALLDONE message to postQueue
setState(BroadcastState.ALLDONE);
// Do not stop as one report has just been added by the previous setState().
return false;
}
*/
} }

+ 3
- 3
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -402,19 +402,19 @@ public abstract class CommEngine extends HttpServlet
{ {
out.write(broadcast.mkStatusReport()); out.write(broadcast.mkStatusReport());
} }
out.write("<job_summary completed='" + getCompletedJobCount() + "' ready='" + getReadyJobCount() + "' active='" + getActiveJobCount() + "'/>");
out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>");
out.write("</" + tag + ">"); out.write("</" + tag + ">");
} }
} }


public int getReadyJobCount()
public int getPendingJobCount()
{ {
int readyCount = 0; int readyCount = 0;
synchronized(broadcasts) synchronized(broadcasts)
{ {
for (Broadcast broadcast : broadcasts.values()) for (Broadcast broadcast : broadcasts.values())
{ {
readyCount += broadcast.getReadyJobCount();
readyCount += broadcast.getPendingJobCount();
} }
} }
return readyCount; return readyCount;


+ 57
- 31
src/main/java/altk/comm/engine/Postback.java View File

@@ -87,6 +87,10 @@ public class Postback


private int maxRetries; private int maxRetries;
private Broadcast broadcast; private Broadcast broadcast;
protected int postedTransactions;

private boolean shutdownWhenDone;


private static Logger myLogger = Logger.getLogger(Postback.class); private static Logger myLogger = Logger.getLogger(Postback.class);


@@ -100,7 +104,6 @@ public class Postback
class Sender extends Thread class Sender extends Thread
{ {
private boolean threadShouldStop;


private Sender(String name) private Sender(String name)
{ {
@@ -110,7 +113,6 @@ public class Postback
public void run() public void run()
{ {
threadShouldStop = false;
myLogger.info(getName() + " started"); myLogger.info(getName() + " started");
@@ -118,32 +120,27 @@ public class Postback
for (;;) // Each iteration sends a batch for (;;) // Each iteration sends a batch
{ {
if (threadShouldStop)
{
myLogger.info(getName() + " terminating");
System.out.println(getName() + " terminating");
return;
}
myLogger.debug("Looking for reports"); myLogger.debug("Looking for reports");
List<String> reportList = null;
List<String> reportList = new ArrayList<String>();
synchronized(postQueue) synchronized(postQueue)
{ {
// Each iteration examines the queue for a batch to send // Each iteration examines the queue for a batch to send
for (;;)
{
reportList = new ArrayList<String>();
//for (;;)
//{
//reportList = new ArrayList<String>();
for (int i = 0; i < maxBatchSize ; i++) for (int i = 0; i < maxBatchSize ; i++)
{ {
report = postQueue.poll(); report = postQueue.poll();
if (report == null) break; if (report == null) break;
reportList.add(report); reportList.add(report);
} }
/*
if (reportList.size() > 0) if (reportList.size() > 0)
{ {
myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size()));
postQueue.notifyAll(); postQueue.notifyAll();
break; // break out to do the work. break; // break out to do the work.
}
}TODO Auto-generated catch block
// No reports // No reports
BroadcastState toState = null; BroadcastState toState = null;
@@ -187,13 +184,14 @@ public class Postback
postQueue.notifyAll(); postQueue.notifyAll();
return; return;
} }
// Nothing to do, so wait a while, and look at the // Nothing to do, so wait a while, and look at the
// queue again. // queue again.


try try
{ {
myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
postQueue.wait(QUEUE_WAIT * 1000);
postQueue.wait(); //QUEUE_WAIT * 1000);
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {
@@ -203,9 +201,10 @@ public class Postback
CommonLogger.health.info("Surfacing from wait"); CommonLogger.health.info("Surfacing from wait");
System.out.println(getName() + " surfacing from wait"); System.out.println(getName() + " surfacing from wait");
continue; continue;
}
*/
// }
} // synchronized() } // synchronized()
if (reportList != null && reportList.size() > 0)
if (reportList.size() > 0)
{ {
switch (post(reportList)) switch (post(reportList))
{ {
@@ -234,6 +233,27 @@ public class Postback
} }
default: default:
} }
incrementPostedTransactions(reportList.size());
}
else
{
// empty post queue
if (threadsShouldStop())
{
myLogger.info(getName() + " terminating");
System.out.println(getName() + " terminating");
return;
}
synchronized (postQueue)
{
try
{
postQueue.wait();
} catch (InterruptedException e)
{
// Do nothing
}
}
} }
} }
} }
@@ -265,6 +285,7 @@ public class Postback
httpPost.setEntity(requestEntity); httpPost.setEntity(requestEntity);
myLogger.debug("Posting to " + postBackURL + ": " + xml); myLogger.debug("Posting to " + postBackURL + ": " + xml);
response = httpclient.execute(httpPost, new BasicHttpContext()); response = httpclient.execute(httpPost, new BasicHttpContext());
incrementPostedTransactions(reportList.size());
StatusLine statusLine = response.getStatusLine(); StatusLine statusLine = response.getStatusLine();
int statusCode = statusLine.getStatusCode(); int statusCode = statusLine.getStatusCode();
if (statusCode != 200) if (statusCode != 200)
@@ -347,16 +368,6 @@ public class Postback
return PostbackStatus.SUCCESS; return PostbackStatus.SUCCESS;
} }


public void terminate()
{
if (threadShouldStop) return;
threadShouldStop = true;
//Wait for at most 100 ms for thread to stop
interrupt();
}
} }
/** /**
@@ -434,7 +445,12 @@ public class Postback
} }
} }
/**
public void incrementPostedTransactions(int size)
{
postedTransactions += size;
}

/**
* Queues report to postQueue only if the queue size has not reached the * Queues report to postQueue only if the queue size has not reached the
* maxQueueSize. * maxQueueSize.
* @param report * @param report
@@ -451,7 +467,7 @@ public class Postback
{ {
if (postQueue.size() < maxQueueSize) if (postQueue.size() < maxQueueSize)
{ {
myLogger.debug("Queing report" + report);
myLogger.debug("Queueing report" + report);
postQueue.add(report); postQueue.add(report);
myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
postQueue.notifyAll(); postQueue.notifyAll();
@@ -478,12 +494,22 @@ public class Postback
return false; return false;
} }

public void shutdownWhenDone()
private boolean threadsShouldStop()
{ {

return (shutdownWhenDone); // && postedTransactions == broadcast.transactions);
}
public void wrapup()
{
myLogger.debug("Wrapping up");
shutdownWhenDone = true;
try try
{ {
synchronized (postQueue)
{
postQueue.notifyAll();
}
// Wait for all postback threads to terminate // Wait for all postback threads to terminate
for (Sender sender : senderPool) for (Sender sender : senderPool)
{ {


Loading…
Cancel
Save