|
- package altk.comm.engine;
-
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Queue;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
-
- import javax.servlet.http.HttpServletRequest;
-
- import org.apache.log4j.Logger;
- import org.apache.log4j.NDC;
-
- import altk.comm.engine.exception.BroadcastException;
- import altk.comm.engine.exception.EngineException;
- import altk.comm.engine.postback.PostBack;
-
- /**
- * Broadcast class absorbs what was formerly known as Dispatcher class.
- *
- * @author Yuk-Ming
- *
- */
- public abstract class Broadcast
- {
- private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
- public final String broadcastType;
- private String broadcastId;
-
- /**
- * Set when reading request XML, but never used.
- */
- private String launchRecordId;
-
- // protected XPath xpathEngine;
- protected String postBackURL;
- private PostBack postBack;
- public long expireTime;
-
- static Logger myLogger = Logger.getLogger(Broadcast.class);
-
- /**
- * This queue is designed for only one server
- */
- private Queue<Job> readyQueue;
- private List<Thread> serviceThreadPool;
- private Object resumeFlag; // Semaphore for dispatcher threads to resume.
- protected List<Recipient> recipientList;
- private int remainingJobs;
-
- private ScheduledExecutorService scheduler;
-
- public static enum BroadcastState
- {
- INSTALLING,
- RUNNING,
- HALTING,
- HALTED,
- CANCELING,
- CANCELED, // Final state
- PURGED, // Final state
- ABORTED, // final state
- COMPLETED // Final state
- }
-
- public enum StateChangeStatus
- {
- SUCCESS,
- NO_CHANGE,
- FORBIDDEN
- }
-
- /**
- * When a Broadcast is first created, its state is INSTALLING, during which
- * time, the broadcast request XML is digested. After this period, the
- * state of the broadcast enters into RUNNING, and it begins with adding
- * all VoiceJobs in the request to the Dispatcher's queues. During this period
- * calls are dispatched, setup, and terminate. When the last call is terminated,
- * the broadcast enters into the COMPLETED state.
- *
- * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen
- * automatically without any external influence.
- *
- * At any time, the broadcast may be canceled by user action, which causes
- * the broadcast to transition from the RUNNING state into the CANCELED state.
- *
- * Since the RUNNING state may go to the COMLETED or to the CANCELED state,
- * each due to a different thread, there needs to be a mutex to guarantee
- * state and data integrity.
- *
- * A INSTALLING or RUNNING broadcast may be paused by user action, stopping the
- * Dispatcher from making new calls and causing the broadcast to go to
- * the HALTED state. Certain error conditions result in the HALTED state.
- *
- * User may order a broadcast to be removed entirely with a purge command,
- * causing the broadcast to go to the PURGED state. This state is there so
- * objects, that has reference to a broadcast which has been purged, know
- * what to do in that situation.
- *
- * We need the pause operation to set a RUNNING machine to be in the PAUSING state,
- * allow ongoing jobs to proceed to normal termination,
- * whereas no new jobs are started. State goes from PAUSING to HALTED.
- *
- * Cancel-nice operation is pause, followed by the automatic transition
- * from HALTED to CANCELED.
- *
- * Cancel operation forces all jobs to abort, and the state transitions to CANCELED
- * immediately.
- *
- * Because the Dispatcher and Broadcast is one-to-one, and because they access each other's
- * data, Dispatcher should be combined into Broadcast.
- *
- * @author Yuk-Ming
- *
- */
- static Map<BroadcastState, List<BroadcastState>> toStates;
-
- static
- {
- // Initialize legal transitions of state machine.
- // For each state, define a list of legal states that this state can transition to
- toStates = new HashMap<BroadcastState, List<BroadcastState>>();
-
- // Transitions from INSTALLING
- toStates.put(BroadcastState.INSTALLING, Arrays.asList(
- BroadcastState.RUNNING, // Normal transition
- BroadcastState.CANCELING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.HALTING, // User action
- BroadcastState.HALTED, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.ABORTED, // TTS error
- BroadcastState.COMPLETED // When recipient list is empty
- ));
-
- // Transitions from RUNNING
- toStates.put(BroadcastState.RUNNING, Arrays.asList(
- BroadcastState.CANCELING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.HALTING, // User action
- BroadcastState.HALTED, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.ABORTED, // Service provider irrecoverable error
- BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
- ));
-
- // Transitions from CANCELING
- toStates.put(BroadcastState.CANCELING, Arrays.asList(
- BroadcastState.CANCELED, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
- ));
-
- // Transitions from HALTING
- toStates.put(BroadcastState.HALTING, Arrays.asList(
- BroadcastState.RUNNING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.HALTED,
- BroadcastState.PURGED, // User action
- BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
- ));
-
- // Transitions from HALTED
- toStates.put(BroadcastState.HALTED, Arrays.asList(
- BroadcastState.RUNNING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.CANCELING, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
- ));
-
- }
-
- public static class StateChangeResult
- {
- public StateChangeStatus stateChangeStatus;
- public BroadcastState currentState;
- public BroadcastState previousState;
-
- public StateChangeResult(StateChangeStatus stateChangeStatus,
- BroadcastState currentState, BroadcastState previousState)
- {
- this.stateChangeStatus = stateChangeStatus;
- this.currentState = currentState;
- this.previousState = previousState;
- }
- }
-
- private BroadcastState state = BroadcastState.INSTALLING;
-
- String haltReason;
- String stateErrorText;
- public long changeStateTime;
-
- protected class Service extends Thread
- {
- Object serviceProvider;
-
- protected Service(String name) throws BroadcastException
- {
- serviceProvider = getInitializedServiceProvider();
- setName(name);
- }
-
- public void run()
- {
- NDC.push(getName());
- for (;;)
- {
- if (threadsShouldStop())
- {
- closeServiceProvider(serviceProvider);
- return;
- }
-
- synchronized (resumeFlag)
- {
- if (threadsShouldPause())
- {
- try
- {
- resumeFlag.wait();
- }
- catch (InterruptedException e)
- {
- myLogger.warn("Dispatcher thread interrupted while waiting to resume");
- return;
- }
- }
- }
-
- List<Job> batch = null;
-
- /**
- * Includes allocation from capacity. Only returns when the required allocation
- * is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
- */
- ServicePrerequisites prerequisites = null;
- synchronized(readyQueue)
- {
- // get a batch of jobs
- Job job = readyQueue.peek();
-
- if (job == null)
- try
- {
- readyQueue.wait();
- continue;
- }
- catch (InterruptedException e)
- {
- return;
- }
-
- prerequisites = secureServicePrerequisites();
-
- if (threadsShouldStop() || threadsShouldPause())
- {
- returnPrerequisites(prerequisites);
- continue;
- }
-
- // Now that we can go ahead with this job, let us remove this from queue
- readyQueue.poll();
-
- batch = new ArrayList<Job>();
- batch.add(job);
-
- // We we are to get a batch of more than one, let us fill in the rest.
- for (int i = 1; i < getJobBatchSize(); i++)
- {
- job = readyQueue.poll();
- if (job == null) break;
- batch.add(job);
- }
- }
- if (batch != null && batch.size() > 0)
- {
- // Mark start time
- long now = System.currentTimeMillis();
- for (Job job : batch)
- {
- job.startTime = now;
- }
-
- // Service the jobs
- try
- {
- processJobs(batch, serviceProvider, prerequisites);
- }
- catch (EngineException e)
- {
- terminate(BroadcastState.ABORTED, e.getMessage());
- }
- }
- }
- }
-
- }
-
- protected Broadcast(String broadcastType)
- {
- this.broadcastType = broadcastType;
- readyQueue = new LinkedBlockingQueue<Job>();
- serviceThreadPool = new ArrayList<Thread>();
- recipientList = new ArrayList<Recipient>();
- scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
- resumeFlag = new Object();
- }
-
- protected abstract void returnPrerequisites(ServicePrerequisites prerequisites);
-
- /**
- * Creates and initializes a service provider, to be used by only one service thread.
- * If service provider is not thread-specific, then this method may return null, and
- * a common service provider is created outside of this method.
- *
- * @return service provider as a class Object instance.
- * @throws BroadcastException
- */
- protected abstract Object getInitializedServiceProvider() throws BroadcastException;
-
- /**
- * Obtains the required components to support a service; e.g. RTP port, or a place
- * in maximum total number of calls. Does not return till the reequired prerequisites are obtained.
- * @return null, if no prerequisite is required, as in the case of email and sms engines.
- */
- abstract protected ServicePrerequisites secureServicePrerequisites();
-
- abstract public void closeServiceProvider(Object serviceProvider);
-
- /**
- * Makes a state transition to the given newState if the transition from
- * the current state is legal.
- * @param newState
- * @return StateChangeResult
- */
- public StateChangeResult setState(BroadcastState newState)
- {
- return setState(newState, null, null);
- }
-
- /**
- * Makes a state transition to the given newState if the transition from
- * the current state is legal.
- * @param newState
- * @return StateChangeResult
- */
- public StateChangeResult setState(BroadcastState newState,
- String haltReason, String stateErrorText)
- {
- boolean isLegal;
- BroadcastState prev = null;
- synchronized (this)
- {
- if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
- List<BroadcastState> to = toStates.get(state);
- isLegal = (to == null? false : to.contains(newState));
- prev = state;
- if (isLegal)
- {
- state = newState;
- changeStateTime = System.currentTimeMillis();
- }
- }
- if (isLegal)
- {
- this.haltReason = haltReason;
- this.stateErrorText = stateErrorText;
-
- CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state));
- if (postBack != null)
- {
- postBack.queueReport(mkStatusReport());
- }
-
- return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
- }
- else
- {
- myLogger.warn(String.format("Broadcast %s: Transition from %s to %s forbidden", broadcastId, prev, newState));
- return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
- }
- }
-
- protected void setBroadcastId(String broadcastId)
- {
- if (broadcastId == null)
- throw new IllegalArgumentException(
- "Argument broadcastId in Broadcast.setBroadcastId method cannot be null");
- if (this.broadcastId != null)
- throw new IllegalStateException(
- "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast");
- this.broadcastId = broadcastId;
- }
-
- protected void setLaunchRecordId(String launchRecordId)
- {
- if (launchRecordId == null)
- throw new IllegalArgumentException(
- "Argument launchRecordId in Broadcast.setLaunchRecordId method cannot be null");
- if (this.launchRecordId != null)
- throw new IllegalStateException(
- "Broadcast.setLaunchRecordId method cannot be invoked more than once for a Broadcast");
- this.launchRecordId = launchRecordId;
- }
-
- public String getBroadcastId()
- {
- return broadcastId;
- }
-
- public String getLaunchRecordId()
- {
- return launchRecordId;
- }
-
- public String getResponseXML(BroadcastException e)
- {
- String tagName = broadcastType + "_response";
- StringBuffer responseXML = new StringBuffer("<" + tagName);
- if (broadcastId != null && broadcastId.length() > 0)
- {
- responseXML.append(" broadcast_id=\"");
- responseXML.append(broadcastId);
- responseXML.append("\"");
- }
- responseXML.append(" accepted='");
- responseXML.append(e != null || getState() == BroadcastState.COMPLETED ? "FALSE" : "TRUE");
- responseXML.append("'");
- if (e == null)
- {
- responseXML.append('>');
- }
- else
- {
- if (e.errorCode != null)
- {
- responseXML.append(" error='");
- responseXML.append(e.errorCode.toString());
- responseXML.append("'");
- }
- responseXML.append('>');
- if (e.errorText != null)
- {
- responseXML.append("<error_text>");
- responseXML.append(e.errorText);
- responseXML.append("</error_text>");
- }
- }
- responseXML.append("</" + tagName + '>');
- return responseXML.toString();
- }
-
- public String getPostBackURL()
- {
- return postBackURL;
- }
-
- protected String mkResponseXML(String errorCode, String errorText)
- {
- String tagName = broadcastType + "_response";
- StringBuffer responseXML = new StringBuffer("<" + tagName);
- String broadcastId = getBroadcastId();
- if (broadcastId != null && broadcastId.length() > 0)
- {
- responseXML.append(" broadcast_id=\"");
- responseXML.append(broadcastId);
- responseXML.append("\"");
- }
- responseXML.append(" accepted='");
- responseXML.append(errorCode == null ? "TRUE" : "FALSE");
- responseXML.append("'");
- if (errorCode == null)
- {
- responseXML.append('>');
- }
- else
- {
- responseXML.append(" error='");
- responseXML.append(errorCode);
- responseXML.append("'");
- responseXML.append('>');
- if (errorText != null)
- {
- responseXML.append("<error_text>");
- responseXML.append(errorText.replaceAll("\\&", "&")
- .replaceAll("<", "<"));
- responseXML.append("</error_text>");
- }
- }
- responseXML.append("</" + tagName + '>');
- return responseXML.toString();
- }
-
- private boolean stateIsFinal(BroadcastState state)
- {
- return state == BroadcastState.ABORTED || state == BroadcastState.CANCELED
- || state == BroadcastState.COMPLETED || state == BroadcastState.PURGED;
- }
-
- /**
- * If finalState is final, then this state is set, and dispatcher threads are stopped.
- * Overriding implementation may release all other resources, like timers.
- * @param finalState
- */
- public void terminate(BroadcastState finalState)
- {
- terminate(finalState, null);
- }
-
- /**
- * If finalState is final, then this state is set, and dispatcher threads are stopped.
- * Overriding implementation may release all other resources, like timers.
- * @param finalState
- */
- public void terminate(BroadcastState finalState, String reason)
- {
- if (!stateIsFinal(finalState)) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final");
- setState(finalState, reason, null);
-
- // Wake up all dispatcher threads waiting on readyQueue so they will all stop
- synchronized(readyQueue)
- {
- readyQueue.notifyAll();
- }
-
- // Wake up all sleeping dispatcher threads for same reason.
- for(Thread t : serviceThreadPool)
- {
- try
- {
- t.interrupt();
- }
- catch (Exception e)
- {
- myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate");
- }
- }
-
- // Quiesce scheduler, and terminate it.
- scheduler.shutdownNow();
- }
-
- /**
- * Creates status report.
- * @return status report in XML.
- */
- protected String mkStatusReport()
- {
- StringBuffer statusBf = new StringBuffer();
- String topLevelTag = broadcastType;
- statusBf.append("<" + topLevelTag + " broadcast_id='" + getBroadcastId()
- + "' recipient_count='" + recipientList.size() + "'");
- if (launchRecordId != null)
- {
- statusBf.append(" launch_record_id='" + launchRecordId + "'");
- }
- BroadcastState broadcastState = getState();
- statusBf.append(">\r\n<state>" + broadcastState + "</state>\r\n");
- if (broadcastState == BroadcastState.HALTED
- || broadcastState == BroadcastState.ABORTED)
- {
- if (haltReason != null)
- {
- statusBf.append("<reason>" + haltReason
- + "</reason>\r\n");
- }
- if (stateErrorText != null)
- {
- statusBf.append("<error_text>" + stateErrorText
- + "</error_text>");
- }
- }
- statusBf.append("<remaining_jobs total='" + remainingJobs + "'");
- int activeCount = getActiveCount();
- if (activeCount > -1) statusBf.append(" active='" + activeCount + "'");
- statusBf.append("></remaining_jobs></" + topLevelTag + ">\r\n");
- String statusReport = statusBf.toString();
- return statusReport;
- }
-
- protected void onExpire()
- {
- }
-
- protected void setExpireTime(long expireTime)
- {
- this.expireTime = expireTime;
- }
-
- public long getExpireTime()
- {
- return expireTime;
- }
-
- /**
- *
- * @return number of active jobs. -1 if there is no concept of being active.
- */
- private int getActiveCount()
- {
- return remainingJobs - readyQueue.size();
- }
-
- /**
- * Parses broadcastId and return if notInService is true.
- * Otherwise, continue parsing postBackUrl, expireTime, recipientList,
- * and implementation-specific data from request.
- * Avoid throwing an exception before parsing and setting broadcastId.
- * @param notInService
- * @throws EngineException
- */
- protected abstract void decode(HttpServletRequest request, boolean notInService)
- throws EngineException;
-
- /**
- * Remembers postBack, and
- * Creates thread pool of size dictated by broadcast, which determines the size based
- * on the chosen service provider.
- *
- * Overriding implementation must invoke this method at the end, and process information
- * contained in the broadcast, in preparation for the invocation of the process
- * method.
- *
- * If there is no error, the overriding implementation must return this base method.
- *
- * @param commEngine
- *
- * @throws BroadcastException
- */
- protected final void init(PostBack postBack)
- {
- // Remember postBack
- this.postBack = postBack;
-
- for (Recipient recipient : recipientList)
- {
- readyQueue.add(mkJob(recipient));
- }
- remainingJobs = readyQueue.size();
- }
-
- protected abstract void initSync(EngineResources resources) throws BroadcastException;
-
- protected Job mkJob(Recipient recipient)
- {
- return new Job(recipient);
- }
-
- /**
- * Overriding implementation performs time consuming initialization, after returning
- * POST http status indicating accepting broadcast for processing.
- *
- * @throws BroadcastException
- */
- protected void initAsync() throws BroadcastException
- {
- // Do nothing in base class.
- }
-
- public abstract int getServiceThreadPoolSize();
-
- public String getId()
- {
- return broadcastId;
- }
-
- /**
- * Sets the stateMachine to CANCEL
- */
- protected void cancel()
- {
- if (this.getActiveCount() == 0) setState(BroadcastState.CANCELED);
- // Sets state to CANCELING, which is monitored by its Broadcast.Service threads.
- else setState(BroadcastState.CANCELING);
- synchronized(resumeFlag)
- {
- resumeFlag.notifyAll();
- }
- }
-
- protected void pause()
- {
- // Sets state to HALTED, which is monitored by Broadcast.Service threads.
- setState(BroadcastState.HALTING);
- }
-
- protected void resume()
- {
- synchronized (resumeFlag)
- {
- if (threadsShouldPause())
- {
- setState(BroadcastState.RUNNING);
- resumeFlag.notifyAll();
- }
- }
- }
-
- abstract protected JobReport mkJobReport(Job job);
-
- public void addJob(Job job)
- {
- synchronized(readyQueue)
- {
- readyQueue.add(job);
- readyQueue.notifyAll();
- }
- }
-
- public void startProcessing() throws BroadcastException
- {
- // Create dispatcher thread pool
- int threadPoolSize = getServiceThreadPoolSize();
- for (int i = 0; i < threadPoolSize; i++)
- {
- String threadName = broadcastId + "_service_thread_" + i;
- Service serviceThread = new Service(threadName);
- serviceThreadPool.add(serviceThread);
- }
-
- setState(BroadcastState.RUNNING);
-
- // Start the dispatcher threads
- for (Thread thread : serviceThreadPool)
- {
- thread.start();
- }
- }
-
- private boolean threadsShouldStop()
- {
- BroadcastState state = getState();
- return state == BroadcastState.CANCELING || stateIsFinal(state);
- }
-
- private boolean threadsShouldPause()
- {
- BroadcastState state = getState();
- return state == BroadcastState.HALTED || state == BroadcastState.HALTING;
- }
-
- /*
- @Override
- public void run()
- {
- for (;;)
- {
- if (threadsShouldStop()) return;
-
- if (threadsShouldPause())
- {
- try
- {
- resumeFlag.wait();
- }
- catch (InterruptedException e)
- {
- myLogger.warn("Dispatcher thread interrupted while waiting to resume");
- return;
- }
- }
-
- List<Job> batch = null;
- synchronized(readyQueue)
- {
- // get a batch of jobs
- Job job = readyQueue.poll();
-
- if (job == null)
- try
- {
- readyQueue.wait();
- continue;
- }
- catch (InterruptedException e)
- {
- return;
- }
- batch = new ArrayList<Job>();
- batch.add(job);
- for (int i = 1; i < getJobBatchSize(); i++)
- {
- job = readyQueue.poll();
- if (job == null) break;
- batch.add(job);
- }
- }
- if (batch != null)
- {
- try
- {
- processJobs(batch);
- }
- catch (EngineException e)
- {
- terminate(BroadcastState.ABORTED, e.getMessage());
- }
- }
- }
-
- }
- */
-
- /**
- * job status is reported back to this broadcast, via the logAndQueueForPostBack method.
- * @param batch
- * @param prerequisites
- */
- abstract protected void processJobs(List<Job> batch, Object serviceProvider, ServicePrerequisites prerequisites)
- throws EngineException;
-
- /**
- * Size of a batch of jobs to be processed together. For email, this may be more than 1,
- * and this method should be overridden.
- * @return size of a batch of jobs to be processed together.
- */
- protected int getJobBatchSize()
- {
- return 1;
- }
-
- /**
- * Sets jobStatus in job, and post job report.
- * If no rescheduling, then decrement number of remainingJobs,
- * @param job
- * @param jobStatus
- * @param errorText
- */
- public void postJobStatus(Job job)
- {
- if (postBack != null)
- {
- JobReport report = mkJobReport(job);
- postBack.queueReport(report.toString());
- }
-
- if (job.jobStatus.isTerminal())
- {
- remainingJobs--;
- if (remainingJobs == 0)
- {
- terminate(BroadcastState.COMPLETED);
- }
- else if (getActiveCount() == 0)
- {
- if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
- else if (state == BroadcastState.HALTING) setState(BroadcastState.HALTED);
- }
- }
- }
-
- /**
- * Sets jobStatus in job, and post job report.
- * If no rescheduling, then decrement number of remainingJobs,
- * @param job
- * @param jobStatus
- * @param errorText
- * @param reschedule - reschedule time in milliseconds (-1 means do not reschedule).
- */
- protected void postJobStatus(Job job, long rescheduleTimeMS)
- {
- postJobStatus(job);
- if (rescheduleTimeMS == 0)
- {
- addJob(job);
- }
- else if (rescheduleTimeMS > 0)
- {
- rescheduleJob(job, rescheduleTimeMS);
- }
- }
-
- public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
- {
- Runnable r = new Runnable() { public void run() { addJob(job);}};
- return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public BroadcastState getState()
- {
- return state;
- }
-
-
- }
|