package altk.comm.engine; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import altk.comm.engine.exception.BroadcastError; import altk.comm.engine.exception.BroadcastException; import altk.comm.engine.exception.EngineException; import altk.comm.engine.exception.PlatformError; import altk.comm.engine.exception.PlatformException; import altk.comm.engine.postback.PostBack; /** * Broadcast class absorbs what was formerly known as Dispatcher class. * * @author Yuk-Ming * */ public abstract class Broadcast { private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; public final String broadcastType; private String broadcastId; private BroadcastState state = BroadcastState.ACCEPTED; String haltReason; String stateErrorText; public final long receiveTime; public long changeStateTime; /** * Count of jobs that are completed (excluding those that are * being rescheduled). */ private int completedJobCount = 0; /** * Dynamically keeps count of the total number jobs scheduled * in readyQueue. Initially it is set to be the size of the * recipientList. Then as jobs are processed, and when one is * to be repeated by re-adding it to the readyQueue, then this * number is incremented by 1. */ private int effectiveJobCount = 0; protected String activityRecordIdParamName; private String jobReportRootNodeName; /** * Set when reading request XML, but never used. */ private String launchRecordId; // protected XPath xpathEngine; protected String postBackURL; private PostBack postBack; public long expireTime; /** * Sleep time in milliseconds between consecutive job processing (actualliy batch) */ protected long sleepBetweenJobs; protected static Logger myLogger = Logger.getLogger(Broadcast.class); private Queue readyQueue; protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; //private int remainingJobs; private ScheduledExecutorService scheduler; private int serviceThreadPoolSize; public static enum BroadcastState { ACCEPTED, RUNNING, PAUSING, PAUSED, CANCELING, CANCELED(true), // Final state PURGED(true), // Final state ABORTED(true), // final state EXPIRED(true), // final state COMPLETED(true); // Final state final public boolean isFinal; private BroadcastState() { isFinal = false; } private BroadcastState(boolean isFinal) { this.isFinal = isFinal; } } public enum StateChangeStatus { SUCCESS, NO_CHANGE, FORBIDDEN } /** * When a Broadcast is first created, its state is INSTALLING, during which * time, the broadcast request XML is digested. After this period, the * state of the broadcast enters into RUNNING, and it begins with adding * all VoiceJobs in the request to the Dispatcher's queues. During this period * calls are dispatched, setup, and terminate. When the last call is terminated, * the broadcast enters into the COMPLETED state. * * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen * automatically without any external influence. * * At any time, the broadcast may be canceled by user action, which causes * the broadcast to transition from the RUNNING state into the CANCELED state. * * Since the RUNNING state may go to the COMLETED or to the CANCELED state, * each due to a different thread, there needs to be a mutex to guarantee * state and data integrity. * * A INSTALLING or RUNNING broadcast may be paused by user action, stopping the * Dispatcher from making new calls and causing the broadcast to go to * the HALTED state. Certain error conditions result in the HALTED state. * * User may order a broadcast to be removed entirely with a purge command, * causing the broadcast to go to the PURGED state. This state is there so * objects, that has reference to a broadcast which has been purged, know * what to do in that situation. * * We need the pause operation to set a RUNNING machine to be in the PAUSING state, * allow ongoing jobs to proceed to normal termination, * whereas no new jobs are started. State goes from PAUSING to HALTED. * * Cancel-nice operation is pause, followed by the automatic transition * from HALTED to CANCELED. * * Cancel operation forces all jobs to abort, and the state transitions to CANCELED * immediately. * * Because the Dispatcher and Broadcast is one-to-one, and because they access each other's * data, Dispatcher should be combined into Broadcast. * * @author Yuk-Ming * */ static Map> toStates; static { // Initialize legal transitions of state machine. // For each state, define a list of legal states that this state can transition to toStates = new HashMap>(); // Transitions from INSTALLING toStates.put(BroadcastState.ACCEPTED, Arrays.asList( BroadcastState.RUNNING, // Normal transition BroadcastState.CANCELING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSING, // User action BroadcastState.PAUSED, // User action BroadcastState.PURGED, // User action BroadcastState.ABORTED, // TTS error BroadcastState.EXPIRED, BroadcastState.COMPLETED // When recipient list is empty )); // Transitions from RUNNING toStates.put(BroadcastState.RUNNING, Arrays.asList( BroadcastState.CANCELING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSING, // User action BroadcastState.PAUSED, // User action BroadcastState.PURGED, // User action BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.EXPIRED, BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. )); // Transitions from CANCELING toStates.put(BroadcastState.CANCELING, Arrays.asList( BroadcastState.CANCELED, // User action BroadcastState.PURGED, // User action BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. )); // Transitions from HALTING toStates.put(BroadcastState.PAUSING, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSED, BroadcastState.PURGED, // User action BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. )); // Transitions from HALTED toStates.put(BroadcastState.PAUSED, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.CANCELED, // User action BroadcastState.CANCELING, // User action BroadcastState.PURGED, // User action BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. )); } public static class StateChangeResult { public StateChangeStatus stateChangeStatus; public BroadcastState currentState; public BroadcastState previousState; public StateChangeResult(StateChangeStatus stateChangeStatus, BroadcastState currentState, BroadcastState previousState) { this.stateChangeStatus = stateChangeStatus; this.currentState = currentState; this.previousState = previousState; } } protected class Service extends Thread { Object serviceProviderPeer; protected Service(String name) throws BroadcastException { serviceProviderPeer = getInitializedServiceProviderPeer(); setName(name); } public void run() { myLogger.info("Thread starting..."); for (;;) { if (threadsShouldStop()) { closeServiceProvider(serviceProviderPeer); return; } synchronized (resumeFlag) { if (threadsShouldPause()) { try { resumeFlag.wait(); } catch (InterruptedException e) { myLogger.warn("Dispatcher thread interrupted while waiting to resume"); return; } } } List batch = null; /** * Includes allocation from capacity. Only returns when the required allocation * is obtained. Example, RTP port allocation, limit due to total number of allowable calls. */ ServicePrerequisites prerequisites = null; synchronized(readyQueue) { // get a batch of jobs Job job = readyQueue.peek(); if (job == null) try { readyQueue.wait(); continue; } catch (InterruptedException e) { return; } myLogger.debug("Found some jobs"); // Check if expired if (System.currentTimeMillis() >= expireTime) { setState(BroadcastState.EXPIRED); continue; } /** * Includes allocation from capacity. Only returns when the required allocation * is obtained. Example, RTP port allocation, limit due to total number of allowable calls. */ prerequisites = secureServicePrerequisites(); if (threadsShouldStop() || threadsShouldPause()) { returnPrerequisites(prerequisites); continue; } // Check again if expired if (System.currentTimeMillis() >= expireTime) { returnPrerequisites(prerequisites); setState(BroadcastState.EXPIRED); continue; } // Now that we can go ahead with this job, let us remove this from queue readyQueue.poll(); batch = new ArrayList(); batch.add(job); // We we are to get a batch of more than one, let us fill in the rest. for (int i = 1; i < getJobBatchSize(); i++) { job = readyQueue.poll(); if (job == null) break; batch.add(job); } } if (batch != null && batch.size() > 0) { // Mark start time long now = System.currentTimeMillis(); for (Job job : batch) { job.startTime = now; } // Service the jobs try { processJobs(batch, serviceProviderPeer, prerequisites); } catch (EngineException e) { terminate(BroadcastState.ABORTED, e.getMessage()); } catch (Throwable t) { // This is unexpected. Log stack trace myLogger.error("Caught unexpected Throwable", t); terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); } } if (sleepBetweenJobs > 0) { try { Thread.sleep(sleepBetweenJobs); } catch (InterruptedException e1) { // Do nothing? } } } } } /** * * @param broadcastType * @param activityRecordIdParamName - if null, default is used. * @param jobReportRootNodeName */ protected Broadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName) { this.broadcastType = broadcastType; this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; this.jobReportRootNodeName = jobReportRootNodeName; sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; readyQueue = new LinkedBlockingQueue(); serviceThreadPool = new ArrayList(); recipientList = new ArrayList(); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); resumeFlag = new Object(); receiveTime = System.currentTimeMillis(); } /** * Experimental formulation where it takes over directing * the activity of a Broadcast, as it should, instead of relegating * it to CommEngine. This is directly invoked by CommEngine.doPost method, * and is much easier to read and comprehend. *

It is responsible for *

    *
  • Replying with HTTP response, and closing HTTP request and response *
  • Does broadcast and posts real-time progress, using post back queues * from CommEngine. *
* Strategy of execution: *
    *
  • Decode xml request *
  • Set up Service threads *
  • Invite derived class to set up contexts, one for each of the Service threads. *
  • Do broadcast. *
* This method is to be invoked by the CommEngine.doPost method, which fields a HTTP POST. *

* Adopting the use of this method by CommEngine * is 100% compatible with derived classes VoiceBroadcast and EmailBroadcast * existing today, 6/18/2016. * * * @param request * @param response * @param commEngine */ protected void doPost(HttpServletRequest request, HttpServletResponse response, CommEngine commEngine) { myLogger.debug("Entering Broadcast.doPost method"); BroadcastException myException = null; this.serviceThreadPoolSize = commEngine.getServiceThreadPoolSize(); try { boolean notInService = commEngine.notInService(); decode(request, notInService); // Now that have decoded the id of this broadcast, // ask CommEngine to install it with its id. commEngine.installBroadcast(this); if (notInService) { throw new PlatformException(PlatformError.RUNTIME_ERROR, "Not in service"); } if (recipientList.size() == 0) { // TODO: Got to return HTTP content before returning. CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients"); setState(BroadcastState.COMPLETED, "No recipients", null); return; } postBack = (PostBack)commEngine.getPostBack(getPostBackURL(), broadcastType); initSync(commEngine.getResources()); init(postBack); if (getState() == BroadcastState.COMPLETED) return; } catch (BroadcastException e) { // TODO: Got to return HTTP content before returning. myException = e; setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); return; } catch (Throwable t) { // Caught stray unexpected runtime problem CommonLogger.alarm.error("Broadcast aborted: " + t); myLogger.error("Broadcast aborted", t); myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage()); setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText); } finally { // Return regular or error response String responseXML = getResponseXML(myException); PrintWriter writer; try { writer = response.getWriter(); writer.write(responseXML); writer.close(); } catch (IOException e) { myLogger.error("Failed to write reponse to requester. Aborts broadcast." ); if (state != BroadcastState.ABORTED) { setState(BroadcastState.ABORTED, "Failed to reply to requester", e.getMessage()); } return; } if (myException != null) return; } // So far so good, we now go ahead with completing // initialization. try { initAsync(); effectiveJobCount = recipientList.size(); // Create service thread pool to dispatch jobs, // at the same time, setting up a list of service thread names // for use by derived class to set up contexts in which // these threads run. myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize); List serviceThreadNames = new ArrayList(); for (int i = 0; i < serviceThreadPoolSize; i++) { String threadName = broadcastId + "_service_thread_" + i; Service serviceThread = new Service(threadName); serviceThreadPool.add(serviceThread); serviceThreadNames.add(threadName); } //initServiceThreadContexts(serviceThreadNames); doBroadcast(); } catch (BroadcastException e) { setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); } } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); /** * Creates and initializes a service provider, to be used by only one service thread. * If service provider is not thread-specific, then this method may return null, and * a common service provider is created outside of this method. * * @return service provider as a class Object instance. * @throws BroadcastException */ protected abstract Object getInitializedServiceProviderPeer() throws BroadcastException; /** * Obtains the required components to support a service; e.g. RTP port, or a place * in maximum total number of calls. Does not return till the reequired prerequisites are obtained. * @return null, if no prerequisite is required, as in the case of email and sms engines. */ abstract protected ServicePrerequisites secureServicePrerequisites(); abstract public void closeServiceProvider(Object serviceProvider); /** * Makes a state transition to the given newState if the transition from * the current state is legal. * @param newState * @return StateChangeResult */ public StateChangeResult setState(BroadcastState newState) { return setState(newState, null, null); } /** * Makes a state transition to the given newState if the transition from * the current state is legal. Also posts back a state change notification. * @param newState * @return StateChangeResult */ public synchronized StateChangeResult setState(BroadcastState newState, String haltReason, String stateErrorText) { boolean isLegal; BroadcastState prev = null; synchronized (this) { if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); List to = toStates.get(state); isLegal = (to == null? false : to.contains(newState)); prev = state; if (isLegal) { state = newState; changeStateTime = System.currentTimeMillis(); } } if (isLegal) { this.haltReason = haltReason; this.stateErrorText = stateErrorText; CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); if (postBack != null) { postBack.queueReport(mkStatusReport()); } return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); } else { myLogger.warn(String.format("Broadcast %s: Transition from %s to %s forbidden", broadcastId, prev, newState)); return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); } } protected void setBroadcastId(String broadcastId) { if (broadcastId == null) throw new IllegalArgumentException( "Argument broadcastId in Broadcast.setBroadcastId method cannot be null"); if (this.broadcastId != null) throw new IllegalStateException( "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast"); this.broadcastId = broadcastId; } protected void setLaunchRecordId(String launchRecordId) { if (launchRecordId == null) throw new IllegalArgumentException( "Argument launchRecordId in Broadcast.setLaunchRecordId method cannot be null"); if (this.launchRecordId != null) throw new IllegalStateException( "Broadcast.setLaunchRecordId method cannot be invoked more than once for a Broadcast"); this.launchRecordId = launchRecordId; } public String getBroadcastId() { return broadcastId; } public String getLaunchRecordId() { return launchRecordId; } /** * * @param e * @return HTTP response for normal case (when exception e is null), or with exception */ public String getResponseXML(BroadcastException e) { String tagName = broadcastType + "_response"; StringBuffer responseXML = new StringBuffer("<" + tagName); if (broadcastId != null && broadcastId.length() > 0) { responseXML.append(" broadcast_id=\""); responseXML.append(broadcastId); responseXML.append("\""); } responseXML.append(" accepted='"); responseXML.append(e != null || getState() == BroadcastState.COMPLETED ? "FALSE" : "TRUE"); responseXML.append("'"); if (e == null) { if (haltReason != null && haltReason.length() > 0) { responseXML.append(" error='" + haltReason + "'"); } responseXML.append('>'); } else { if (e.errorCode != null) { responseXML.append(" error='"); responseXML.append(e.errorCode.toString()); responseXML.append("'"); } responseXML.append('>'); if (e.errorText != null) { responseXML.append(""); responseXML.append(e.errorText); responseXML.append(""); } } responseXML.append("'); return responseXML.toString(); } public String getPostBackURL() { return postBackURL; } protected String mkResponseXML(String errorCode, String errorText) { String tagName = broadcastType + "_response"; StringBuffer responseXML = new StringBuffer("<" + tagName); String broadcastId = getBroadcastId(); if (broadcastId != null && broadcastId.length() > 0) { responseXML.append(" broadcast_id=\""); responseXML.append(broadcastId); responseXML.append("\""); } responseXML.append(" accepted='"); responseXML.append(errorCode == null ? "TRUE" : "FALSE"); responseXML.append("'"); if (errorCode == null) { responseXML.append('>'); } else { responseXML.append(" error='"); responseXML.append(errorCode); responseXML.append("'"); responseXML.append('>'); if (errorText != null) { responseXML.append(""); responseXML.append(errorText.replaceAll("\\&", "&") .replaceAll("<", "<")); responseXML.append(""); } } responseXML.append("'); return responseXML.toString(); } /** * If finalState is final, then this state is set, and dispatcher threads are stopped. * Overriding implementation may release all other resources, like timers. * @param finalState */ public void terminate(BroadcastState finalState) { terminate(finalState, null); } /** * If finalState is final, then this state is set, and dispatcher threads are stopped. * Overriding implementation may release all other resources, like timers. * @param finalState */ public void terminate(BroadcastState finalState, String reason) { if (!finalState.isFinal) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final"); StateChangeResult result = setState(finalState, reason, null); switch (result.stateChangeStatus) { case SUCCESS: break; case NO_CHANGE: return; case FORBIDDEN: myLogger.error("Not allow to terminate broadcast in " + result.currentState + " state"); return; default: // Should not happen return; } // Wake up all dispatcher threads waiting on readyQueue so they will all stop synchronized(readyQueue) { readyQueue.notifyAll(); } // Wake up all sleeping dispatcher threads for same reason. for(Thread t : serviceThreadPool) { try { t.interrupt(); } catch (Exception e) { myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate"); } } // Quiesce scheduler, and terminate it. scheduler.shutdownNow(); } /** * Creates status report. * @return status report in XML. */ protected String mkStatusReport() { StringBuffer statusBf = new StringBuffer(); String topLevelTag = broadcastType; String broadcastId = getBroadcastId(); if (broadcastId == null) broadcastId = ""; statusBf.append("<" + topLevelTag + " broadcast_id='" + broadcastId + "' receive_time='" + receiveTime + "' recipient_count='" + recipientList.size() + "'"); if (launchRecordId != null) { statusBf.append(" launch_record_id='" + launchRecordId + "'"); } statusBf.append(">\r\n" + state + "" + changeStateTime + "\r\n"); if (state == BroadcastState.PAUSED || state == BroadcastState.ABORTED) { if (haltReason != null) { statusBf.append("" + haltReason + "\r\n"); } if (stateErrorText != null) { statusBf.append("" + stateErrorText + ""); } } statusBf.append("\r\n"); String statusReport = statusBf.toString(); return statusReport; } protected void onExpire() { } protected void setExpireTime(long expireTime) { this.expireTime = expireTime; } public long getExpireTime() { return expireTime; } /** * * @return number of active jobs, including those being * rescheduled by a timer. * Computed from effectiveJobCount, completedJobCount and readyQueue.size() */ protected int getActiveJobCount() { return effectiveJobCount - completedJobCount - readyQueue.size(); } /** * Parses broadcastId and return if notInService is true. * Otherwise, continue parsing postBackUrl, expireTime, recipientList, * and implementation-specific data from request. * Avoid throwing an exception before parsing and setting broadcastId. * @param notInService * @throws EngineException */ protected abstract void decode(HttpServletRequest request, boolean notInService) throws EngineException; /** * Remembers postBack, and * Creates thread pool of size dictated by broadcast, which determines the size based * on the chosen service provider. * * Overriding implementation must invoke this method at the end, and process information * contained in the broadcast, in preparation for the invocation of the process * method. * * If there is no error, the overriding implementation must return this base method. * * @param commEngine * * @throws BroadcastException */ protected final void init(PostBack postBack) { // Remember postBack this.postBack = postBack; for (Recipient recipient : recipientList) { readyQueue.add(mkJob(recipient)); } //remainingJobs = readyQueue.size(); } protected abstract void initSync(EngineResources resources) throws BroadcastException; protected Job mkJob(Recipient recipient) { return new Job(recipient); } /** * Overriding implementation performs time consuming initialization, after returning * POST http status indicating accepting broadcast for processing. * * @throws BroadcastException */ protected void initAsync() throws BroadcastException { // Do nothing in base class. } public String getId() { return broadcastId; } /** * Sets the stateMachine to CANCEL */ protected void cancel(PrintWriter out) { BroadcastState targetState = getActiveJobCount() == 0? BroadcastState.CANCELED : BroadcastState.CANCELING; StateChangeResult result = setState(targetState); String responseContent = null; switch (result.stateChangeStatus) { case SUCCESS: responseContent = "OK"; break; case NO_CHANGE: responseContent = "Not canceled: Already cancelled"; break; case FORBIDDEN: responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state"; } out.write(responseContent); synchronized(resumeFlag) { resumeFlag.notifyAll(); } } protected void pause() { // Sets state to HALTED, which is monitored by Broadcast.Service threads. setState(BroadcastState.PAUSING); } protected void resume() { synchronized (resumeFlag) { if (threadsShouldPause()) { setState(BroadcastState.RUNNING); resumeFlag.notifyAll(); } } } /** * Derived class may make its own Implementation of JobReport * @return */ protected JobReport mkJobReport() { return new JobReport(); } public void addJob(Job job) { synchronized(readyQueue) { readyQueue.add(job); readyQueue.notifyAll(); } } /** * For use by the new Broadcast.doPost method. * It changes state to RUNNING and waits for all Service threads * to terminate after starting them. * @throws BroadcastException */ public void doBroadcast() throws BroadcastException { setState(BroadcastState.RUNNING); // Start the dispatcher threads for (Service thread : serviceThreadPool) { thread.start(); } for (Service thread : serviceThreadPool) { try { thread.join(); } catch (InterruptedException e) { myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); } } close(); myLogger.info("Broadcast " + getId() + " terminated"); } /** * Derived may release resources here. */ protected void close() { // Do nothing in base class } public void startProcessing() throws BroadcastException { effectiveJobCount = recipientList.size(); // Create service thread pool to dispatch jobs myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize); for (int i = 0; i < serviceThreadPoolSize; i++) { String threadName = broadcastId + "_service_thread_" + i; Service serviceThread = new Service(threadName); serviceThreadPool.add(serviceThread); } initServiceThreadContexts(); setState(BroadcastState.RUNNING); // Start the dispatcher threads for (Service thread : serviceThreadPool) { thread.start(); } } /** * Derived class may set up environment before starting Service threads. * @param serviceThreadNames */ protected void initServiceThreadContexts() { // Do nothing in base class } /** * Experimental - needed to go with the also experimental method exec. * @param serviceThreadNames */ protected void initServiceThreadContexts(List serviceThreadNames) { // Do nothing in base class } private boolean threadsShouldStop() { BroadcastState state = getState(); return state == BroadcastState.CANCELING || state.isFinal; } private boolean threadsShouldPause() { BroadcastState state = getState(); return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; } /** * job status is reported back to this broadcast, via the logAndQueueForPostBack method. * @param batch * @param prerequisites */ abstract protected void processJobs(List batch, Object serviceProvider, ServicePrerequisites prerequisites) throws EngineException; /** * Size of a batch of jobs to be processed together. For email, this may be more than 1, * and this method should be overridden. * @return size of a batch of jobs to be processed together. */ protected int getJobBatchSize() { return 1; } /** * Sets jobStatus in job, and post job report. * If jobStatus is final, and no rescheduling, * then decrement number of remainingJobs,and increment completedJobCount. * @param job * @param jobStatus * @param errorText */ public synchronized void postJobStatus(Job job) { postJobStatus(job, -1); /* if (postBack != null) { JobReport report = mkJobReport(); report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); postBack.queueReport(report.toString()); } if (job.jobStatus.isTerminal()) { remainingJobs--; completedJobCount++; if (remainingJobs == 0) { terminate(BroadcastState.COMPLETED); } else if (getActiveJobCount() == 0) { if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); } } */ } /** * Sets jobStatus in job, and post job report. * Optionally reschedules job. * If no rescheduling, then decrement number of remainingJobs, * @param job * @param rescheduleTimeMS - reschedule time in milliseconds (-1 means do not reschedule). */ protected void postJobStatus(Job job, long rescheduleTimeMS) { //postJobStatus(job); logJobCount("Enering postJobStatus"); myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); if (postBack != null) { JobReport report = mkJobReport(); report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); postBack.queueReport(report.toString()); } //if (job.jobStatus.isTerminal()) if (rescheduleTimeMS < 0 // No more rescheduling on cancel, expire, or pause || state == BroadcastState.CANCELING || state == BroadcastState.CANCELED || state == BroadcastState.EXPIRED || state == BroadcastState.PAUSED || state == BroadcastState.PAUSING ) { //remainingJobs--; completedJobCount++; logJobCount("Completed a job"); if (getRemainingJobCount() == 0) { terminate(BroadcastState.COMPLETED); } else if (getActiveJobCount() == 0) { if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); } } else if (rescheduleTimeMS == 0) { addJob(job); effectiveJobCount++; logJobCount("Added a job to queue"); } else if (rescheduleTimeMS > 0) { rescheduleJob(job, rescheduleTimeMS); } } /** * Logs effectiveJobCount, completedJobCount, readyQueue.size(), * active job count, and total which recipientList.size() * Job statistics are collected by length of readyQueue, completedJobCount, * and effectiveJobCount. */ private void logJobCount(String title) { myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d, effectiveJobCount %d", title, completedJobCount, getActiveJobCount(), readyQueue.size(), recipientList.size(), getRemainingJobCount(), effectiveJobCount )); } /** * Number of jobs to be completed. * Computed from effectiveJobCount and completedJobCount * @return */ private int getRemainingJobCount() { return effectiveJobCount - completedJobCount; } public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) { Runnable r = new Runnable() { public void run() { addJob(job);}}; return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); } public BroadcastState getState() { return state; } public int getReadyJobCount() { switch (state) { case RUNNING: case PAUSING: case PAUSED: return readyQueue.size(); default: return 0; } } public int getCompletedJobCount() { return completedJobCount; } protected void setServiceThreadPoolsize(int serviceThreadPoolSize) { this.serviceThreadPoolSize = serviceThreadPoolSize; } }