|
|
@@ -69,19 +69,23 @@ public abstract class Broadcast |
|
|
protected long sleepBetweenJobs; |
|
|
protected long sleepBetweenJobs; |
|
|
|
|
|
|
|
|
protected static Logger myLogger = Logger.getLogger(Broadcast.class); |
|
|
protected static Logger myLogger = Logger.getLogger(Broadcast.class); |
|
|
|
|
|
|
|
|
private Queue<Job> readyQueue; |
|
|
|
|
|
protected List<Service> serviceThreadPool; |
|
|
protected List<Service> serviceThreadPool; |
|
|
private Object resumeFlag; // Semaphore for dispatcher threads to resume. |
|
|
private Object resumeFlag; // Semaphore for dispatcher threads to resume. |
|
|
protected List<Recipient> recipientList; |
|
|
protected List<Recipient> recipientList; |
|
|
|
|
|
|
|
|
private ScheduledExecutorService scheduler; |
|
|
private ScheduledExecutorService scheduler; |
|
|
private int jobsTotal; |
|
|
private int jobsTotal; |
|
|
|
|
|
|
|
|
|
|
|
/** Queue of jobs ready to be servivced. Semaphore for next 3 fields */ |
|
|
|
|
|
private Queue<Job> readyQueue; |
|
|
|
|
|
/** Count of items in scheduler */ |
|
|
private int scheduledJobs; |
|
|
private int scheduledJobs; |
|
|
private Integer transactions; |
|
|
|
|
|
|
|
|
|
|
|
/** Instantaneous number of jobs being serviced by service provider */ |
|
|
/** Instantaneous number of jobs being serviced by service provider */ |
|
|
private int serviceActivityCount; |
|
|
private int serviceActivityCount; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Integer transactions; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Running count of successful jobs */ |
|
|
/** Running count of successful jobs */ |
|
|
private int successCount; |
|
|
private int successCount; |
|
|
@@ -281,16 +285,8 @@ public abstract class Broadcast |
|
|
public void run() |
|
|
public void run() |
|
|
{ |
|
|
{ |
|
|
myLogger.info("Thread starting..."); |
|
|
myLogger.info("Thread starting..."); |
|
|
for (;;) |
|
|
|
|
|
|
|
|
while (! serviceThreadsShouldStop()) |
|
|
{ |
|
|
{ |
|
|
if (serviceThreadsShouldStop()) |
|
|
|
|
|
{ |
|
|
|
|
|
// Exit thread |
|
|
|
|
|
myLogger.info("Thread terminating"); |
|
|
|
|
|
System.out.println(getName() + " terminating"); |
|
|
|
|
|
closeServiceProvider(serviceProviderPeer); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
if (serviceThreadsShouldPause()) |
|
|
if (serviceThreadsShouldPause()) |
|
|
{ |
|
|
{ |
|
|
synchronized (resumeFlag) |
|
|
synchronized (resumeFlag) |
|
|
@@ -306,16 +302,7 @@ public abstract class Broadcast |
|
|
myLogger.warn("Dispatcher thread interrupted while waiting to resume"); |
|
|
myLogger.warn("Dispatcher thread interrupted while waiting to resume"); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if (serviceThreadsShouldStop()) |
|
|
|
|
|
{ |
|
|
|
|
|
// Exit thread |
|
|
|
|
|
myLogger.info("Thread terminating"); |
|
|
|
|
|
System.out.println(getName() + " terminating"); |
|
|
|
|
|
closeServiceProvider(serviceProviderPeer); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Get a batch of jobs, if available |
|
|
// Get a batch of jobs, if available |
|
|
myLogger.debug("Looking for jobs"); |
|
|
myLogger.debug("Looking for jobs"); |
|
|
List<Job> batch = new ArrayList<Job>(); |
|
|
List<Job> batch = new ArrayList<Job>(); |
|
|
@@ -394,6 +381,10 @@ public abstract class Broadcast |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
// Exit thread |
|
|
|
|
|
myLogger.info("Thread terminating"); |
|
|
|
|
|
System.out.println(getName() + " terminating"); |
|
|
|
|
|
closeServiceProvider(serviceProviderPeer); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@@ -535,15 +526,16 @@ public abstract class Broadcast |
|
|
// initialization. |
|
|
// initialization. |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
initAsync(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
jobsTotal = recipientList.size(); |
|
|
jobsTotal = recipientList.size(); |
|
|
postback = new Postback(this, |
|
|
postback = new Postback(this, |
|
|
getPostbackMaxQueueSize(), |
|
|
getPostbackMaxQueueSize(), |
|
|
getPostbackSenderPoolSize(), |
|
|
getPostbackSenderPoolSize(), |
|
|
getPostbackMaxBatchSize()); |
|
|
getPostbackMaxBatchSize()); |
|
|
|
|
|
|
|
|
// Create service thread pool to dispatch jobs, |
|
|
|
|
|
|
|
|
initAsync(); |
|
|
|
|
|
|
|
|
|
|
|
// Create service thread pool to dispatch jobs, |
|
|
// at the same time, setting up a list of service thread names |
|
|
// at the same time, setting up a list of service thread names |
|
|
// for use by derived class to set up contexts in which |
|
|
// for use by derived class to set up contexts in which |
|
|
// these threads run. |
|
|
// these threads run. |
|
|
@@ -568,9 +560,20 @@ public abstract class Broadcast |
|
|
} |
|
|
} |
|
|
catch (Exception e) |
|
|
catch (Exception e) |
|
|
{ |
|
|
{ |
|
|
|
|
|
setState(BroadcastState.ABORTED, BroadcastError.UNEXPECTED_EXCEPTION.toString(), e.getMessage()); |
|
|
CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); |
|
|
CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); |
|
|
myLogger.error("Broadcast aborted", e); |
|
|
myLogger.error("Broadcast aborted", e); |
|
|
} |
|
|
} |
|
|
|
|
|
finally |
|
|
|
|
|
{ |
|
|
|
|
|
destroyResources(); |
|
|
|
|
|
if (postback != null) |
|
|
|
|
|
{ |
|
|
|
|
|
postback.wrapup(); |
|
|
|
|
|
postback = null; |
|
|
|
|
|
} |
|
|
|
|
|
CommonLogger.activity.info("Broadcast " + getId() + " terminated"); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
protected int getPostbackMaxQueueSize() |
|
|
protected int getPostbackMaxQueueSize() |
|
|
@@ -791,16 +794,6 @@ public abstract class Broadcast |
|
|
return responseXML.toString(); |
|
|
return responseXML.toString(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* If finalState is final, then this state is set, and dispatcher threads are stopped. |
|
|
|
|
|
* Overriding implementation may release all other resources, like timers. |
|
|
|
|
|
* @param finalState |
|
|
|
|
|
*/ |
|
|
|
|
|
public void terminate(BroadcastState finalState) |
|
|
|
|
|
{ |
|
|
|
|
|
terminate(finalState, null); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
* If finalState is final, then this state is set, and dispatcher threads are stopped. |
|
|
* If finalState is final, then this state is set, and dispatcher threads are stopped. |
|
|
* Overriding implementation may release all other resources, like timers. |
|
|
* Overriding implementation may release all other resources, like timers. |
|
|
@@ -952,7 +945,7 @@ public abstract class Broadcast |
|
|
* Overriding implementation performs time consuming initialization, after returning |
|
|
* Overriding implementation performs time consuming initialization, after returning |
|
|
* POST http status indicating accepting broadcast for processing. |
|
|
* POST http status indicating accepting broadcast for processing. |
|
|
* |
|
|
* |
|
|
* @throws BroadcastException |
|
|
|
|
|
|
|
|
* @throws BroadcastException - to abort broadcast |
|
|
*/ |
|
|
*/ |
|
|
protected void initAsync() throws BroadcastException |
|
|
protected void initAsync() throws BroadcastException |
|
|
{ |
|
|
{ |
|
|
@@ -1060,10 +1053,6 @@ public abstract class Broadcast |
|
|
} |
|
|
} |
|
|
waitForEndOfService(); |
|
|
waitForEndOfService(); |
|
|
} |
|
|
} |
|
|
destroyResources(); |
|
|
|
|
|
postback.wrapup(); |
|
|
|
|
|
postback = null; |
|
|
|
|
|
CommonLogger.activity.info("Broadcast " + getId() + " terminated"); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
@@ -1128,7 +1117,7 @@ public abstract class Broadcast |
|
|
} |
|
|
} |
|
|
if (getRemainingJobCount() == 0) |
|
|
if (getRemainingJobCount() == 0) |
|
|
{ |
|
|
{ |
|
|
wakeUpServiceThreads(); |
|
|
|
|
|
|
|
|
wakeUpServiceThreads(); |
|
|
return true; |
|
|
return true; |
|
|
} |
|
|
} |
|
|
return false; |
|
|
return false; |
|
|
@@ -1183,6 +1172,9 @@ public abstract class Broadcast |
|
|
{ |
|
|
{ |
|
|
if (state == BroadcastState.RUNNING) |
|
|
if (state == BroadcastState.RUNNING) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// TODO: investigate possibility that 0 remainingJobCount may |
|
|
|
|
|
// not be final. It may still change because a finishing job |
|
|
|
|
|
// may cause a job to be scheduled. |
|
|
if (getRemainingJobCount() == 0) { |
|
|
if (getRemainingJobCount() == 0) { |
|
|
setState(BroadcastState.COMPLETED); |
|
|
setState(BroadcastState.COMPLETED); |
|
|
} |
|
|
} |
|
|
@@ -1191,7 +1183,7 @@ public abstract class Broadcast |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
|
/** |
|
|
* Sets jobStatus in job, and post job report. |
|
|
* Sets jobStatus in job, and post job report. |
|
|
* If jobStatus is final, and no rescheduling, |
|
|
* If jobStatus is final, and no rescheduling, |
|
|
* then decrement number of remainingJobs,and increment completedJobCount. |
|
|
* then decrement number of remainingJobs,and increment completedJobCount. |
|
|
@@ -1314,8 +1306,8 @@ public abstract class Broadcast |
|
|
Runnable r = new Runnable() { public void run() { |
|
|
Runnable r = new Runnable() { public void run() { |
|
|
synchronized(readyQueue) { |
|
|
synchronized(readyQueue) { |
|
|
scheduledJobs--; |
|
|
scheduledJobs--; |
|
|
|
|
|
addJob(job); |
|
|
} |
|
|
} |
|
|
addJob(job); |
|
|
|
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); |
|
|
return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); |
|
|
|