|
- package altk.comm.engine;
-
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.time.LocalTime;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Queue;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
-
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
-
- import org.apache.log4j.Logger;
- import org.json.simple.JSONObject;
-
- import altk.comm.engine.Job.JobStatus;
- import altk.comm.engine.exception.BroadcastError;
- import altk.comm.engine.exception.BroadcastException;
- import altk.comm.engine.exception.EngineException;
- import altk.comm.engine.exception.PlatformError;
- import altk.comm.engine.exception.PlatformException;
-
- /**
- * Broadcast class absorbs what was formerly known as Dispatcher class.
- *
- * @author Yuk-Ming
- *
- */
- public abstract class Broadcast
- {
- private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
- private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id";
- private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0;
- static final String DAILY_STOP_KEY = "daily_stop";
- static final String DAILY_START_KEY = "daily_start";
-
- public final String broadcastType;
- private String broadcastId;
-
- private BroadcastState state = BroadcastState.ACCEPTED;
- private Object stateSemaphore = new Object();
-
- String reason;
- String stateErrorText;
- public CommEngine commEngine;
- public final long receiveTime;
- public long serviceStartTime;
- public long serviceEndTime;
- public long changeStateTime;
- protected String activityRecordIdParamName;
- private String jobReportRootNodeName;
-
- /**
- * Set when reading request XML, but never used.
- */
- private String launchRecordId;
-
- // protected XPath xpathEngine;
- protected String postbackURL;
- private Postback postback;
- public long expireTime;
- protected String daily_start = "";
- protected String daily_stop = "";
-
- /**
- * Sleep time in milliseconds between consecutive job processing (actualliy batch)
- */
- protected long sleepBetweenJobs;
-
- protected static Logger myLogger = Logger.getLogger(Broadcast.class);
- protected List<Service> serviceThreadPool;
- private Object resumeFlag; // Semaphore for dispatcher threads to resume.
- protected List<Recipient> recipientList;
-
- private ScheduledExecutorService scheduler;
- private int jobsTotal;
-
- /** Queue of jobs ready to be servivced. Semaphore for next 3 fields */
- private Queue<Job> readyQueue;
- /** Count of items in scheduler */
- private int scheduledJobs;
- /** Instantaneous number of jobs being serviced by service provider */
- private int serviceActivityCount;
-
-
- private Integer transactions;
-
-
- /** Running count of successful jobs */
- private AtomicInteger successCount;
- private int pauseThreshold;
- private int lastPauseCount;
-
-
/**
 * States a broadcast can be in. States constructed with {@code true} are
 * final; all others are not.
 */
public enum BroadcastState
{
    /** Broadcast request accepted for execution. */
    ACCEPTED,
    /** Servicing jobs. */
    RUNNING,
    /** User action, causing the system to quiesce, i.e. quiet down. */
    PAUSING,
    /** System is paused and quiet; ready to resume. */
    PAUSED,
    /** User action. */
    CANCELING,
    /** Irrecoverable internal or service provider error. */
    ABORTING,
    /** All servicing done; reporting may still be ongoing. */
    COMPLETED,
    /** Final state. */
    CANCELED(true),
    /** Final state. */
    PURGED(true),
    /** Final state. */
    ABORTED(true),
    /** Final state. */
    EXPIRED(true),
    /** All servicing and reporting done; final state. */
    ALLDONE(true);

    /** True when this is a final state. */
    public final boolean isFinal;

    /** Creates a non-final state. */
    private BroadcastState()
    {
        this(false);
    }

    private BroadcastState(boolean isFinal)
    {
        this.isFinal = isFinal;
    }
}
-
/** Outcome of a request to change the broadcast state. */
public enum StateChangeStatus
{
    /** The transition was legal and has been made. */
    SUCCESS,
    /** The broadcast was already in the requested state. */
    NO_CHANGE,
    /** The transition is not legal from the current state and was ignored. */
    FORBIDDEN
}
-
- /**
- * When a Broadcast is first created, its state is INSTALLING, during which
- * time, the broadcast request XML is digested. After this period, the
- * state of the broadcast enters into RUNNING, and it begins with adding
- * all VoiceJobs in the request to the Dispatcher's queues. During this period
- * calls are dispatched, setup, and terminate. When the last call is terminated,
- * the broadcast enters into the COMPLETED state.
- *
- * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen
- * automatically without any external influence.
- *
- * At any time, the broadcast may be canceled by user action, which causes
- * the broadcast to transition from the RUNNING state into the CANCELED state.
- *
- * Since the RUNNING state may go to the COMLETED or to the CANCELED state,
- * each due to a different thread, there needs to be a mutex to guarantee
- * state and data integrity.
- *
- * A INSTALLING or RUNNING broadcast may be paused by user action, stopping the
- * Dispatcher from making new calls and causing the broadcast to go to
- * the HALTED state. Certain error conditions result in the HALTED state.
- *
- * User may order a broadcast to be removed entirely with a purge command,
- * causing the broadcast to go to the PURGED state. This state is there so
- * objects, that has reference to a broadcast which has been purged, know
- * what to do in that situation.
- *
- * We need the pause operation to set a RUNNING machine to be in the PAUSING state,
- * allow ongoing jobs to proceed to normal termination,
- * whereas no new jobs are started. State goes from PAUSING to HALTED.
- *
- * Cancel-nice operation is pause, followed by the automatic transition
- * from HALTED to CANCELED.
- *
- * Cancel operation forces all jobs to abort, and the state transitions to CANCELED
- * immediately.
- *
- * Because the Dispatcher and Broadcast is one-to-one, and because they access each other's
- * data, Dispatcher should be combined into Broadcast.
- *
- * @author Yuk-Ming
- *
- */
- static Map<BroadcastState, List<BroadcastState>> toStates;
-
- static
- {
- // Initialize legal transitions of state machine.
- // For each state, define a list of legal states that this state can transition to
- toStates = new HashMap<BroadcastState, List<BroadcastState>>();
-
- // Transitions from INSTALLING
- toStates.put(BroadcastState.ACCEPTED, Arrays.asList(
- BroadcastState.RUNNING, // Normal transition
- BroadcastState.CANCELING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.PAUSING, // User action
- BroadcastState.PAUSED, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.ABORTING,
- BroadcastState.ABORTED,
- BroadcastState.EXPIRED,
- BroadcastState.ALLDONE // When recipient list is empty
- ));
-
- // Transitions from RUNNING
- toStates.put(BroadcastState.RUNNING, Arrays.asList(
- BroadcastState.COMPLETED, // after completed all sends
- BroadcastState.CANCELING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.PAUSING, // User action
- BroadcastState.PAUSED, // User action
- BroadcastState.PURGED, // User action
- BroadcastState.ABORTING,
- BroadcastState.ABORTED,
- BroadcastState.EXPIRED
- ));
-
- // Transitions from CANCELING
- toStates.put(BroadcastState.CANCELING, Arrays.asList(
- BroadcastState.ABORTING,
- BroadcastState.ABORTED,
- BroadcastState.COMPLETED,
- BroadcastState.CANCELED, // User action
- BroadcastState.PURGED // User action
- ));
-
- // Transitions from ABORTING
- toStates.put(BroadcastState.ABORTING, Arrays.asList(
- BroadcastState.ABORTED,
- BroadcastState.PURGED // User action
- ));
-
- // Transitions from PAUSING
- toStates.put(BroadcastState.PAUSING, Arrays.asList(
- BroadcastState.RUNNING, // User action
- BroadcastState.COMPLETED,
- BroadcastState.ABORTING,
- BroadcastState.CANCELING, // User action
- BroadcastState.CANCELED,
- BroadcastState.PAUSED,
- BroadcastState.PURGED // User action
- ));
-
- // Transitions from PAUSED
- toStates.put(BroadcastState.PAUSED, Arrays.asList(
- BroadcastState.RUNNING, // User action
- BroadcastState.CANCELED, // User action
- BroadcastState.CANCELING, // User action
- BroadcastState.ABORTING,
- BroadcastState.ABORTED,
- BroadcastState.PURGED // User action
- ));
- toStates.put(BroadcastState.COMPLETED, Arrays.asList(
- BroadcastState.ALLDONE // when all posting back is complete
- ));
- }
-
- public static class StateChangeResult
- {
- public StateChangeStatus stateChangeStatus;
- public BroadcastState currentState;
- public BroadcastState previousState;
-
- public StateChangeResult(StateChangeStatus stateChangeStatus,
- BroadcastState currentState, BroadcastState previousState)
- {
- this.stateChangeStatus = stateChangeStatus;
- this.currentState = currentState;
- this.previousState = previousState;
- }
- }
-
- protected enum PostbackThreadActionOnEmpty
- {
- CONTINUE,
- STOP,
- WAIT
- }
-
- protected class Service extends Thread
- {
- Object serviceProviderPeer;
-
- protected Service(String name) throws BroadcastException
- {
- serviceProviderPeer = getInitializedServiceProviderPeer();
- setName(name);
- }
-
- public void run()
- {
- myLogger.info("Thread starting...");
- for (;;)
- {
- if (serviceThreadsShouldPause())
- {
- synchronized (resumeFlag)
- {
- try
- {
- myLogger.debug("Paused");
- resumeFlag.wait();
- myLogger.debug("Pause ended");
- }
- catch (InterruptedException e)
- {
- myLogger.warn("Dispatcher thread interrupted while waiting to resume");
- }
- }
- }
- if (serviceThreadsShouldStop()) break;
- // Get a batch of jobs, if available
- myLogger.debug("Looking for jobs");
- List<Job> batch = new ArrayList<Job>();
- synchronized(readyQueue)
- {
- // We we are to get a batch of more than one, let us fill in the rest.
- for (int i = 0; i < getJobBatchSize(); i++)
- {
- Job job = readyQueue.poll();
- if (job == null) break;
- batch.add(job);
- }
-
- if (batch.size()== 0)
- {
- // wait for jobs
- try
- {
- myLogger.debug("Waiting for jobs");
- readyQueue.wait();
- // go back to look for jobs
- continue;
- }
- catch (Exception e)
- {
- // go back to look for jobs
- continue;
- }
- }
- updateServiceActivityCount(batch.size());
- }
-
- // Process jobs.
- // Mark start time
- long now = System.currentTimeMillis();
- for (Job job : batch)
- {
- job.startTime = now;
- }
-
- // Service the jobs
- // But first get dependent resource
- // which includes allocation from capacity. Only returns when the required allocation
- // is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
- ServicePrerequisites prerequisites = secureServicePrerequisites();
-
- try
- {
- int transactions = processJobs(batch, serviceProviderPeer, prerequisites);
- incrementTransactions(transactions);
- }
- catch (EngineException e)
- {
- // Aborting
- setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
- }
- catch (Throwable t)
- {
- // This is unexpected. Log stack trace
- myLogger.error("Caught unexpected Throwable", t);
- terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
- }
- finally
- {
- updateServiceActivityCount(-batch.size());
- }
- if (sleepBetweenJobs > 0)
- {
- try
- {
- Thread.sleep(sleepBetweenJobs);
- }
- catch (InterruptedException e1)
- {
- // Do nothing?
- }
- }
- }
- // Exit thread
- myLogger.info("Thread terminating");
- System.out.println(getName() + " terminating");
- closeServiceProvider(serviceProviderPeer);
- }
- }
-
-
/**
 * Creates a broadcast in its initial state, with an empty recipient list,
 * an empty ready queue, no service threads and all counters at zero. The
 * receive time is set to the current time.
 *
 * @param broadcastType type of the broadcast
 * @param activityRecordIdParamName if null, the default
 *        ("activity_record_id") is used
 * @param jobReportRootNodeName root node name for job reports
 */
protected Broadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName)
{
    this.broadcastType = broadcastType;
    if (activityRecordIdParamName == null)
    {
        this.activityRecordIdParamName = ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT;
    }
    else
    {
        this.activityRecordIdParamName = activityRecordIdParamName;
    }
    this.jobReportRootNodeName = jobReportRootNodeName;

    // No postback until the request has been accepted in doPost.
    postback = null;
    successCount = new AtomicInteger(0);
    sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT;

    // Collections and helpers used for dispatching jobs
    readyQueue = new LinkedBlockingQueue<>();
    serviceThreadPool = new ArrayList<>();
    recipientList = new ArrayList<>();
    scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
    resumeFlag = new Object();

    receiveTime = System.currentTimeMillis();

    // Counters
    serviceActivityCount = 0;
    transactions = 0;
    lastPauseCount = 0;
}
-
- private void incrementTransactions(int delta)
- {
- synchronized (transactions)
- {
- transactions += delta;
- lastPauseCount += delta;
- }
- if (pauseThreshold > 0 && lastPauseCount >= pauseThreshold)
- {
- pause(null, null);
- }
- }
-
- /**
- * Handles the HTTP POST that launches this broadcast, from decoding the
- * request through to the end of service.
- * <p>The method works in two phases:
- * <ol>
- * <li>Acceptance: decodes the request, installs this broadcast with the
- * CommEngine, sets the operating hours, calls initSync, queues one job per
- * recipient, and then writes the XML response to the requester. The method
- * returns at the end of this phase if an exception occurred, if there are
- * no recipients, if the state is ALLDONE, or if the response could not be
- * written.
- * <li>Service: creates the Postback and the Service threads, then calls
- * doBroadcast. On the way out, destroyResources is called and the postback
- * is wrapped up.
- * </ol>
- * Failures are not thrown to the caller; they are recorded by setting the
- * broadcast state to ABORTING or ABORTED and are logged.
- *
- * @param request HTTP request carrying the broadcast request
- * @param response HTTP response to which the response XML is written
- * @param commEngine engine supplying configuration and resources; stored in
- * this broadcast
- */
- protected void doPost(HttpServletRequest request, HttpServletResponse response,
- CommEngine commEngine)
- {
- myLogger.debug("Entering Broadcast.doPost method");
- BroadcastException myException = null; // set when the request is rejected; selects the error response
- this.commEngine = commEngine;
- pauseThreshold = commEngine.getPauseThreshold();
- try
- {
- // Decode the request. Even when the engine is not in service, decode
- // is still called so that the broadcast can be installed and the
- // response built before the request is rejected.
-
- boolean notInService = commEngine.notInService();
- decode(request, notInService);
- // Now that have decoded the id of this broadcast,
- // ask CommEngine to install it with its id.
- commEngine.installBroadcast(this);
- setOperatingHours(DAILY_START_KEY, daily_start);
- setOperatingHours(DAILY_STOP_KEY, daily_stop);
-
- if (notInService)
- {
- throw new PlatformException(PlatformError.RUNTIME_ERROR,
- "Not in service");
- }
- if (recipientList.size() == 0)
- {
- CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients");
- setState(BroadcastState.ALLDONE, "No recipients", null);
- return; // the finally clause below still writes the response
- }
- initSync(commEngine.getResources());
- for (Recipient recipient : recipientList)
- {
- readyQueue.add(mkJob(recipient));
- }
- if (state == BroadcastState.ALLDONE) return;
- }
- catch (BroadcastException e)
- {
- myException = e;
- setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
- CommonLogger.alarm.error("Broadcast aborting: " + e.getMessage());
- myLogger.error("Broadcast aborted", e);
- return; // the finally clause below still writes the error response
- }
- catch (Throwable t)
- {
- // Caught stray unexpected runtime problem.
- // No return here: control falls into the finally clause, which
- // writes the error response and then returns.
- CommonLogger.alarm.error("Broadcast aborted: " + t);
- myLogger.error("Broadcast aborted", t);
- myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage());
- setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText);
- }
- finally
- {
- // Return regular or error response
- String responseXML = getResponseXML(myException);
- PrintWriter writer;
- try
- {
- writer = response.getWriter();
- writer.write(responseXML);
- writer.close();
- }
- catch (IOException e)
- {
- myLogger.error("Failed to write reponse to requester. Aborts broadcast." );
- if (state != BroadcastState.ABORTED)
- {
- setState(BroadcastState.ABORTED, "Failed to reply to requester", e.getMessage());
- }
- return;
- }
-
- if (myException != null) return; // request was rejected; do not start service
- }
- // So far so good, we now go ahead with completing
- // initialization.
- try
- {
-
- jobsTotal = recipientList.size();
- postback = new Postback(this,
- getPostbackMaxQueueSize(),
- getPostbackSenderPoolSize(),
- getPostbackMaxBatchSize());
-
- initAsync();
-
- // Create service thread pool to dispatch jobs,
- // at the same time, setting up a list of service thread names.
- // NOTE(review): the list of names is collected but not used within
- // this method -- confirm whether initServiceThreadContexts should
- // be called with it.
- int serviceThreadPoolSize = getServiceThreadPoolSize();
- myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize);
- List<String> serviceThreadNames = new ArrayList<String>();
- for (int i = 0; i < serviceThreadPoolSize; i++)
- {
- String threadName = broadcastId + "-service-thread." + i;
- Service serviceThread = new Service(threadName);
- serviceThreadPool.add(serviceThread);
- serviceThreadNames.add(threadName);
- }
-
- doBroadcast();
- }
- catch (BroadcastException e)
- {
- setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
- CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
- myLogger.error("Broadcast aborted", e);
- }
- catch (Exception e)
- {
- setState(BroadcastState.ABORTED, BroadcastError.UNEXPECTED_EXCEPTION.toString(), e.getMessage());
- CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
- myLogger.error("Broadcast aborted", e);
- }
- finally
- {
- destroyResources();
- if (postback != null)
- {
- postback.wrapup();
- postback = null;
- }
- CommonLogger.activity.info("Broadcast " + getId() + " terminated");
- }
- }
-
- protected int getPostbackMaxQueueSize()
- {
- return commEngine.getPostbackMaxQueueSize();
- }
-
- protected int getPostbackMaxBatchSize()
- {
- return commEngine.getPostbackMaxBatchSize();
- }
-
- protected int getServiceThreadPoolSize()
- {
- return commEngine.getServiceThreadPoolSize();
- }
-
- protected int getPostbackSenderPoolSize()
- {
- return commEngine.getPostbackSenderPoolSize();
- }
-
- protected abstract void returnPrerequisites(ServicePrerequisites prerequisites);
-
- /**
- * Creates and initializes a service provider, to be used by only one service thread.
- * If service provider is not thread-specific, then this method may return null, and
- * a common service provider is created outside of this method.
- *
- * @return service provider as a class Object instance.
- * @throws BroadcastException
- */
- protected abstract Object getInitializedServiceProviderPeer() throws BroadcastException;
-
- /**
- * Obtains the required components to support a service; e.g. RTP port, or a place
- * in maximum total number of calls. Does not return till the reequired prerequisites are obtained.
- * @return null, if no prerequisite is required, as in the case of email and sms engines.
- */
- abstract protected ServicePrerequisites secureServicePrerequisites();
-
- abstract public void closeServiceProvider(Object serviceProvider);
-
- /**
- * Makes a state transition to the given newState if the transition from
- * the current state is legal. Also posts back a state change notification.
- * @param newState
- * @return StateChangeResult
- */
- public StateChangeResult setState(BroadcastState newState)
- {
- return setState(newState, null, null);
- }
-
- /**
- * Makes a state transition to the given newState if the transition from
- * the current state is legal. Also posts back a state change notification.
- * @param newState
- * @return StateChangeResult
- */
- public synchronized StateChangeResult setState(BroadcastState newState,
- String reason, String stateErrorText)
- {
- boolean isLegal;
- BroadcastState prev = null;
- if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
- synchronized(stateSemaphore)
- {
- List<BroadcastState> to = toStates.get(state);
- isLegal = (to == null? false : to.contains(newState));
- prev = state;
- if (isLegal)
- {
- this.reason = reason;
- this.stateErrorText = stateErrorText;
-
- CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
- state = newState;
- changeStateTime = System.currentTimeMillis();
- if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime;
- if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime;
- if (postback != null)
- {
- if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport());
- else postback.queueReportFirst(mkStatusReport());
- }
- return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
- }
- else
- {
- // log illegal state transition with call trace
- Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState));
- myLogger.error(e.getMessage());
- myLogger.debug("This exception is not thrown -- only for debugging information", e);
-
- return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
- }
- }
- }
-
- protected void setBroadcastId(String broadcastId)
- {
- if (broadcastId == null)
- throw new IllegalArgumentException(
- "Argument broadcastId in Broadcast.setBroadcastId method cannot be null");
- if (this.broadcastId != null)
- throw new IllegalStateException(
- "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast");
- this.broadcastId = broadcastId;
- }
-
- protected void setLaunchRecordId(String launchRecordId)
- {
- if (launchRecordId == null)
- throw new IllegalArgumentException(
- "Argument launchRecordId in Broadcast.setLaunchRecordId method cannot be null");
- if (this.launchRecordId != null)
- throw new IllegalStateException(
- "Broadcast.setLaunchRecordId method cannot be invoked more than once for a Broadcast");
- this.launchRecordId = launchRecordId;
- }
-
- public String getBroadcastId()
- {
- return broadcastId;
- }
-
- public String getLaunchRecordId()
- {
- return launchRecordId;
- }
-
- /**
- *
- * @param e
- * @return HTTP response for normal case (when exception e is null), or with exception
- */
- public String getResponseXML(BroadcastException e)
- {
- String tagName = broadcastType + "_response";
- StringBuffer responseXML = new StringBuffer("<" + tagName);
- if (broadcastId != null && broadcastId.length() > 0)
- {
- responseXML.append(" broadcast_id=\"");
- responseXML.append(broadcastId);
- responseXML.append("\"");
- }
- responseXML.append(" accepted='");
- responseXML.append(e != null || state == BroadcastState.COMPLETED ? "FALSE" : "TRUE");
- responseXML.append("'");
- if (e == null)
- {
- if (reason != null && reason.length() > 0)
- {
- responseXML.append(" error='" + reason + "'");
- }
- responseXML.append('>');
- }
- else
- {
- if (e.errorCode != null)
- {
- responseXML.append(" error='");
- responseXML.append(e.errorCode.toString());
- responseXML.append("'");
- }
- responseXML.append('>');
- if (e.errorText != null)
- {
- responseXML.append("<error_text>");
- responseXML.append(e.errorText);
- responseXML.append("</error_text>");
- }
- }
- responseXML.append("</" + tagName + '>');
- return responseXML.toString();
- }
-
- public String getPostbackURL()
- {
- return postbackURL;
- }
-
- protected String mkResponseXML(String errorCode, String errorText)
- {
- String tagName = broadcastType + "_response";
- StringBuffer responseXML = new StringBuffer("<" + tagName);
- String broadcastId = getBroadcastId();
- if (broadcastId != null && broadcastId.length() > 0)
- {
- responseXML.append(" broadcast_id=\"");
- responseXML.append(broadcastId);
- responseXML.append("\"");
- }
- responseXML.append(" accepted='");
- responseXML.append(errorCode == null ? "TRUE" : "FALSE");
- responseXML.append("'");
- if (errorCode == null)
- {
- responseXML.append('>');
- }
- else
- {
- responseXML.append(" error='");
- responseXML.append(errorCode);
- responseXML.append("'");
- responseXML.append('>');
- if (errorText != null)
- {
- responseXML.append("<error_text>");
- responseXML.append(errorText.replaceAll("\\&", "&")
- .replaceAll("<", "<"));
- responseXML.append("</error_text>");
- }
- }
- responseXML.append("</" + tagName + '>');
- return responseXML.toString();
- }
-
- /**
- * If finalState is final, then this state is set, and dispatcher threads are stopped.
- * Overriding implementation may release all other resources, like timers.
- * @param finalState
- */
- public void terminate(BroadcastState finalState, String reason)
- {
- if (!finalState.isFinal) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final");
- StateChangeResult result = setState(finalState, reason, null);
- switch (result.stateChangeStatus)
- {
- case SUCCESS:
- break;
- case NO_CHANGE:
- return;
- case FORBIDDEN:
- myLogger.error("Not allow to terminate broadcast in " + result.currentState + " state");
- return;
- default: // Should not happen
- return;
- }
-
- // Wake up all dispatcher threads waiting on readyQueue so they will all stop
- synchronized(readyQueue)
- {
- readyQueue.notifyAll();
- }
-
- // Wake up all sleeping dispatcher threads for same reason.
- for(Thread t : serviceThreadPool)
- {
- try
- {
- t.interrupt();
- }
- catch (Exception e)
- {
- myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate");
- }
- }
-
- // Quiesce scheduler, and terminate it.
- scheduler.shutdownNow();
- }
-
- /**
- * Defaults to current state
- * @return XML string
- */
- protected String mkStatusReport()
- {
- StringBuffer statusBf = new StringBuffer();
- String topLevelTag = broadcastType;
- String broadcastId = getBroadcastId();
- if (broadcastId == null) broadcastId = "";
- statusBf.append("<" + topLevelTag + " broadcast_id='" + broadcastId
- + "' receive_time='" + receiveTime
- + "' recipient_count='" + recipientList.size() + "'");
- if (launchRecordId != null)
- {
- statusBf.append(" launch_record_id='" + launchRecordId + "'>");
- }
- statusBf.append("<current_time>" + System.currentTimeMillis() + "</current_time>");
- if (serviceStartTime > 0) statusBf.append("<service_start_time>" + serviceStartTime
- + "</service_start_time>");
- if (state.isFinal && serviceEndTime > 0) statusBf.append("<service_end_time>" + serviceEndTime
- + "</service_end_time>");
- statusBf.append("<state>" + state + "</state><state_change_time>" + changeStateTime
- + "</state_change_time>");
- statusBf.append("<reason>" + (reason == null? "" : Util.xmlEscape(reason))
- + "</reason>");
- statusBf.append("<error_text>" + (stateErrorText == null? "" : Util.xmlEscape(stateErrorText))
- + "</error_text>");
- statusBf.append("<transactions>" + transactions + "</transactions>");
- statusBf.append("<success>" + successCount.intValue() + "</success>");
- statusBf.append("<job_summary completed='" + getCompletedJobCount() +
- "' ready='" + getPendingJobCount() + "'");
- statusBf.append(" active='" + getActiveJobCount() + "'");
- statusBf.append("></job_summary>\n");
- statusBf.append("<daily_stop>" + daily_stop + "</daily_stop>");
- statusBf.append("<daily_start>" + daily_start + "</daily_start>\n");
- statusBf.append(additionalStatusXML());
- statusBf.append("</" + topLevelTag + ">");
- String statusReport = statusBf.toString();
- return statusReport;
- }
-
- /**
- * Derived class may add additional status in a broadcast status XML posted back to portal,
- * by returning an XML tag which will be included immediately in the top-level tag
- * of the broadcast status.
- */
- protected String additionalStatusXML()
- {
- return "";
- }
-
- protected void onExpire()
- {
- }
-
- protected void setExpireTime(long expireTime)
- {
- this.expireTime = expireTime;
- }
-
- public long getExpireTime()
- {
- return expireTime;
- }
-
- /**
- *
- * @return instantaneous number of jobs being serviced by service provider
- */
- protected int getActiveJobCount()
- {
- return serviceActivityCount;
- }
-
- /**
- * Parses broadcastId and return if notInService is true.
- * Otherwise, continue parsing postBackUrl, expireTime, recipientList,
- * and implementation-specific data from request.
- * Avoid throwing an exception before parsing and setting broadcastId.
- * @param notInService
- * @throws EngineException
- */
- protected abstract void decode(HttpServletRequest request, boolean notInService)
- throws EngineException;
-
- protected abstract void initSync(EngineResources resources) throws BroadcastException;
-
- protected Job mkJob(Recipient recipient)
- {
- return new Job(recipient);
- }
-
- /**
- * Overriding implementation performs time consuming initialization, after returning
- * POST http status indicating accepting broadcast for processing.
- *
- * @throws BroadcastException - to abort broadcast
- */
- protected void initAsync() throws BroadcastException
- {
- // Do nothing in base class.
- }
-
- public String getId()
- {
- return broadcastId;
- }
-
- /**
- * Sets the stateMachine to CANCEL
- * @param reasons - may be null.
- */
- protected void cancel(String reason, PrintWriter out)
- {
- BroadcastState targetState = getActiveJobCount() == 0?
- BroadcastState.CANCELED : BroadcastState.CANCELING;
- StateChangeResult result = setState(targetState, reason, null);
- String responseContent = null;
- switch (result.stateChangeStatus)
- {
- case SUCCESS:
- responseContent = "Broadcast is being canceled";
- break;
- case NO_CHANGE:
- responseContent = "Already canceled";
- break;
- case FORBIDDEN:
- responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state";
- }
- out.write(responseContent);
-
- wakeUpServiceThreads();
- }
-
- /**
- *
- * @param reason
- * @param out
- */
- protected void pause(String reason, PrintWriter out)
- {
- if (state == BroadcastState.ACCEPTED || state.isFinal) return;
-
- // Sets state to PAUSING, which is monitored by Broadcast.Service threads.
- // EVentually, when all service activity ends, the state transitions to PAUSED
- StateChangeResult result = setState(BroadcastState.PAUSING, reason, null);
- switch (result.stateChangeStatus)
- {
- case FORBIDDEN:
- if (out != null) out.write("pause not allowed");
- break;
- case SUCCESS:
- lastPauseCount = 0;
- if (out != null) out.write("Broadcast is being PAUSED");
- break;
- case NO_CHANGE:
- if (out != null) out.write("Broadcast is already RUNNING");
- }
- }
-
- protected void resume(String reason, PrintWriter out)
- {
- if (state == BroadcastState.ACCEPTED || state.isFinal) return;
- if (!withinOperatingHours())
- {
- if (out != null) out.write("Cannot resume outside operating hours");
- return;
- }
- synchronized (resumeFlag)
- {
- StateChangeResult result = setState(BroadcastState.RUNNING, reason, null);
- switch (result.stateChangeStatus)
- {
- case FORBIDDEN:
- if (out != null) out.write("resume not allowed");
- break;
- case SUCCESS:
- if (out != null) out.write("Broadcast resumed");
- resumeFlag.notifyAll();
- break;
- default:
- break;
- }
- }
- }
- /**
- * Derived class may make its own Implementation of JobReport
- * @return
- */
- protected JobReport mkJobReport()
- {
- return new JobReport();
- }
-
- public void addJob(Job job)
- {
- synchronized(readyQueue)
- {
- readyQueue.add(job);
- readyQueue.notifyAll();
- }
- }
-
- /**
- * For use by the new Broadcast.doPost method.
- * <p>
- * Records the current time in changeStateTime. Unless serviceThreadsShouldStop()
- * already reports true, the state is set to RUNNING when within operating hours,
- * or to PAUSED with reason "clock" otherwise. Every Service thread in
- * serviceThreadPool is then started and joined, and finally
- * waitForEndOfService() is called.
- * <p>
- * An InterruptedException raised while joining a thread is logged and the
- * wait for that particular thread is given up; the remaining threads are
- * still joined.
- *
- * @throws BroadcastException
- */
- public void doBroadcast() throws BroadcastException
- {
-     changeStateTime = System.currentTimeMillis();
-     if (serviceThreadsShouldStop())
-     {
-         // Nothing to run
-         return;
-     }
-
-     if (withinOperatingHours())
-     {
-         setState(BroadcastState.RUNNING);
-     }
-     else
-     {
-         setState(BroadcastState.PAUSED, "clock", null);
-     }
-
-     // Launch every dispatcher thread first ...
-     for (Service worker : serviceThreadPool)
-     {
-         worker.start();
-     }
-
-     // ... then wait for each of them to terminate
-     for (Service worker : serviceThreadPool)
-     {
-         try
-         {
-             worker.join();
-         }
-         catch (InterruptedException e)
-         {
-             myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e);
-         }
-     }
-
-     waitForEndOfService();
- }
-
- private boolean withinOperatingHours() {
- int dailyStartMin = convert2Min(daily_start);
- int dailyStopMin = convert2Min(daily_stop);
- // Ensure daily stop > daily start
- if (dailyStopMin < dailyStartMin) dailyStopMin += 24 * 60;
- LocalTime now = LocalTime.now();
- int nowMin = now.getHour() * 60 + now.getMinute();
- if (nowMin < dailyStartMin) nowMin += 24 * 60;
- boolean within = nowMin >= dailyStartMin && nowMin < dailyStopMin;
- return within;
- }
-
- /**
- * Converts a time of day written as hours and minutes separated by ':'
- * into minutes since midnight.
- *
- * @param hhmm text of the form hh:mm
- * @return hours * 60 + minutes
- */
- private int convert2Min(String hhmm) {
-     String[] fields = hhmm.split(":");
-     return Integer.parseInt(fields[0]) * 60 + Integer.parseInt(fields[1]);
- }
-
- /**
- * Hook for a derived class to wait for end of service before returning.
- * It is called by doBroadcast after all Service threads have been joined.
- * If the derived class has other threads still taking part in providing
- * service, it should wait here for them to terminate.
- * The base implementation does nothing.
- */
- protected void waitForEndOfService() {}
-
- /**
- * Hook for a derived class to destroy the resources it needed for providing service.
- * The base implementation does nothing.
- */
- protected void destroyResources() {}
-
- /**
- * Wraps up and drops the postback object of this broadcast.
- * A derived class may release its own resources here.
- * <p>
- * The method may be called more than once: when postback is already null,
- * only the debug message is logged.
- */
- protected void close()
- {
-     myLogger.debug("In close()");
-     // postback is null after a previous close(); other methods of this class
-     // also treat a null postback as a legal state.
-     if (postback != null)
-     {
-         postback.wrapup();
-         postback = null;
-     }
- }
-
- /**
- * Hook for a derived class to set up its environment before the Service threads are started.
- * The base implementation does nothing.
- */
- protected void initServiceThreadContexts()
- {
- // Do nothing in base class
- }
-
- /**
- * Experimental - needed to go with the also experimental method exec.
- * The base implementation does nothing.
- *
- * @param serviceThreadNames names of the service threads
- */
- protected void initServiceThreadContexts(List<String> serviceThreadNames)
- {
- // Do nothing in base class
- }
-
- /**
- * Examines whether service threads should stop running, or not even start.
- * <p>
- * The checks are made in this order:
- * the broadcast has reached expireTime (state is set to EXPIRED and waiting
- * threads are woken up); the state is CANCELING, ABORTING or final;
- * no job remains (waiting threads are woken up).
- *
- * @return true if service threads should stop
- */
- private boolean serviceThreadsShouldStop()
- {
-     boolean expired = System.currentTimeMillis() >= expireTime;
-     if (expired)
-     {
-         setState(BroadcastState.EXPIRED);
-         wakeUpServiceThreads();
-         return true;
-     }
-
-     boolean ending = state == BroadcastState.CANCELING
-             || state == BroadcastState.ABORTING
-             || state.isFinal;
-     if (ending)
-     {
-         return true;
-     }
-
-     if (getRemainingJobCount() != 0)
-     {
-         // Still work to do
-         return false;
-     }
-
-     // No job left: release any thread still waiting
-     wakeUpServiceThreads();
-     return true;
- }
-
- /**
- * Notifies all threads waiting on the readyQueue monitor and then all
- * threads waiting on the resumeFlag monitor.
- */
- private void wakeUpServiceThreads()
- {
- synchronized (readyQueue)
- {
- readyQueue.notifyAll();
- }
- synchronized (resumeFlag)
- {
- resumeFlag.notifyAll();
- }
- }
-
- /**
- * @return true if the broadcast state is PAUSED or PAUSING
- */
- private boolean serviceThreadsShouldPause()
- {
- return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING;
- }
-
- /**
- * Processes a batch of jobs. To be implemented by the derived class.
- * <p>
- * Job status is reported back to this broadcast, via the logAndQueueForPostBack method.
- * NOTE(review): no method of that name is visible in this part of the file;
- * postJobStatus(Job) may be the current equivalent - confirm.
- * This method should use the updateServiceActivityCount(+-1) method to allow Broadcast
- * to keep track of overall service progress. The serviceActivityCount is used to determine
- * if all service threads are idle or terminated.
- *
- * @param batch jobs to be processed together
- * @param serviceProvider object providing the service; its concrete type is left to the derived class
- * @param prerequisites prerequisites for servicing the batch
- * @return int - number of transactions employed to service these jobs.
- * @throws EngineException to abort broadcast
- */
- abstract protected int processJobs(List<Job> batch, Object serviceProvider, ServicePrerequisites prerequisites)
- throws EngineException;
-
- /**
- * Size of a batch of jobs to be processed together. The base implementation
- * returns 1. For email, this may be more than 1, and this method should be
- * overridden.
- *
- * @return size of a batch of jobs to be processed together.
- */
- protected int getJobBatchSize()
- {
- return 1;
- }
-
- /**
- * Adjusts serviceActivityCount by the given increment while holding the
- * readyQueue monitor.
- * <p>
- * When a decrement brings the count to zero or below, and the state is
- * RUNNING, PAUSING or CANCELING, and no job remains, the state is set to
- * COMPLETED.
- *
- * @param increment amount to add to serviceActivityCount; may be negative
- */
- protected void updateServiceActivityCount(int increment)
- {
-     synchronized (readyQueue)
-     {
-         serviceActivityCount += increment;
-
-         boolean becameIdle = increment < 0 && serviceActivityCount <= 0;
-         if (!becameIdle)
-         {
-             return;
-         }
-
-         boolean mayComplete = state == BroadcastState.RUNNING
-                 || state == BroadcastState.PAUSING
-                 || state == BroadcastState.CANCELING;
-
-         // TODO: investigate possibility that 0 remainingJobCount may
-         // not be final. It may still change because a finishing job
-         // may cause a job to be scheduled.
-         if (mayComplete && getRemainingJobCount() == 0)
-         {
-             setState(BroadcastState.COMPLETED);
-         }
-     }
- }
-
- /**
- * Counts a successful job and queues a job report for postback.
- * <p>
- * successCount is incremented when the job's status is SUCCESS. When a
- * postback object exists, a report is created with mkJobReport(),
- * initialized from the job, and queued in its text form.
- *
- * @param job job whose status is to be reported
- */
- public void postJobStatus(Job job)
- {
-     if (job.jobStatus == JobStatus.SUCCESS)
-     {
-         successCount.incrementAndGet();
-     }
-
-     if (postback == null)
-     {
-         // No postback to report to
-         return;
-     }
-
-     JobReport jobReport = mkJobReport();
-     jobReport.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
-     jobReport.init(job);
-     postback.queueReport(jobReport.toString());
- }
-
- /**
- * Logs, at debug level, the broadcast state together with the completed,
- * active, ready, scheduled, total and remaining job counts, and the size of
- * the postback queue.
- * <p>
- * When postback or its postQueue is null, that fact is logged first and the
- * counts are logged without a postback queue size.
- *
- * @param title text placed at the start of each log line
- */
- private void logJobCount(String title)
- {
-     // Find out which part of the postback chain, if any, is missing
-     String missing = null;
-     if (postback == null)
-     {
-         missing = "postback";
-     }
-     else if (postback.postQueue == null)
-     {
-         missing = "postback.postQueue";
-     }
-
-     if (missing != null)
-     {
-         myLogger.debug(title + ": " + missing + " = null");
-         String countsWithoutQueue = String.format(
-                 "%s: state %s, completed: %d, active: %d, ready: %d, scheduled: %d, total jobs: %d, remaining: %d, postQueue: ",
-                 title,
-                 state,
-                 getCompletedJobCount(),
-                 getActiveJobCount(),
-                 readyQueue.size(),
-                 scheduledJobs,
-                 jobsTotal,
-                 getRemainingJobCount());
-         myLogger.debug(countsWithoutQueue);
-         return;
-     }
-
-     String counts = String.format(
-             "%s: state %s, completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining: %d, postQueue: %d",
-             title,
-             state,
-             getCompletedJobCount(),
-             getActiveJobCount(),
-             readyQueue.size(),
-             scheduledJobs,
-             jobsTotal,
-             getRemainingJobCount(),
-             postback.postQueue.size());
-     myLogger.debug(counts);
- }
-
- /**
- * Number of jobs still to be completed: the jobs in readyQueue plus
- * scheduledJobs, read while holding the readyQueue monitor.
- *
- * @return readyQueue.size() + scheduledJobs
- */
- private int getRemainingJobCount()
- {
- synchronized(readyQueue) {
- return readyQueue.size() + scheduledJobs;
- }
- }
-
- public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
- {
- // No more rescheduling on cancel, abort, expired, or alldone
- if (state == BroadcastState.CANCELING
- || state == BroadcastState.CANCELED
- || state == BroadcastState.EXPIRED
- || state == BroadcastState.ABORTED
- || state == BroadcastState.ABORTING
- || state == BroadcastState.ALLDONE
- )
- {
- return null;
- }
-
- job.errorText = "";
- if (rescheduleTimeMS == 0)
- {
- addJob(job);
- return null;
- }
-
- synchronized(readyQueue) {
- scheduledJobs++;
- }
- Runnable r = new Runnable() { public void run() {
- synchronized(readyQueue) {
- scheduledJobs--;
- addJob(job);
- }
- }
- };
- return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);
- }
-
- /**
- * @return current state of this broadcast
- */
- public BroadcastState getState()
- {
- return state;
- }
-
- public int getPendingJobCount()
- {
- switch (state)
- {
- case RUNNING:
- case PAUSING:
- case PAUSED:
- return getRemainingJobCount();
- default:
- return 0;
- }
- }
-
- /**
- * Number of completed jobs, computed while holding the readyQueue monitor
- * as jobsTotal minus the remaining job count minus serviceActivityCount.
- *
- * @return number of completed jobs
- */
- public int getCompletedJobCount()
- {
- synchronized(readyQueue)
- {
- return jobsTotal - getRemainingJobCount() - serviceActivityCount;
- }
- }
-
- /**
- * @return the type of this broadcast
- */
- public String getBroadcastType() {
- return broadcastType;
- }
-
- /**
- * Decides what action is returned as PostbackThreadActionOnEmpty, and
- * advances a transitional broadcast state to its end state when no job
- * is active.
- * <p>
- * STOP is returned in a final state, WAIT while jobs are still active.
- * Otherwise PAUSING is moved to PAUSED, CANCELING to CANCELED, ABORTING to
- * ABORTED and COMPLETED to ALLDONE; CONTINUE is returned when that state
- * change succeeds. In any other state WAIT is returned.
- *
- * @return action for the postback thread
- */
- public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
- myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state);
- if (state.isFinal) return PostbackThreadActionOnEmpty.STOP;
- int activeJobCount = getActiveJobCount();
- myLogger.debug("getPostbackThreadActionOnEmpty(): activeJobCount = " + activeJobCount);
-
- // Active jobs may still produce reports
- if (activeJobCount > 0) {
- return PostbackThreadActionOnEmpty.WAIT;
- }
-
- // A failed change to PAUSED yields WAIT, unlike the cases below which yield STOP
- if (state == BroadcastState.PAUSING) {
- return setState(BroadcastState.PAUSED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
- PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.WAIT;
- }
- if (state == BroadcastState.CANCELING) {
- return setState(BroadcastState.CANCELED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
- PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
- }
- else if (state == BroadcastState.ABORTING) {
- return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
- PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
- }
- else if (state == BroadcastState.COMPLETED) {
- return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS?
- PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
- }
- else {
- return PostbackThreadActionOnEmpty.WAIT;
- }
- }
-
- /**
- * Renders the operating hours of this broadcast as XML: a
- * broadcast_configuration element carrying the broadcast id as attribute,
- * with one child element each for the daily stop and the daily start time.
- * Values are inserted as they are, without XML escaping.
- *
- * @return configuration in XML
- */
- public String getConfigXML()
- {
-     StringBuilder xml = new StringBuilder();
-     xml.append("<broadcast_configuration broadcast_id='").append(broadcastId).append("'>");
-     xml.append("<").append(DAILY_STOP_KEY).append(">")
-             .append(daily_stop)
-             .append("</").append(DAILY_STOP_KEY).append(">");
-     xml.append("<").append(DAILY_START_KEY).append(">")
-             .append(daily_start)
-             .append("</").append(DAILY_START_KEY).append(">");
-     xml.append("</broadcast_configuration>");
-     return xml.toString();
- }
-
- /**
- * Applies the daily stop and daily start times found in the given
- * configuration, in that order. Keys that are absent are left unchanged.
- * When at least one of the two times actually changed, the operating hours
- * are enforced at once.
- *
- * @param configuration JSON object that may hold the daily stop and daily start values
- * @throws Exception
- */
- public void configure(JSONObject configuration) throws Exception {
-     boolean hoursChanged = false;
-     String[] timeKeys = {DAILY_STOP_KEY, DAILY_START_KEY};
-     for (String timeKey : timeKeys) {
-         String requested = (String)configuration.get(timeKey);
-         if (requested == null) {
-             // Not part of this request
-             continue;
-         }
-         hoursChanged |= setOperatingHours(timeKey, requested);
-     }
-     if (hoursChanged) {
-         enforceOperationHours();
-     }
- }
-
- /**
- * Pauses this broadcast, with reason "clock", when the current time is
- * outside its operating hours. Does nothing when the state is ABORTED.
- * <p>
- * YML: At this time, we only enforce the pause action when a broadcast is
- * outside its operating hours. The current design is not satisfactory and needs
- * a better solution.
- * <p>
- * We are not automatically resuming a paused broadcast because the difference
- * between the intention of an operator-initiated pause and
- * that of a clock pause needs clarification in their operation paradigm.
- * The question is when, or if, we allow an operator-initiated pause to be resumed
- * when someone changes the operating hours of a broadcast in such a way
- * that the broadcast is at once within its operating hours. It may
- * be counter to the intention of the original operator.
- * <p>
- * On the other hand, if the change places the broadcast outside its operating hours,
- * it is safer to immediately pause it.
- * <p>
- * To add clarity, we may need to separate the PAUSED state into OPERATOR_PAUSE and CLOCK_PAUSE,
- * and similarly the PAUSING state.
- */
- void enforceOperationHours() {
-     if (state == BroadcastState.ABORTED) {
-         return;
-     }
-     // Inside operating hours nothing is done; in particular there is
-     // deliberately no automatic resume("clock", null).
-     if (!withinOperatingHours()) {
-         pause("clock", null);
-     }
- }
-
- /**
- * Sets the daily stop or the daily start time, selected by timeParam,
- * to the result of CommEngine.checkTimeOfDay(value).
- *
- * @param timeParam DAILY_STOP_KEY or DAILY_START_KEY
- * @param value requested time of day
- * @return false if no change, true if the stored time was replaced
- * @throws RuntimeException if checkTimeOfDay returns null for the value, or
- * if timeParam is neither of the two keys
- */
- private boolean setOperatingHours(String timeParam, String value) {
-     String checked = CommEngine.checkTimeOfDay(value);
-     if (checked == null) {
-         throw new RuntimeException(String.format("Invalid value for %s: %s", timeParam, value));
-     }
-
-     if (timeParam.equals(DAILY_STOP_KEY)) {
-         boolean changed = !checked.equals(daily_stop);
-         if (changed) {
-             daily_stop = checked;
-         }
-         return changed;
-     }
-
-     if (timeParam.equals(DAILY_START_KEY)) {
-         boolean changed = !checked.equals(daily_start);
-         if (changed) {
-             daily_start = checked;
-         }
-         return changed;
-     }
-
-     throw new RuntimeException("Unknown parameter name: " + timeParam);
- }
-
- /**
- * Builds the configuration of this broadcast as a JSON object: the daily
- * start and daily stop times under their respective keys, followed by
- * whatever childAddConfigJSON adds.
- *
- * @return configuration as JSON object
- */
- @SuppressWarnings("unchecked")
- public JSONObject getConfigJSON() {
- JSONObject dataMap = new JSONObject();
- dataMap.put(DAILY_START_KEY, daily_start);
- dataMap.put(DAILY_STOP_KEY, daily_stop);
- // Let the derived class contribute its own entries
- childAddConfigJSON(dataMap);
- return dataMap;
- }
-
- /**
- * Hook for a derived class to add its own entries to the configuration JSON.
- * The base implementation adds nothing.
- *
- * @param dataMap JSON object to add entries to
- */
- protected void childAddConfigJSON(JSONObject dataMap) {
- }
-
- }
|