package altk.comm.engine; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Category; import org.apache.log4j.Logger; import org.json.simple.JSONObject; import com.opencsv.CSVWriter; import altk.comm.engine.Job.CommJobStatus; import altk.comm.engine.exception.BroadcastError; import altk.comm.engine.exception.BroadcastException; import altk.comm.engine.exception.EngineException; import altk.comm.engine.exception.PlatformError; import altk.comm.engine.exception.PlatformException; /** * Broadcast class absorbs what was formerly known as Dispatcher class. * * @author Yuk-Ming * */ public abstract class Broadcast { private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; static final String DAILY_STOP_KEY = "daily_stop"; static final String DAILY_START_KEY = "daily_start"; static final String CSV_LOGGER = "csv"; public final String broadcastType; private String broadcastId; private BroadcastState state = BroadcastState.ACCEPTED; private Object stateSemaphore = new Object(); String reason; String stateErrorText; public CommEngine commEngine; public final long receiveTime; public long serviceStartTime; public long serviceEndTime; public long changeStateTime; protected String activityRecordIdParamName; private String jobReportRootNodeName; /** * Set when reading request XML, but never used. */ private String launchRecordId; // protected XPath xpathEngine; protected String postbackURL; private Postback postback; protected String daily_start = ""; protected String daily_stop = ""; /** * Sleep time in milliseconds between consecutive job processing (actualliy batch) */ protected long sleepBetweenJobs; protected static Logger myLogger = Logger.getLogger(Broadcast.class); protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; private ScheduledExecutorService scheduler; private int jobsTotal; /** Queue of jobs ready to be servivced. Semaphore for next 3 fields */ private Queue readyQueue; /** Count of items in scheduler */ private int scheduledJobs; /** Instantaneous number of jobs being serviced by service provider */ private int serviceActivityCount; private Integer transactions; /** Running count of successful jobs */ private AtomicInteger successCount; private int pauseThreshold; private int lastPauseCount; private Logger csvLogger; public static enum BroadcastState { /** Broadcast request accepted for execution */ ACCEPTED, /** Servicing jobs */ RUNNING, /** User action, causing system to quiesce, i.e. quiet down */ PAUSING, /** System is paused and quiet. Ready to resume */ PAUSED, /** User action */ CANCELING, /** Ireoverable internal or service provider error */ ABORTING, /** All servicing done, reporting may still be ongoing */ COMPLETED, CANCELED(true), // Final state PURGED(true), // Final state ABORTED(true), // final state /** All servicing and reporting done */ ALLDONE(true) // Final state ; final public boolean isFinal; private BroadcastState() { isFinal = false; } private BroadcastState(boolean isFinal) { this.isFinal = isFinal; } } public enum StateChangeStatus { SUCCESS, NO_CHANGE, FORBIDDEN } /** * When a Broadcast is first created, its state is INSTALLING, during which * time, the broadcast request XML is digested. After this period, the * state of the broadcast enters into RUNNING, and it begins with adding * all VoiceJobs in the request to the Dispatcher's queues. During this period * calls are dispatched, setup, and terminate. When the last call is terminated, * the broadcast enters into the COMPLETED state. * * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen * automatically without any external influence. * * At any time, the broadcast may be canceled by user action, which causes * the broadcast to transition from the RUNNING state into the CANCELED state. * * Since the RUNNING state may go to the COMLETED or to the CANCELED state, * each due to a different thread, there needs to be a mutex to guarantee * state and data integrity. * * A INSTALLING or RUNNING broadcast may be paused by user action, stopping the * Dispatcher from making new calls and causing the broadcast to go to * the HALTED state. Certain error conditions result in the HALTED state. * * User may order a broadcast to be removed entirely with a purge command, * causing the broadcast to go to the PURGED state. This state is there so * objects, that has reference to a broadcast which has been purged, know * what to do in that situation. * * We need the pause operation to set a RUNNING machine to be in the PAUSING state, * allow ongoing jobs to proceed to normal termination, * whereas no new jobs are started. State goes from PAUSING to HALTED. * * Cancel-nice operation is pause, followed by the automatic transition * from HALTED to CANCELED. * * Cancel operation forces all jobs to abort, and the state transitions to CANCELED * immediately. * * Because the Dispatcher and Broadcast is one-to-one, and because they access each other's * data, Dispatcher should be combined into Broadcast. * * @author Yuk-Ming * */ static Map> toStates; static { // Initialize legal transitions of state machine. // For each state, define a list of legal states that this state can transition to toStates = new HashMap>(); // Transitions from INSTALLING toStates.put(BroadcastState.ACCEPTED, Arrays.asList( BroadcastState.RUNNING, // Normal transition BroadcastState.CANCELING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSING, // User action BroadcastState.PAUSED, // User action BroadcastState.PURGED, // User action BroadcastState.ABORTING, BroadcastState.ABORTED, BroadcastState.ALLDONE // When recipient list is empty )); // Transitions from RUNNING toStates.put(BroadcastState.RUNNING, Arrays.asList( BroadcastState.COMPLETED, // after completed all sends BroadcastState.CANCELING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSING, // User action BroadcastState.PAUSED, // User action BroadcastState.PURGED, // User action BroadcastState.ABORTING, BroadcastState.ABORTED )); // Transitions from CANCELING toStates.put(BroadcastState.CANCELING, Arrays.asList( BroadcastState.ABORTING, BroadcastState.ABORTED, BroadcastState.COMPLETED, BroadcastState.CANCELED, // User action BroadcastState.PURGED // User action )); // Transitions from ABORTING toStates.put(BroadcastState.ABORTING, Arrays.asList( BroadcastState.ABORTED, BroadcastState.PURGED // User action )); // Transitions from PAUSING toStates.put(BroadcastState.PAUSING, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.COMPLETED, BroadcastState.ABORTING, BroadcastState.CANCELING, // User action BroadcastState.CANCELED, BroadcastState.PAUSED, BroadcastState.PURGED // User action )); // Transitions from PAUSED toStates.put(BroadcastState.PAUSED, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.CANCELED, // User action BroadcastState.CANCELING, // User action BroadcastState.ABORTING, BroadcastState.ABORTED, BroadcastState.PURGED // User action )); toStates.put(BroadcastState.COMPLETED, Arrays.asList( BroadcastState.ALLDONE // when all posting back is complete )); } public static class StateChangeResult { public StateChangeStatus stateChangeStatus; public BroadcastState currentState; public BroadcastState previousState; public StateChangeResult(StateChangeStatus stateChangeStatus, BroadcastState currentState, BroadcastState previousState) { this.stateChangeStatus = stateChangeStatus; this.currentState = currentState; this.previousState = previousState; } } protected enum PostbackThreadActionOnEmpty { CONTINUE, STOP, WAIT } protected class Service extends Thread { Object serviceProviderPeer; protected Service(String name) throws BroadcastException { serviceProviderPeer = getInitializedServiceProviderPeer(); setName(name); } public void run() { myLogger.info("Thread starting..."); for (;;) { if (serviceThreadsShouldPause()) { synchronized (resumeFlag) { try { myLogger.debug("Paused"); resumeFlag.wait(); myLogger.debug("Pause ended"); } catch (InterruptedException e) { myLogger.warn("Dispatcher thread interrupted while waiting to resume"); } } } if (serviceThreadsShouldStop()) break; // Get a batch of jobs, if available myLogger.debug("Looking for jobs"); List batch = new ArrayList(); int batchSize = getJobBatchSize(); synchronized(readyQueue) { // We we are to get a batch of more than one, let us fill in the rest. for (int i = 0; i < batchSize; i++) { Job job = readyQueue.poll(); if (job == null) break; batch.add(job); } if (batch.size()== 0) { // wait for jobs try { myLogger.debug("Waiting for jobs"); readyQueue.wait(); // go back to look for jobs continue; } catch (Exception e) { // go back to look for jobs continue; } } updateServiceActivityCount(batchSize); } // Process jobs. // Mark start time long now = System.currentTimeMillis(); for (Job job : batch) { job.startTime = now; } // Service the jobs // But first get dependent resource // which includes allocation from capacity. Only returns when the required allocation // is obtained. Example, RTP port allocation, limit due to total number of allowable calls. ServicePrerequisites prerequisites = secureServicePrerequisites(); try { int transactions = processJobs(batch, serviceProviderPeer, prerequisites); incrementTransactions(transactions); } catch (EngineException e) { // Aborting myLogger.error("Caught unexpected Exception", e); setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText); updateServiceActivityCount(-batchSize); } catch (Throwable t) { // This is unexpected. Log stack trace myLogger.error("Caught unexpected Throwable", t); terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); updateServiceActivityCount(-batchSize); } if (sleepBetweenJobs > 0) { try { Thread.sleep(sleepBetweenJobs); } catch (InterruptedException e1) { // Do nothing? } } } // Exit thread myLogger.info("Thread terminating"); synchronized(readyQueue) { readyQueue.notify(); } System.out.println(getName() + " terminating"); closeServiceProvider(serviceProviderPeer); } } /** * * @param broadcastType * @param activityRecordIdParamName - if null, default is used. * @param jobReportRootNodeName */ protected Broadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName) { this.broadcastType = broadcastType; this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; this.jobReportRootNodeName = jobReportRootNodeName; postback = null; successCount = new AtomicInteger(0); sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; readyQueue = new LinkedBlockingQueue(); serviceThreadPool = new ArrayList(); recipientList = new ArrayList(); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); resumeFlag = new Object(); receiveTime = System.currentTimeMillis(); serviceActivityCount = Integer.valueOf(0); transactions = Integer.valueOf(0); lastPauseCount = 0; csvLogger = Logger.getLogger(CSV_LOGGER); } private void incrementTransactions(int delta) { synchronized (transactions) { transactions += delta; lastPauseCount += delta; } if (pauseThreshold > 0 && lastPauseCount >= pauseThreshold) { pause(null, null); } } /** * Experimental formulation where it takes over directing * the activity of a Broadcast, as it should, instead of relegating * it to CommEngine. This is directly invoked by CommEngine.doPost method, * and is much easier to read and comprehend. *

It is responsible for *

    *
  • Replying with HTTP response, and closing HTTP request and response *
  • Does broadcast and posts real-time progress, using post back queues * from CommEngine. *
* Strategy of execution: *
    *
  • Decode xml request *
  • Set up Service threads *
  • Invite derived class to set up contexts, one for each of the Service threads. *
  • Do broadcast. *
* This method is to be invoked by the CommEngine.doPost method, which fields a HTTP POST. *

* Adopting the use of this method by CommEngine * is 100% compatible with derived classes VoiceBroadcast and EmailBroadcast * existing today, 6/18/2016. * * * @param request * @param response * @param commEngine */ protected void doPost(HttpServletRequest request, HttpServletResponse response, CommEngine commEngine) { myLogger.debug("Entering Broadcast.doPost method"); BroadcastException myException = null; this.commEngine = commEngine; pauseThreshold = commEngine.getPauseThreshold(); try { // Check validity of operating hours parameters boolean notInService = commEngine.notInService(); decode(request, notInService); // Now that have decoded the id of this broadcast, // ask CommEngine to install it with its id. commEngine.installBroadcast(this); setOperatingHours(DAILY_START_KEY, daily_start); setOperatingHours(DAILY_STOP_KEY, daily_stop); if (notInService) { throw new PlatformException(PlatformError.RUNTIME_ERROR, "Not in service"); } if (recipientList.size() == 0) { CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients"); setState(BroadcastState.ALLDONE, "No recipients", null); return; } initSync(commEngine.getResources()); for (Recipient recipient : recipientList) { readyQueue.add(mkJob(recipient)); } if (state == BroadcastState.ALLDONE) return; } catch (BroadcastException e) { myException = e; setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText); CommonLogger.alarm.error("Broadcast aborting: " + e.getMessage()); myLogger.error("Broadcast aborted", e); return; } catch (Throwable t) { // Caught stray unexpected runtime problem CommonLogger.alarm.error("Broadcast aborted: " + t); myLogger.error("Broadcast aborted", t); myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage()); setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText); } finally { // Return regular or error response String responseXML = getResponseXML(myException); PrintWriter writer; try { writer = response.getWriter(); writer.write(responseXML); writer.close(); } catch (IOException e) { myLogger.error("Failed to write reponse to requester. Aborts broadcast." ); if (state != BroadcastState.ABORTED) { setState(BroadcastState.ABORTED, "Failed to reply to requester", e.getMessage()); } return; } if (myException != null) return; } // So far so good, we now go ahead with completing // initialization. try { jobsTotal = recipientList.size(); postback = new Postback(this, getPostbackMaxQueueSize(), getPostbackSenderPoolSize(), getPostbackMaxBatchSize()); initAsync(); // Create service thread pool to dispatch jobs, // at the same time, setting up a list of service thread names // for use by derived class to set up contexts in which // these threads run. int serviceThreadPoolSize = getServiceThreadPoolSize(); myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize); List serviceThreadNames = new ArrayList(); for (int i = 0; i < serviceThreadPoolSize; i++) { String threadName = broadcastId + "-service-thread." + i; Service serviceThread = new Service(threadName); serviceThreadPool.add(serviceThread); serviceThreadNames.add(threadName); } doBroadcast(); } catch (BroadcastException e) { setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); } catch (Exception e) { setState(BroadcastState.ABORTED, BroadcastError.UNEXPECTED_EXCEPTION.toString(), e.getMessage()); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); } finally { destroyResources(); if (postback != null) { postback.wrapup(); postback = null; } CommonLogger.activity.info("Broadcast " + getId() + " terminated"); } } protected int getPostbackMaxQueueSize() { return commEngine.getPostbackMaxQueueSize(); } protected int getPostbackMaxBatchSize() { return commEngine.getPostbackMaxBatchSize(); } protected int getServiceThreadPoolSize() { return commEngine.getServiceThreadPoolSize(); } protected int getPostbackSenderPoolSize() { return commEngine.getPostbackSenderPoolSize(); } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); /** * Creates and initializes a service provider, to be used by only one service thread. * If service provider is not thread-specific, then this method may return null, and * a common service provider is created outside of this method. * * @return service provider as a class Object instance. * @throws BroadcastException */ protected abstract Object getInitializedServiceProviderPeer() throws BroadcastException; /** * Obtains the required components to support a service; e.g. RTP port, or a place * in maximum total number of calls. Does not return till the reequired prerequisites are obtained. * @return null, if no prerequisite is required, as in the case of email and sms engines. */ abstract protected ServicePrerequisites secureServicePrerequisites(); abstract public void closeServiceProvider(Object serviceProvider); /** * Makes a state transition to the given newState if the transition from * the current state is legal. Also posts back a state change notification. * @param newState * @return StateChangeResult */ public StateChangeResult setState(BroadcastState newState) { return setState(newState, null, null); } /** * Makes a state transition to the given newState if the transition from * the current state is legal. Also posts back a state change notification. * @param newState * @return StateChangeResult */ public synchronized StateChangeResult setState(BroadcastState newState, String reason, String stateErrorText) { boolean isLegal; BroadcastState prev = null; if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); synchronized(stateSemaphore) { List to = toStates.get(state); isLegal = (to == null? false : to.contains(newState)); prev = state; if (isLegal) { this.reason = reason; this.stateErrorText = stateErrorText; CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState)); state = newState; changeStateTime = System.currentTimeMillis(); if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime; if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime; if (postback != null) { if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport()); else postback.queueReportFirst(mkStatusReport()); } return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); } else { // log illegal state transition with call trace Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState)); myLogger.error(e.getMessage()); myLogger.debug("This exception is not thrown -- only for debugging information", e); return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); } } } protected void setBroadcastId(String broadcastId) { if (broadcastId == null) throw new IllegalArgumentException( "Argument broadcastId in Broadcast.setBroadcastId method cannot be null"); if (this.broadcastId != null) throw new IllegalStateException( "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast"); this.broadcastId = broadcastId; } protected void setLaunchRecordId(String launchRecordId) { this.launchRecordId = launchRecordId; } public String getBroadcastId() { return broadcastId; } public String getLaunchRecordId() { return launchRecordId; } /** * * @param e * @return HTTP response for normal case (when exception e is null), or with exception */ public String getResponseXML(BroadcastException e) { String tagName = broadcastType + "_response"; StringBuffer responseXML = new StringBuffer("<" + tagName); if (broadcastId != null && broadcastId.length() > 0) { responseXML.append(" broadcast_id=\""); responseXML.append(broadcastId); responseXML.append("\""); } responseXML.append(" accepted='"); responseXML.append(e != null || state == BroadcastState.COMPLETED ? "FALSE" : "TRUE"); responseXML.append("'"); if (e == null) { if (reason != null && reason.length() > 0) { responseXML.append(" error='" + reason + "'"); } responseXML.append('>'); } else { if (e.errorCode != null) { responseXML.append(" error='"); responseXML.append(e.errorCode.toString()); responseXML.append("'"); } responseXML.append('>'); if (e.errorText != null) { responseXML.append(""); responseXML.append(e.errorText); responseXML.append(""); } } responseXML.append("'); return responseXML.toString(); } public String getPostbackURL() { return postbackURL; } protected String mkResponseXML(String errorCode, String errorText) { String tagName = broadcastType + "_response"; StringBuffer responseXML = new StringBuffer("<" + tagName); String broadcastId = getBroadcastId(); if (broadcastId != null && broadcastId.length() > 0) { responseXML.append(" broadcast_id=\""); responseXML.append(broadcastId); responseXML.append("\""); } responseXML.append(" accepted='"); responseXML.append(errorCode == null ? "TRUE" : "FALSE"); responseXML.append("'"); if (errorCode == null) { responseXML.append('>'); } else { responseXML.append(" error='"); responseXML.append(errorCode); responseXML.append("'"); responseXML.append('>'); if (errorText != null) { responseXML.append(""); responseXML.append(errorText.replaceAll("\\&", "&") .replaceAll("<", "<")); responseXML.append(""); } } responseXML.append("'); return responseXML.toString(); } /** * If finalState is final, then this state is set, and dispatcher threads are stopped. * Overriding implementation may release all other resources, like timers. * @param finalState */ public void terminate(BroadcastState finalState, String reason) { if (!finalState.isFinal) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final"); StateChangeResult result = setState(finalState, reason, null); switch (result.stateChangeStatus) { case SUCCESS: break; case NO_CHANGE: return; case FORBIDDEN: myLogger.error("Not allow to terminate broadcast in " + result.currentState + " state"); return; default: // Should not happen return; } // Wake up all dispatcher threads waiting on readyQueue so they will all stop synchronized(readyQueue) { readyQueue.notifyAll(); } // Wake up all sleeping dispatcher threads for same reason. for(Thread t : serviceThreadPool) { try { t.interrupt(); } catch (Exception e) { myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate"); } } // Quiesce scheduler, and terminate it. scheduler.shutdownNow(); } /** * Defaults to current state * @return XML string */ protected String mkStatusReport() { StringBuffer statusBf = new StringBuffer(); String topLevelTag = broadcastType; String broadcastId = getBroadcastId(); if (broadcastId == null) broadcastId = ""; statusBf.append("<" + topLevelTag + " broadcast_id='" + broadcastId + "' receive_time='" + receiveTime + "' recipient_count='" + recipientList.size() + "'"); if (launchRecordId != null) { statusBf.append(" launch_record_id='" + launchRecordId + "'>"); } statusBf.append("" + System.currentTimeMillis() + ""); if (serviceStartTime > 0) statusBf.append("" + serviceStartTime + ""); if (state.isFinal && serviceEndTime > 0) statusBf.append("" + serviceEndTime + ""); statusBf.append("" + state + "" + changeStateTime + ""); statusBf.append("" + (reason == null? "" : Util.xmlEscape(reason)) + ""); statusBf.append("" + (stateErrorText == null? "" : Util.xmlEscape(stateErrorText)) + ""); statusBf.append("" + transactions + ""); statusBf.append("" + successCount.intValue() + ""); statusBf.append("\n"); statusBf.append("" + daily_stop + ""); statusBf.append("" + daily_start + "\n"); statusBf.append(additionalStatusXML()); statusBf.append(""); String statusReport = statusBf.toString(); return statusReport; } /** * Derived class may add additional status in a broadcast status XML posted back to portal, * by returning an XML tag which will be included immediately in the top-level tag * of the broadcast status. */ protected String additionalStatusXML() { return ""; } /** * * @return instantaneous number of jobs being serviced by service provider */ protected int getActiveJobCount() { return serviceActivityCount; } /** * Parses broadcastId and return if notInService is true. * Otherwise, continue parsing postBackUrl, recipientList, * and implementation-specific data from request. * Avoid throwing an exception before parsing and setting broadcastId. * @param notInService * @throws EngineException */ protected abstract void decode(HttpServletRequest request, boolean notInService) throws EngineException; protected abstract void initSync(EngineResources resources) throws BroadcastException; protected Job mkJob(Recipient recipient) { return new Job(recipient); } /** * Overriding implementation performs time consuming initialization, after returning * POST http status indicating accepting broadcast for processing. * * @throws BroadcastException - to abort broadcast */ protected void initAsync() throws BroadcastException { // Do nothing in base class. } public String getId() { return broadcastId; } /** * Sets the stateMachine to CANCEL * @param reasons - may be null. */ protected void cancel(String reason, PrintWriter out) { BroadcastState targetState = getActiveJobCount() == 0? BroadcastState.CANCELED : BroadcastState.CANCELING; StateChangeResult result = setState(targetState, reason, null); String responseContent = null; switch (result.stateChangeStatus) { case SUCCESS: responseContent = "Broadcast is being canceled"; break; case NO_CHANGE: responseContent = "Already canceled"; break; case FORBIDDEN: responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state"; } out.write(responseContent); wakeUpServiceThreads(); } /** * * @param reason * @param out */ protected void pause(String reason, PrintWriter out) { if (state == BroadcastState.ACCEPTED || state.isFinal) return; // Sets state to PAUSING, which is monitored by Broadcast.Service threads. // EVentually, when all service activity ends, the state transitions to PAUSED StateChangeResult result = setState(BroadcastState.PAUSING, reason, null); switch (result.stateChangeStatus) { case FORBIDDEN: if (out != null) out.write("pause not allowed"); break; case SUCCESS: lastPauseCount = 0; if (out != null) out.write("Broadcast is being PAUSED"); break; case NO_CHANGE: if (out != null) out.write("Broadcast is already RUNNING"); } } protected void resume(String reason, PrintWriter out) { if (state == BroadcastState.ACCEPTED || state.isFinal) return; if (!withinOperatingHours()) { if (out != null) out.write("Cannot resume outside operating hours"); return; } synchronized (resumeFlag) { StateChangeResult result = setState(BroadcastState.RUNNING, reason, null); switch (result.stateChangeStatus) { case FORBIDDEN: if (out != null) out.write("resume not allowed"); break; case SUCCESS: if (out != null) out.write("Broadcast resumed"); resumeFlag.notifyAll(); break; default: break; } } } /** * Derived class may make its own Implementation of JobReport * @return */ protected JobReport mkJobReport() { return new JobReport(); } public void addJob(Job job) { synchronized(readyQueue) { readyQueue.add(job); readyQueue.notifyAll(); } } /** * For use by the new Broadcast.doPost method. * It changes state to RUNNING and waits for all Service threads * to terminate after starting them. * @throws BroadcastException */ public void doBroadcast() throws BroadcastException { changeStateTime = System.currentTimeMillis(); if (!serviceThreadsShouldStop()) { if (withinOperatingHours()) { setState(BroadcastState.RUNNING); } else { setState(BroadcastState.PAUSED, "clock", null); } // Start the dispatcher threads for (Service thread : serviceThreadPool) { thread.start(); } // Wait for them to finish for (Service thread : serviceThreadPool) { try { thread.join(); } catch (InterruptedException e) { myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); } } waitForEndOfService(); } } private boolean withinOperatingHours() { if (daily_start == null || daily_start.trim().length() == 0) return true; int dailyStartMin = convert2Min(daily_start); if (daily_stop == null || daily_stop.trim().length() == 0) return true; int dailyStopMin = convert2Min(daily_stop); // Ensure daily stop > daily start if (dailyStopMin < dailyStartMin) dailyStopMin += 24 * 60; LocalTime now = LocalTime.now(); int nowMin = now.getHour() * 60 + now.getMinute(); if (nowMin < dailyStartMin) nowMin += 24 * 60; boolean within = nowMin >= dailyStartMin && nowMin < dailyStopMin; return within; } private int convert2Min(String hhmm) { String[] parts = hhmm.split(":"); int hh = Integer.parseInt(parts[0]); int mm = Integer.parseInt(parts[1]); return hh * 60 + mm; } /** * Derived class should wait for end of service before returning. * At this point all service threads have already ended. If the derived * class has other threads still taking part in providing service, wait for * them to terminate. */ protected void waitForEndOfService() {} /** * Derived class destroy resources needed for providing service */ protected void destroyResources() {} /** * Derived may release resources here. */ protected void close() { myLogger.debug("In close()");; postback.wrapup(); postback = null; } /** * Derived class may set up environment before starting Service threads. * @param serviceThreadNames */ protected void initServiceThreadContexts() { // Do nothing in base class } /** * Experimental - needed to go with the also experimental method exec. * @param serviceThreadNames */ protected void initServiceThreadContexts(List serviceThreadNames) { // Do nothing in base class } /** * Examines if service threads should stop running, or even start * * @return */ private boolean serviceThreadsShouldStop() { if (state == BroadcastState.CANCELING || state == BroadcastState.ABORTING || state.isFinal) { return true; } if (getRemainingJobCount() == 0) { wakeUpServiceThreads(); return true; } return false; } private void wakeUpServiceThreads() { synchronized (readyQueue) { readyQueue.notifyAll(); } synchronized (resumeFlag) { resumeFlag.notifyAll(); } } private boolean serviceThreadsShouldPause() { return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; } /** * job status is reported back to this broadcast, via the logAndQueueForPostBack method. * This method should use the updateServiceActivityCount(+-1) method to allow Broadcast * to keep track of overall service progress. The serviceActvityCount is used to determine * if all service threads are idle or terminated. * @param batch * @param prerequisites * @return int - number of transactions employed to service these jobs. * @throw Exception to abort broadcast */ abstract protected int processJobs(List batch, Object serviceProvider, ServicePrerequisites prerequisites) throws EngineException; /** * Size of a batch of jobs to be processed together. For email, this may be more than 1, * and this method should be overridden. * @return size of a batch of jobs to be processed together. */ protected int getJobBatchSize() { return 1; } protected void updateServiceActivityCount(int increment) { synchronized (readyQueue) { serviceActivityCount += increment; myLogger.debug("serviceActivityCount = " + serviceActivityCount ); if (increment < 0 && serviceActivityCount <= 0) { if (state == BroadcastState.RUNNING || state == BroadcastState.PAUSING || state == BroadcastState.CANCELING ) { if (getRemainingJobCount() == 0) { setState(BroadcastState.COMPLETED); } } } } } /** * Sets jobStatus in job, and post job report. * If jobStatus is final, and no rescheduling, * then decrement number of remainingJobs,and increment completedJobCount. * @param job * @param jobStatus * @param errorText */ public void postJobStatus(Job job) { if (job.isBroadcastFatal()) setState(BroadcastState.ABORTING, job.getErrorText(), null); if (job.jobStatus == CommJobStatus.SUCCESS) successCount.incrementAndGet(); if (postback != null) { JobReport report = mkJobReport(); report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); postback.queueReport(report.toString()); if (job.statusIsFinal()) updateServiceActivityCount(-1); } } /** * Logs completedJobCount, readyQueue.size(), * active job count, and total. * Job statistics are collected by length of readyQueue, completedJobCount, */ private void logJobCount(String title) { if (postback == null) { myLogger.debug(title + ": postback = null"); myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled: %d, total jobs: %d, remaining: %d, postQueue: ", title, state, getCompletedJobCount(), getActiveJobCount(), readyQueue.size(), scheduledJobs, jobsTotal, getRemainingJobCount() )); return; } if (postback.postQueue == null) { myLogger.debug(title + ": postback.postQueue = null"); myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled: %d, total jobs: %d, remaining: %d, postQueue: ", title, state, getCompletedJobCount(), getActiveJobCount(), readyQueue.size(), scheduledJobs, jobsTotal, getRemainingJobCount() )); return; } myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining: %d, postQueue: %d", title, state, getCompletedJobCount(), getActiveJobCount(), readyQueue.size(), scheduledJobs, jobsTotal, getRemainingJobCount(), postback.postQueue.size() )); } /** * Number of jobs to be completed. * @return */ private int getRemainingJobCount() { synchronized(readyQueue) { return readyQueue.size() + scheduledJobs; } } public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) { if (rescheduleTimeMS < 0) return null; // No more rescheduling on cancel, abort, or alldone if (state == BroadcastState.CANCELING || state == BroadcastState.CANCELED || state == BroadcastState.ABORTED || state == BroadcastState.ABORTING || state == BroadcastState.ALLDONE ) { return null; } job.errorText = ""; if (rescheduleTimeMS == 0) { addJob(job); return null; } synchronized(readyQueue) { scheduledJobs++; } Runnable r = new Runnable() { public void run() { synchronized(readyQueue) { scheduledJobs--; addJob(job); } } }; return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); } public BroadcastState getState() { return state; } public int getPendingJobCount() { switch (state) { case RUNNING: case PAUSING: case PAUSED: return getRemainingJobCount(); default: return 0; } } public int getCompletedJobCount() { synchronized(readyQueue) { return jobsTotal - getRemainingJobCount() - serviceActivityCount; } } public String getBroadcastType() { return broadcastType; } /** * To be invoked only when postQueue is empty. * Side effect: sets broadcast state from PAUSING to PAUSE * when there are no active jobs being processed. * Similarly from CANCELING to CANCELED, from ABORTING to ABORTED, from COMPLETED to ALLDONE * @return */ public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() { myLogger.debug(String.format( "getPostbackThreadActionOnEmpty(): broadcast state %s, serviceActivityCount %d", state, serviceActivityCount)); if (state.isFinal) return PostbackThreadActionOnEmpty.STOP; if (serviceActivityCount > 0) { return PostbackThreadActionOnEmpty.WAIT; } if (state == BroadcastState.PAUSING) { return setState(BroadcastState.PAUSED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.WAIT; } if (state == BroadcastState.CANCELING) { return setState(BroadcastState.CANCELED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; } else if (state == BroadcastState.ABORTING) { return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; } else if (state == BroadcastState.COMPLETED && scheduledJobs == 0) { return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; } else { return PostbackThreadActionOnEmpty.WAIT; } } /** * @return null or configuration in XML */ public String getConfigXML() { StringBuffer configBuf = new StringBuffer(); configBuf.append(""); configBuf.append("<" + DAILY_STOP_KEY + ">" + daily_stop + ""); configBuf.append("<" + DAILY_START_KEY + ">" + daily_start + ""); configBuf.append(""); return configBuf.toString(); } public void configure(JSONObject configuration) throws Exception { boolean timeChanged = false; for (String key : new String[] {DAILY_STOP_KEY, DAILY_START_KEY}) { String value = (String)configuration.get(key); if (setOperatingHours(key, value)) { timeChanged = true; } } if (timeChanged) enforceOperationHours(); } /** * YML: At this time, we only enforce pause action when a broadcast is * outside its operating hours. The current design is not satisfactory and needs * a better solution. * * We are not automatically resuming a paused broadcast because the difference * between intention of an operator-initiated pause and * that of a clock pause needs clarification in their operation paradigm. * Question is when or if we allow a operator-initiated pause be resumed * when someone changes the operating hours of a broadcast in such a way * that the broadcast is at once within its operasting hours. it may be * be counter to the intention of the original operator. * * On the other hand, if that places the broadcast outside it operating hours, * it is safer to immediately pause it. * * To add clarity, we may need to separate the PAUSE state into OPERATOR_PAUSE and CLOCK_PAUSE, * and similarly PAUING state. */ void enforceOperationHours() { if (state == BroadcastState.ABORTED) return; if (withinOperatingHours()) { // resume("clock", null); } else { pause("clock", null); } } /** * Sets timeParam to value * @param timeParam * @param value * @return false if no change */ private boolean setOperatingHours(String timeParam, String value) { if (value == null || value.trim().length() == 0) return false; String timeOfDay = CommEngine.checkTimeOfDay(value); if (timeOfDay == null) throw new RuntimeException(String.format("Invalid value for %s: %s", timeParam, value)); switch (timeParam) { case DAILY_STOP_KEY: if (timeOfDay.equals(daily_stop)) return false; daily_stop = timeOfDay; return true; case DAILY_START_KEY: if (timeOfDay.equals(daily_start)) return false; daily_start = timeOfDay; return true; default: throw new RuntimeException("Unknown parameter name: " + timeParam); } } @SuppressWarnings("unchecked") public JSONObject getConfigJSON() { JSONObject dataMap = new JSONObject(); dataMap.put(DAILY_START_KEY, daily_start); dataMap.put(DAILY_STOP_KEY, daily_stop); childAddConfigJSON(dataMap); return dataMap; } public void logCSV(String[] data) { StringWriter writer = new StringWriter(); @SuppressWarnings("resource") CSVWriter csvWriter = new CSVWriter(writer); csvWriter.writeNext(data); csvLogger.info(writer); } protected void childAddConfigJSON(JSONObject dataMap) { } }