package altk.comm.engine;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.servlet.http.HttpServletRequest;

import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import altk.comm.engine.exception.BroadcastException;
import altk.comm.engine.exception.EngineException;
import altk.comm.engine.postback.PostBack;

/**
 * Broadcast class absorbs what was formerly known as Dispatcher class.
 *
 * A broadcast owns a queue of ready jobs (one per recipient), a pool of
 * {@link Service} threads that take jobs off that queue and hand them to
 * {@link #processJobs}, a scheduler used to re-queue jobs after a delay, and
 * a small state machine (see {@link BroadcastState} and {@link #toStates}).
 *
 * Threading: the state is changed under this object's monitor and read
 * without it by the service threads, so it is kept in a volatile field; the
 * job counters are updated by several service threads and are atomic.
 *
 * @author Yuk-Ming
 *
 */
public abstract class Broadcast
{
    private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
    private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id";

    public final String broadcastType;
    private String broadcastId;

    /** Current state; written under this object's monitor, read lock-free by service threads. */
    private volatile BroadcastState state = BroadcastState.INSTALLING;

    /** Reason passed to the most recent successful state transition; may be null. */
    String haltReason;
    /** Error text passed to the most recent successful state transition; may be null. */
    String stateErrorText;

    public final long receiveTime;
    /** Time, in milliseconds, of the most recent successful state transition. */
    public long changeStateTime;

    /** Incremented by the service threads and by {@link #postJobStatus(Job, long)}. */
    private final AtomicInteger completedJobCount = new AtomicInteger();

    protected String activityRecordIdParamName;
    private String jobReportRootNodeName;

    /**
     * Set through {@link #setLaunchRecordId(String)}; included in the status
     * report and passed to every job report.
     */
    private String launchRecordId;

    // protected XPath xpathEngine;
    protected String postBackURL;
    private PostBack postBack;
    public long expireTime;

    static Logger myLogger = Logger.getLogger(Broadcast.class);

    /**
     * This queue is designed for only one server.
     * It also serves as the monitor on which idle service threads wait for work.
     */
    private Queue<Job> readyQueue;
    private List<Thread> serviceThreadPool;
    private Object resumeFlag; // Semaphore for dispatcher threads to resume.
    protected List<Recipient> recipientList;

    /** Number of jobs that have not yet reported a terminal status. */
    private final AtomicInteger remainingJobs = new AtomicInteger();

    private ScheduledExecutorService scheduler;
    private int serviceThreadPoolSize;

    public enum BroadcastState
    {
        INSTALLING,
        RUNNING,
        HALTING,
        HALTED,
        CANCELING,
        CANCELED,  // Final state
        PURGED,    // Final state
        ABORTED,   // Final state
        COMPLETED  // Final state
    }

    public enum StateChangeStatus
    {
        SUCCESS,
        NO_CHANGE,
        FORBIDDEN
    }

    /**
     * Legal transitions of the state machine: for each state, the list of
     * states it may transition to. States without an entry cannot be left.
     *
     * When a Broadcast is first created, its state is INSTALLING, during which
     * time the broadcast request XML is digested. After this period, the
     * state of the broadcast enters into RUNNING, and it begins with adding
     * all VoiceJobs in the request to the Dispatcher's queues. During this period
     * calls are dispatched, set up, and terminated. When the last call is terminated,
     * the broadcast enters into the COMPLETED state.
     *
     * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen
     * automatically without any external influence.
     *
     * At any time, the broadcast may be canceled by user action, which causes
     * the broadcast to transition from the RUNNING state into the CANCELED state.
     *
     * Since the RUNNING state may go to the COMPLETED or to the CANCELED state,
     * each due to a different thread, there needs to be a mutex to guarantee
     * state and data integrity.
     *
     * An INSTALLING or RUNNING broadcast may be paused by user action, stopping the
     * Dispatcher from making new calls and causing the broadcast to go to
     * the HALTED state. Certain error conditions result in the HALTED state.
     *
     * User may order a broadcast to be removed entirely with a purge command,
     * causing the broadcast to go to the PURGED state. This state is there so
     * that objects holding a reference to a broadcast which has been purged know
     * what to do in that situation.
     *
     * The pause operation sets a RUNNING machine to the HALTING state, which
     * allows ongoing jobs to proceed to normal termination,
     * whereas no new jobs are started. State goes from HALTING to HALTED.
     *
     * Cancel-nice operation is pause, followed by the automatic transition
     * from HALTED to CANCELED.
     *
     * Cancel operation forces all jobs to abort, and the state transitions to CANCELED
     * immediately.
     *
     * Because the Dispatcher and Broadcast are one-to-one, and because they access each other's
     * data, Dispatcher should be combined into Broadcast.
     *
     * @author Yuk-Ming
     *
     */
    static Map<BroadcastState, List<BroadcastState>> toStates;

    static
    {
        // Initialize legal transitions of state machine.
        // For each state, define a list of legal states that this state can transition to
        toStates = new HashMap<BroadcastState, List<BroadcastState>>();

        // Transitions from INSTALLING
        toStates.put(BroadcastState.INSTALLING, Arrays.asList(
                BroadcastState.RUNNING,   // Normal transition
                BroadcastState.CANCELING, // User action
                BroadcastState.CANCELED,  // User action
                BroadcastState.HALTING,   // User action
                BroadcastState.HALTED,    // User action
                BroadcastState.PURGED,    // User action
                BroadcastState.ABORTED,   // TTS error
                BroadcastState.COMPLETED  // When recipient list is empty
                ));

        // Transitions from RUNNING
        toStates.put(BroadcastState.RUNNING, Arrays.asList(
                BroadcastState.CANCELING, // User action
                BroadcastState.CANCELED,  // User action
                BroadcastState.HALTING,   // User action
                BroadcastState.HALTED,    // User action
                BroadcastState.PURGED,    // User action
                BroadcastState.ABORTED,   // Service provider irrecoverable error
                BroadcastState.COMPLETED  // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
                ));

        // Transitions from CANCELING
        toStates.put(BroadcastState.CANCELING, Arrays.asList(
                BroadcastState.CANCELED,  // User action
                BroadcastState.PURGED,    // User action
                BroadcastState.COMPLETED  // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
                ));

        // Transitions from HALTING
        toStates.put(BroadcastState.HALTING, Arrays.asList(
                BroadcastState.RUNNING,   // User action
                BroadcastState.CANCELED,  // User action
                BroadcastState.HALTED,
                BroadcastState.PURGED,    // User action
                BroadcastState.COMPLETED  // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
                ));

        // Transitions from HALTED
        toStates.put(BroadcastState.HALTED, Arrays.asList(
                BroadcastState.RUNNING,   // User action
                BroadcastState.CANCELED,  // User action
                BroadcastState.CANCELING, // User action
                BroadcastState.PURGED,    // User action
                BroadcastState.COMPLETED  // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
                ));
    }

    /**
     * Outcome of a {@link Broadcast#setState} request: whether the transition
     * happened, the state after the request, and (on success only) the state
     * before it.
     */
    public static class StateChangeResult
    {
        public StateChangeStatus stateChangeStatus;
        public BroadcastState currentState;
        public BroadcastState previousState;

        public StateChangeResult(StateChangeStatus stateChangeStatus,
                BroadcastState currentState, BroadcastState previousState)
        {
            this.stateChangeStatus = stateChangeStatus;
            this.currentState = currentState;
            this.previousState = previousState;
        }
    }

    /**
     * Dispatcher thread. Repeatedly takes a batch of jobs off the ready queue
     * and hands it to {@link Broadcast#processJobs}, pausing while the
     * broadcast is HALTING/HALTED and exiting once the broadcast is CANCELING
     * or in a final state, or when the thread is interrupted.
     */
    protected class Service extends Thread
    {
        Object serviceProvider;

        /**
         * @param name thread name; also pushed onto the log4j NDC while running.
         * @throws BroadcastException if the service provider cannot be initialized.
         */
        protected Service(String name) throws BroadcastException
        {
            serviceProvider = getInitializedServiceProvider();
            setName(name);
        }

        @Override
        public void run()
        {
            NDC.push(getName());
            try
            {
                dispatchLoop();
            }
            finally
            {
                // Release the provider on every exit path (stop, interrupt, or
                // unexpected runtime exception), then clean up this thread's NDC.
                try
                {
                    closeServiceProvider(serviceProvider);
                }
                finally
                {
                    NDC.pop();
                    NDC.remove();
                }
            }
        }

        /**
         * Runs until the broadcast tells threads to stop or this thread is
         * interrupted. Interruption is the shutdown signal sent by
         * {@link Broadcast#terminate(BroadcastState, String)}; since the thread
         * exits right away, the interrupt flag is not restored.
         */
        private void dispatchLoop()
        {
            while (!threadsShouldStop())
            {
                synchronized (resumeFlag)
                {
                    if (threadsShouldPause())
                    {
                        try
                        {
                            resumeFlag.wait();
                        }
                        catch (InterruptedException e)
                        {
                            myLogger.warn("Dispatcher thread interrupted while waiting to resume");
                            return;
                        }
                        // Re-evaluate both stop and pause conditions after any
                        // wake-up, including spurious ones.
                        continue;
                    }
                }

                List<Job> batch;
                /**
                 * Includes allocation from capacity. Only returns when the required allocation
                 * is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
                 */
                ServicePrerequisites prerequisites;
                synchronized (readyQueue)
                {
                    // The state is set before waiters on readyQueue are notified,
                    // so checking here, under the lock, avoids waiting after a
                    // stop request has already been signalled.
                    if (threadsShouldStop())
                    {
                        continue;
                    }

                    // get a batch of jobs
                    Job job = readyQueue.peek();
                    if (job == null)
                    {
                        try
                        {
                            readyQueue.wait();
                        }
                        catch (InterruptedException e)
                        {
                            return;
                        }
                        continue;
                    }

                    // Note: the readyQueue monitor is held while prerequisites are secured.
                    prerequisites = secureServicePrerequisites();
                    if (threadsShouldStop() || threadsShouldPause())
                    {
                        returnPrerequisites(prerequisites);
                        continue;
                    }

                    // Now that we can go ahead with this job, let us remove this from queue
                    readyQueue.poll();
                    batch = new ArrayList<Job>();
                    batch.add(job);

                    // If we are to get a batch of more than one, let us fill in the rest.
                    for (int i = 1; i < getJobBatchSize(); i++)
                    {
                        job = readyQueue.poll();
                        if (job == null)
                        {
                            break;
                        }
                        batch.add(job);
                    }
                }

                // Mark start time
                long now = System.currentTimeMillis();
                for (Job queuedJob : batch)
                {
                    queuedJob.startTime = now;
                }

                // Service the jobs
                try
                {
                    processJobs(batch, serviceProvider, prerequisites);
                    completedJobCount.incrementAndGet();
                }
                catch (EngineException e)
                {
                    terminate(BroadcastState.ABORTED, e.getMessage());
                }
            }
        }
    }

    /**
     * Creates an empty broadcast in the INSTALLING state and records the
     * receive time.
     *
     * @param broadcastType used as the prefix of the response tag name and as
     *        the top-level tag of the status report.
     * @param activityRecordIdParamName - if null, default is used.
     * @param jobReportRootNodeName passed to every job report.
     */
    protected Broadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName)
    {
        this.broadcastType = broadcastType;
        this.activityRecordIdParamName = activityRecordIdParamName == null
                ? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT
                : activityRecordIdParamName;
        this.jobReportRootNodeName = jobReportRootNodeName;
        readyQueue = new LinkedBlockingQueue<Job>();
        serviceThreadPool = new ArrayList<Thread>();
        recipientList = new ArrayList<Recipient>();
        scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
        resumeFlag = new Object();
        receiveTime = System.currentTimeMillis();
    }

    /**
     * Gives back prerequisites obtained from {@link #secureServicePrerequisites()}
     * when a service thread decides not to go ahead with a job.
     *
     * @param prerequisites value previously returned by {@link #secureServicePrerequisites()}.
     */
    protected abstract void returnPrerequisites(ServicePrerequisites prerequisites);

    /**
     * Creates and initializes a service provider, to be used by only one service thread.
     * If service provider is not thread-specific, then this method may return null, and
     * a common service provider is created outside of this method.
     *
     * @return service provider as a class Object instance.
     * @throws BroadcastException
     */
    protected abstract Object getInitializedServiceProvider() throws BroadcastException;

    /**
     * Obtains the required components to support a service; e.g. RTP port, or a place
     * in maximum total number of calls. Does not return till the required prerequisites are obtained.
     * @return null, if no prerequisite is required, as in the case of email and sms engines.
     */
    abstract protected ServicePrerequisites secureServicePrerequisites();

    /**
     * Releases a service provider when its service thread exits.
     *
     * @param serviceProvider value returned by {@link #getInitializedServiceProvider()}
     *        for that thread.
     */
    abstract public void closeServiceProvider(Object serviceProvider);

    /**
     * Makes a state transition to the given newState if the transition from
     * the current state is legal.
     * @param newState
     * @return StateChangeResult
     */
    public StateChangeResult setState(BroadcastState newState)
    {
        return setState(newState, null, null);
    }

    /**
     * Makes a state transition to the given newState if the transition from
     * the current state is legal. On success the transition is logged and, if
     * a postBack has been set, a status report is queued.
     *
     * @param newState
     * @param haltReason recorded on success; may be null.
     * @param stateErrorText recorded on success; may be null.
     * @return StateChangeResult
     */
    public StateChangeResult setState(BroadcastState newState,
            String haltReason, String stateErrorText)
    {
        boolean isLegal;
        BroadcastState prev;
        synchronized (this)
        {
            if (state == newState)
            {
                return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
            }
            List<BroadcastState> to = toStates.get(state);
            isLegal = to != null && to.contains(newState);
            prev = state;
            if (isLegal)
            {
                // Record the reason before publishing the new state, so a reader
                // that observes the new state also observes its reason.
                this.haltReason = haltReason;
                this.stateErrorText = stateErrorText;
                changeStateTime = System.currentTimeMillis();
                state = newState;
            }
        }

        if (isLegal)
        {
            // Log newState rather than re-reading the field, which another
            // thread may already have changed again.
            CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s",
                    broadcastId, prev, newState));
            // Reported outside the lock so postBack code never runs under it.
            if (postBack != null)
            {
                postBack.queueReport(mkStatusReport());
            }
            return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
        }

        myLogger.warn(String.format("Broadcast %s: Transition from %s to %s forbidden",
                broadcastId, prev, newState));
        return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
    }

    /**
     * Sets the broadcast id; allowed exactly once.
     *
     * @param broadcastId must not be null.
     * @throws IllegalArgumentException if broadcastId is null.
     * @throws IllegalStateException if the id has already been set.
     */
    protected void setBroadcastId(String broadcastId)
    {
        if (broadcastId == null)
        {
            throw new IllegalArgumentException(
                    "Argument broadcastId in Broadcast.setBroadcastId method cannot be null");
        }
        if (this.broadcastId != null)
        {
            throw new IllegalStateException(
                    "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast");
        }
        this.broadcastId = broadcastId;
    }

    /**
     * Sets the launch record id; allowed exactly once.
     *
     * @param launchRecordId must not be null.
     * @throws IllegalArgumentException if launchRecordId is null.
     * @throws IllegalStateException if the id has already been set.
     */
    protected void setLaunchRecordId(String launchRecordId)
    {
        if (launchRecordId == null)
        {
            throw new IllegalArgumentException(
                    "Argument launchRecordId in Broadcast.setLaunchRecordId method cannot be null");
        }
        if (this.launchRecordId != null)
        {
            throw new IllegalStateException(
                    "Broadcast.setLaunchRecordId method cannot be invoked more than once for a Broadcast");
        }
        this.launchRecordId = launchRecordId;
    }

    public String getBroadcastId()
    {
        return broadcastId;
    }

    public String getLaunchRecordId()
    {
        return launchRecordId;
    }

    /**
     * Builds the response XML for a broadcast request. The response is marked
     * accepted='FALSE' when an exception is given or when the broadcast is
     * already COMPLETED, and accepted='TRUE' otherwise.
     *
     * @param e exception describing why the request failed, or null.
     * @return response XML.
     */
    public String getResponseXML(BroadcastException e)
    {
        String tagName = broadcastType + "_response";
        StringBuilder responseXML = new StringBuilder("<" + tagName);
        if (broadcastId != null && broadcastId.length() > 0)
        {
            responseXML.append(" broadcast_id=\"");
            responseXML.append(broadcastId);
            responseXML.append("\"");
        }
        responseXML.append(" accepted='");
        responseXML.append(e != null || getState() == BroadcastState.COMPLETED
                ? "FALSE" : "TRUE");
        responseXML.append("'");
        if (e == null)
        {
            responseXML.append('>');
        }
        else
        {
            if (e.errorCode != null)
            {
                responseXML.append(" error='");
                responseXML.append(e.errorCode.toString());
                responseXML.append("'");
            }
            responseXML.append('>');
            if (e.errorText != null)
            {
                // NOTE(review): the two literals around the error text are empty in
                // the file as received; element markup may have been lost - confirm
                // against the expected wire format. Unlike mkResponseXML, the text
                // is appended here without XML escaping.
                responseXML.append("");
                responseXML.append(e.errorText);
                responseXML.append("");
            }
        }
        responseXML.append("</" + tagName + '>');
        return responseXML.toString();
    }

    public String getPostBackURL()
    {
        return postBackURL;
    }

    /**
     * Builds the response XML from an error code and text. The response is
     * accepted='TRUE' when errorCode is null; otherwise it is accepted='FALSE'
     * and carries the error code and, if given, the escaped error text.
     *
     * @param errorCode null when there is no error.
     * @param errorText ignored when errorCode is null; may be null.
     * @return response XML.
     */
    protected String mkResponseXML(String errorCode, String errorText)
    {
        String tagName = broadcastType + "_response";
        StringBuilder responseXML = new StringBuilder("<" + tagName);
        String broadcastId = getBroadcastId();
        if (broadcastId != null && broadcastId.length() > 0)
        {
            responseXML.append(" broadcast_id=\"");
            responseXML.append(broadcastId);
            responseXML.append("\"");
        }
        responseXML.append(" accepted='");
        responseXML.append(errorCode == null ? "TRUE" : "FALSE");
        responseXML.append("'");
        if (errorCode == null)
        {
            responseXML.append('>');
        }
        else
        {
            responseXML.append(" error='");
            responseXML.append(errorCode);
            responseXML.append("'");
            responseXML.append('>');
            if (errorText != null)
            {
                // NOTE(review): the two literals around the error text are empty in
                // the file as received; element markup may have been lost - confirm
                // against the expected wire format.
                responseXML.append("");
                // Escape '&' first so the entity produced for '<' is not re-escaped.
                responseXML.append(errorText.replace("&", "&amp;")
                        .replace("<", "&lt;"));
                responseXML.append("");
            }
        }
        responseXML.append("</" + tagName + '>');
        return responseXML.toString();
    }

    /** @return true if the given state is ABORTED, CANCELED, COMPLETED or PURGED. */
    private boolean stateIsFinal(BroadcastState state)
    {
        return state == BroadcastState.ABORTED
                || state == BroadcastState.CANCELED
                || state == BroadcastState.COMPLETED
                || state == BroadcastState.PURGED;
    }

    /**
     * If finalState is final, then this state is set, and dispatcher threads are stopped.
     * Overriding implementation may release all other resources, like timers.
     * @param finalState
     */
    public void terminate(BroadcastState finalState)
    {
        terminate(finalState, null);
    }

    /**
     * If finalState is final, then this state is set, and dispatcher threads are stopped.
     * Overriding implementation may release all other resources, like timers.
     * @param finalState
     * @param reason recorded as the halt reason of the transition; may be null.
     * @throws IllegalArgumentException if finalState is not a final state.
     */
    public void terminate(BroadcastState finalState, String reason)
    {
        if (!stateIsFinal(finalState))
        {
            throw new IllegalArgumentException("Argument finalState " + finalState
                    + " in Broadcast.terminate method is not final");
        }
        setState(finalState, reason, null);

        // Wake up all dispatcher threads waiting on readyQueue so they will all stop
        synchronized (readyQueue)
        {
            readyQueue.notifyAll();
        }

        // Wake up all sleeping dispatcher threads for same reason.
        for (Thread t : serviceThreadPool)
        {
            try
            {
                t.interrupt();
            }
            catch (Exception e)
            {
                myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate");
            }
        }

        // Quiesce scheduler, and terminate it.
        scheduler.shutdownNow();
    }

    /**
     * Creates status report.
     * @return status report in XML.
     */
    protected String mkStatusReport()
    {
        StringBuilder statusBf = new StringBuilder();
        String topLevelTag = broadcastType;
        String broadcastId = getBroadcastId();
        if (broadcastId == null)
        {
            broadcastId = "";
        }
        // Read the state once so the whole report describes one state.
        BroadcastState current = state;

        statusBf.append("<" + topLevelTag + " broadcast_id='" + broadcastId
                + "' receive_time='" + receiveTime
                + "' recipient_count='" + recipientList.size() + "'");
        if (launchRecordId != null)
        {
            statusBf.append(" launch_record_id='" + launchRecordId + "'");
        }
        // NOTE(review): several literals below are empty or hold only a line break
        // in the file as received; element markup (including the closing top-level
        // tag) may have been lost - confirm against the expected report format.
        statusBf.append(">\r\n" + current + "" + changeStateTime + "\r\n");
        if (current == BroadcastState.HALTED || current == BroadcastState.ABORTED)
        {
            if (haltReason != null)
            {
                statusBf.append("" + haltReason + "\r\n");
            }
            if (stateErrorText != null)
            {
                statusBf.append("" + stateErrorText + "");
            }
        }
        statusBf.append("\r\n");
        return statusBf.toString();
    }

    /** Hook with an empty base implementation. */
    protected void onExpire()
    {
    }

    protected void setExpireTime(long expireTime)
    {
        this.expireTime = expireTime;
    }

    public long getExpireTime()
    {
        return expireTime;
    }

    /**
     *
     * @return number of active jobs. -1 if there is no concept of being active.
     */
    protected int getActiveJobCount()
    {
        return remainingJobs.get() - readyQueue.size();
    }

    /**
     * Parses broadcastId and return if notInService is true.
     * Otherwise, continue parsing postBackUrl, expireTime, recipientList,
     * and implementation-specific data from request.
     * Avoid throwing an exception before parsing and setting broadcastId.
     * @param request
     * @param notInService
     * @throws EngineException
     */
    protected abstract void decode(HttpServletRequest request, boolean notInService)
        throws EngineException;

    /**
     * Remembers postBack, creates one job per recipient in the recipient list
     * and adds it to the ready queue, and sets the number of remaining jobs to
     * the size of the ready queue.
     *
     * @param postBack destination for status and job reports; may be null, in
     *        which case no reports are queued.
     */
    protected final void init(PostBack postBack)
    {
        // Remember postBack
        this.postBack = postBack;

        for (Recipient recipient : recipientList)
        {
            readyQueue.add(mkJob(recipient));
        }
        remainingJobs.set(readyQueue.size());
    }

    protected abstract void initSync(EngineResources resources) throws BroadcastException;

    /**
     * Creates the job for a recipient. Derived class may create its own Job type.
     */
    protected Job mkJob(Recipient recipient)
    {
        return new Job(recipient);
    }

    /**
     * Overriding implementation performs time consuming initialization, after returning
     * POST http status indicating accepting broadcast for processing.
     *
     * @throws BroadcastException
     */
    protected void initAsync() throws BroadcastException
    {
        // Do nothing in base class.
    }

    public String getId()
    {
        return broadcastId;
    }

    /**
     * Requests cancellation: the state is set to CANCELED if there are no
     * active jobs, and to CANCELING otherwise. Waiting service threads are then
     * woken so that they observe the new state.
     */
    protected void cancel()
    {
        if (this.getActiveJobCount() == 0)
        {
            setState(BroadcastState.CANCELED);
        }
        else
        {
            // Sets state to CANCELING, which is monitored by its Broadcast.Service threads.
            setState(BroadcastState.CANCELING);
        }
        synchronized (resumeFlag)
        {
            resumeFlag.notifyAll();
        }
        // Threads idling on an empty ready queue must be woken too; otherwise
        // they never get to see the CANCELING/CANCELED state.
        synchronized (readyQueue)
        {
            readyQueue.notifyAll();
        }
    }

    /**
     * Requests a pause by setting the state to HALTING, which is monitored by
     * Broadcast.Service threads. The transition to HALTED is made in
     * {@link #postJobStatus(Job)} once no job is active.
     */
    protected void pause()
    {
        setState(BroadcastState.HALTING);
    }

    /**
     * If the broadcast is HALTING or HALTED, sets the state to RUNNING and
     * wakes the paused service threads.
     */
    protected void resume()
    {
        synchronized (resumeFlag)
        {
            if (threadsShouldPause())
            {
                setState(BroadcastState.RUNNING);
                resumeFlag.notifyAll();
            }
        }
    }

    /**
     * Derived class may make its own Implementation of JobReport
     * @return a new, uninitialized job report.
     */
    protected JobReport mkJobReport()
    {
        return new JobReport();
    }

    /**
     * Adds a job to the ready queue and wakes service threads waiting for work.
     */
    public void addJob(Job job)
    {
        synchronized (readyQueue)
        {
            readyQueue.add(job);
            readyQueue.notifyAll();
        }
    }

    /**
     * Creates the service threads, sets the state to RUNNING, and starts the
     * threads.
     *
     * @throws BroadcastException if a service provider cannot be initialized.
     */
    public void startProcessing() throws BroadcastException
    {
        // Create service thread pool to dispatch jobs
        for (int i = 0; i < serviceThreadPoolSize; i++)
        {
            String threadName = broadcastId + "_service_thread_" + i;
            Service serviceThread = new Service(threadName);
            serviceThreadPool.add(serviceThread);
        }

        setState(BroadcastState.RUNNING);

        // Start the dispatcher threads
        for (Thread thread : serviceThreadPool)
        {
            thread.start();
        }
    }

    /** @return true if the state is CANCELING or final. */
    private boolean threadsShouldStop()
    {
        BroadcastState current = getState();
        return current == BroadcastState.CANCELING || stateIsFinal(current);
    }

    /** @return true if the state is HALTED or HALTING. */
    private boolean threadsShouldPause()
    {
        BroadcastState current = getState();
        return current == BroadcastState.HALTED || current == BroadcastState.HALTING;
    }

    /**
     * Job status is reported back to this broadcast, via the postJobStatus method.
     * @param batch jobs to process together; never empty when called by a service thread.
     * @param serviceProvider the calling thread's service provider.
     * @param prerequisites value returned by {@link #secureServicePrerequisites()}.
     * @throws EngineException causes the broadcast to be terminated as ABORTED.
     */
    abstract protected void processJobs(List<Job> batch, Object serviceProvider, ServicePrerequisites prerequisites)
        throws EngineException;

    /**
     * Size of a batch of jobs to be processed together. For email, this may be more than 1,
     * and this method should be overridden.
     * @return size of a batch of jobs to be processed together.
     */
    protected int getJobBatchSize()
    {
        return 1;
    }

    /**
     * Posts a job report for the job, if a postBack has been set.
     * If the job status is terminal, decrements the number of remaining jobs:
     * when none remain the broadcast is terminated as COMPLETED; otherwise,
     * when no job is active, a CANCELING broadcast becomes CANCELED and a
     * HALTING broadcast becomes HALTED.
     *
     * @param job job whose jobStatus has already been set.
     */
    public void postJobStatus(Job job)
    {
        if (postBack != null)
        {
            JobReport report = mkJobReport();
            report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
            report.init(job);
            postBack.queueReport(report.toString());
        }

        if (job.jobStatus.isTerminal())
        {
            // Atomic decrement: several service threads report concurrently, and
            // a lost update would keep the broadcast from ever completing.
            int stillRemaining = remainingJobs.decrementAndGet();
            if (stillRemaining == 0)
            {
                terminate(BroadcastState.COMPLETED);
            }
            else if (getActiveJobCount() == 0)
            {
                BroadcastState current = getState();
                if (current == BroadcastState.CANCELING)
                {
                    setState(BroadcastState.CANCELED);
                }
                else if (current == BroadcastState.HALTING)
                {
                    setState(BroadcastState.HALTED);
                }
            }
        }
    }

    /**
     * Posts the job status as in {@link #postJobStatus(Job)}, then either counts
     * the job as completed or queues it again.
     *
     * @param job job whose jobStatus has already been set.
     * @param rescheduleTimeMS - reschedule time in milliseconds: negative means
     *        do not reschedule (the completed job count is incremented), 0 means
     *        add the job back to the ready queue immediately, positive means add
     *        it back after that delay.
     */
    protected void postJobStatus(Job job, long rescheduleTimeMS)
    {
        postJobStatus(job);
        if (rescheduleTimeMS < 0)
        {
            completedJobCount.incrementAndGet();
        }
        if (rescheduleTimeMS == 0)
        {
            addJob(job);
        }
        else if (rescheduleTimeMS > 0)
        {
            rescheduleJob(job, rescheduleTimeMS);
        }
    }

    /**
     * Schedules the job to be added back to the ready queue after a delay.
     *
     * @param job job to re-queue.
     * @param rescheduleTimeMS delay in milliseconds.
     * @return future of the scheduled re-queue task.
     */
    public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
    {
        Runnable r = new Runnable()
        {
            public void run()
            {
                addJob(job);
            }
        };
        return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);
    }

    public BroadcastState getState()
    {
        return state;
    }

    /**
     * @return size of the ready queue while the broadcast is RUNNING, HALTING
     *         or HALTED; 0 in any other state.
     */
    public int getReadyJobCount()
    {
        switch (getState())
        {
            case RUNNING:
            case HALTING:
            case HALTED:
                return readyQueue.size();
            default:
                return 0;
        }
    }

    public int getCompletedJobCount()
    {
        return completedJobCount.get();
    }

    /**
     * Sets the number of service threads created by {@link #startProcessing()}.
     */
    protected void setServiceThreadPoolsize(int serviceThreadPoolSize)
    {
        this.serviceThreadPoolSize = serviceThreadPoolSize;
    }
}