commit 1f2710069616b9108b149f3da7ea4727c82c3bd4 Author: ymlam Date: Mon Mar 5 07:27:37 2012 +0000 *** empty log message *** diff --git a/src/META-INF/MANIFEST.MF b/src/META-INF/MANIFEST.MF new file mode 100644 index 0000000..254272e --- /dev/null +++ b/src/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Class-Path: + diff --git a/src/altk/comm/engine/Broadcast.java b/src/altk/comm/engine/Broadcast.java new file mode 100644 index 0000000..a4c2ce9 --- /dev/null +++ b/src/altk/comm/engine/Broadcast.java @@ -0,0 +1,893 @@ +package altk.comm.engine; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import javax.servlet.http.HttpServletRequest; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import altk.comm.engine.exception.BroadcastException; +import altk.comm.engine.exception.EngineException; +import altk.comm.engine.postback.PostBack; + +/** + * Broadcast class absorbs what was formerly known as Dispatcher class. + * + * @author Yuk-Ming + * + */ +public abstract class Broadcast +{ + private static final int SCHEDULER_THREAD_POOL_SIZE = 5; + public final String broadcastType; + private String broadcastId; + + /** + * Set when reading request XML, but never used. + */ + private String launchRecordId; + + // protected XPath xpathEngine; + protected String postBackURL; + private PostBack postBack; + public long expireTime; + + static Logger myLogger = Logger.getLogger(Broadcast.class); + + /** + * This queue is designed for only one server + */ + private Queue readyQueue; + private List serviceThreadPool; + private Object resumeFlag; // Semaphore for dispatcher threads to resume. + protected List recipientList; + private int remainingJobs; + + private ScheduledExecutorService scheduler; + + public static enum BroadcastState + { + INSTALLING, + RUNNING, + HALTING, + HALTED, + CANCELING, + CANCELED, // Final state + PURGED, // Final state + ABORTED, // final state + COMPLETED // Final state + } + + public enum StateChangeStatus + { + SUCCESS, + NO_CHANGE, + FORBIDDEN + } + + /** + * When a Broadcast is first created, its state is INSTALLING, during which + * time, the broadcast request XML is digested. After this period, the + * state of the broadcast enters into RUNNING, and it begins with adding + * all VoiceJobs in the request to the Dispatcher's queues. During this period + * calls are dispatched, setup, and terminate. When the last call is terminated, + * the broadcast enters into the COMPLETED state. + * + * Transitions from INSTALLING to RUNNING, and from RUNNING to COMPLETED happen + * automatically without any external influence. + * + * At any time, the broadcast may be canceled by user action, which causes + * the broadcast to transition from the RUNNING state into the CANCELED state. + * + * Since the RUNNING state may go to the COMLETED or to the CANCELED state, + * each due to a different thread, there needs to be a mutex to guarantee + * state and data integrity. + * + * A INSTALLING or RUNNING broadcast may be paused by user action, stopping the + * Dispatcher from making new calls and causing the broadcast to go to + * the HALTED state. Certain error conditions result in the HALTED state. + * + * User may order a broadcast to be removed entirely with a purge command, + * causing the broadcast to go to the PURGED state. This state is there so + * objects, that has reference to a broadcast which has been purged, know + * what to do in that situation. + * + * We need the pause operation to set a RUNNING machine to be in the PAUSING state, + * allow ongoing jobs to proceed to normal termination, + * whereas no new jobs are started. State goes from PAUSING to HALTED. + * + * Cancel-nice operation is pause, followed by the automatic transition + * from HALTED to CANCELED. + * + * Cancel operation forces all jobs to abort, and the state transitions to CANCELED + * immediately. + * + * Because the Dispatcher and Broadcast is one-to-one, and because they access each other's + * data, Dispatcher should be combined into Broadcast. + * + * @author Yuk-Ming + * + */ + static Map> toStates; + + static + { + // Initialize legal transitions of state machine. + // For each state, define a list of legal states that this state can transition to + toStates = new HashMap>(); + + // Transitions from INSTALLING + toStates.put(BroadcastState.INSTALLING, Arrays.asList( + BroadcastState.RUNNING, // Normal transition + BroadcastState.CANCELING, // User action + BroadcastState.CANCELED, // User action + BroadcastState.HALTING, // User action + BroadcastState.HALTED, // User action + BroadcastState.PURGED, // User action + BroadcastState.ABORTED, // TTS error + BroadcastState.COMPLETED // When recipient list is empty + )); + + // Transitions from RUNNING + toStates.put(BroadcastState.RUNNING, Arrays.asList( + BroadcastState.CANCELING, // User action + BroadcastState.CANCELED, // User action + BroadcastState.HALTING, // User action + BroadcastState.HALTED, // User action + BroadcastState.PURGED, // User action + BroadcastState.ABORTED, // Service provider irrecoverable error + BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. + )); + + // Transitions from CANCELING + toStates.put(BroadcastState.CANCELING, Arrays.asList( + BroadcastState.CANCELED, // User action + BroadcastState.PURGED, // User action + BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. + )); + + // Transitions from HALTING + toStates.put(BroadcastState.HALTING, Arrays.asList( + BroadcastState.RUNNING, // User action + BroadcastState.CANCELED, // User action + BroadcastState.HALTED, + BroadcastState.PURGED, // User action + BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. + )); + + // Transitions from HALTED + toStates.put(BroadcastState.HALTED, Arrays.asList( + BroadcastState.RUNNING, // User action + BroadcastState.CANCELED, // User action + BroadcastState.CANCELING, // User action + BroadcastState.PURGED, // User action + BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. + )); + + } + + public static class StateChangeResult + { + public StateChangeStatus stateChangeStatus; + public BroadcastState currentState; + public BroadcastState previousState; + + public StateChangeResult(StateChangeStatus stateChangeStatus, + BroadcastState currentState, BroadcastState previousState) + { + this.stateChangeStatus = stateChangeStatus; + this.currentState = currentState; + this.previousState = previousState; + } + } + + private BroadcastState state = BroadcastState.INSTALLING; + + String haltReason; + String stateErrorText; + public long changeStateTime; + + protected class Service extends Thread + { + Object serviceProvider; + + protected Service(String name) throws BroadcastException + { + serviceProvider = getInitializedServiceProvider(); + setName(name); + } + + public void run() + { + NDC.push(getName()); + for (;;) + { + if (threadsShouldStop()) + { + closeServiceProvider(serviceProvider); + return; + } + + synchronized (resumeFlag) + { + if (threadsShouldPause()) + { + try + { + resumeFlag.wait(); + } + catch (InterruptedException e) + { + myLogger.warn("Dispatcher thread interrupted while waiting to resume"); + return; + } + } + } + + List batch = null; + + /** + * Includes allocation from capacity. Only returns when the required allocation + * is obtained. Example, RTP port allocation, limit due to total number of allowable calls. + */ + ServicePrerequisites prerequisites = null; + synchronized(readyQueue) + { + // get a batch of jobs + Job job = readyQueue.peek(); + + if (job == null) + try + { + readyQueue.wait(); + continue; + } + catch (InterruptedException e) + { + return; + } + + prerequisites = secureServicePrerequisites(); + + if (threadsShouldStop() || threadsShouldPause()) + { + returnPrerequisites(prerequisites); + continue; + } + + // Now that we can go ahead with this job, let us remove this from queue + readyQueue.poll(); + + batch = new ArrayList(); + batch.add(job); + + // We we are to get a batch of more than one, let us fill in the rest. + for (int i = 1; i < getJobBatchSize(); i++) + { + job = readyQueue.poll(); + if (job == null) break; + batch.add(job); + } + } + if (batch != null && batch.size() > 0) + { + // Mark start time + long now = System.currentTimeMillis(); + for (Job job : batch) + { + job.startTime = now; + } + + // Service the jobs + try + { + processJobs(batch, serviceProvider, prerequisites); + } + catch (EngineException e) + { + terminate(BroadcastState.ABORTED, e.getMessage()); + } + } + } + } + + } + + protected Broadcast(String broadcastType) + { + this.broadcastType = broadcastType; + readyQueue = new LinkedBlockingQueue(); + serviceThreadPool = new ArrayList(); + recipientList = new ArrayList(); + scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); + resumeFlag = new Object(); + } + + protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); + + /** + * Creates and initializes a service provider, to be used by only one service thread. + * If service provider is not thread-specific, then this method may return null, and + * a common service provider is created outside of this method. + * + * @return service provider as a class Object instance. + * @throws BroadcastException + */ + protected abstract Object getInitializedServiceProvider() throws BroadcastException; + + /** + * Obtains the required components to support a service; e.g. RTP port, or a place + * in maximum total number of calls. Does not return till the reequired prerequisites are obtained. + * @return null, if no prerequisite is required, as in the case of email and sms engines. + */ + abstract protected ServicePrerequisites secureServicePrerequisites(); + + abstract public void closeServiceProvider(Object serviceProvider); + + /** + * Makes a state transition to the given newState if the transition from + * the current state is legal. + * @param newState + * @return StateChangeResult + */ + public StateChangeResult setState(BroadcastState newState) + { + return setState(newState, null, null); + } + + /** + * Makes a state transition to the given newState if the transition from + * the current state is legal. + * @param newState + * @return StateChangeResult + */ + public StateChangeResult setState(BroadcastState newState, + String haltReason, String stateErrorText) + { + boolean isLegal; + BroadcastState prev = null; + synchronized (this) + { + if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); + List to = toStates.get(state); + isLegal = (to == null? false : to.contains(newState)); + prev = state; + if (isLegal) + { + state = newState; + changeStateTime = System.currentTimeMillis(); + } + } + if (isLegal) + { + this.haltReason = haltReason; + this.stateErrorText = stateErrorText; + + CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); + if (postBack != null) + { + postBack.queueReport(mkStatusReport()); + } + + return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); + } + else + { + myLogger.warn(String.format("Broadcast %s: Transition from %s to %s forbidden", broadcastId, prev, newState)); + return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); + } + } + + protected void setBroadcastId(String broadcastId) + { + if (broadcastId == null) + throw new IllegalArgumentException( + "Argument broadcastId in Broadcast.setBroadcastId method cannot be null"); + if (this.broadcastId != null) + throw new IllegalStateException( + "Broadcast.setBroadcastId method cannot be invoked more than once for a Broadcast"); + this.broadcastId = broadcastId; + } + + protected void setLaunchRecordId(String launchRecordId) + { + if (launchRecordId == null) + throw new IllegalArgumentException( + "Argument launchRecordId in Broadcast.setLaunchRecordId method cannot be null"); + if (this.launchRecordId != null) + throw new IllegalStateException( + "Broadcast.setLaunchRecordId method cannot be invoked more than once for a Broadcast"); + this.launchRecordId = launchRecordId; + } + + public String getBroadcastId() + { + return broadcastId; + } + + public String getLaunchRecordId() + { + return launchRecordId; + } + + public String getResponseXML(BroadcastException e) + { + String tagName = broadcastType + "_response"; + StringBuffer responseXML = new StringBuffer("<" + tagName); + if (broadcastId != null && broadcastId.length() > 0) + { + responseXML.append(" broadcast_id=\""); + responseXML.append(broadcastId); + responseXML.append("\""); + } + responseXML.append(" accepted='"); + responseXML.append(e != null || getState() == BroadcastState.COMPLETED ? "FALSE" : "TRUE"); + responseXML.append("'"); + if (e == null) + { + responseXML.append('>'); + } + else + { + if (e.errorCode != null) + { + responseXML.append(" error='"); + responseXML.append(e.errorCode.toString()); + responseXML.append("'"); + } + responseXML.append('>'); + if (e.errorText != null) + { + responseXML.append(""); + responseXML.append(e.errorText); + responseXML.append(""); + } + } + responseXML.append("'); + return responseXML.toString(); + } + + public String getPostBackURL() + { + return postBackURL; + } + + protected String mkResponseXML(String errorCode, String errorText) + { + String tagName = broadcastType + "_response"; + StringBuffer responseXML = new StringBuffer("<" + tagName); + String broadcastId = getBroadcastId(); + if (broadcastId != null && broadcastId.length() > 0) + { + responseXML.append(" broadcast_id=\""); + responseXML.append(broadcastId); + responseXML.append("\""); + } + responseXML.append(" accepted='"); + responseXML.append(errorCode == null ? "TRUE" : "FALSE"); + responseXML.append("'"); + if (errorCode == null) + { + responseXML.append('>'); + } + else + { + responseXML.append(" error='"); + responseXML.append(errorCode); + responseXML.append("'"); + responseXML.append('>'); + if (errorText != null) + { + responseXML.append(""); + responseXML.append(errorText.replaceAll("\\&", "&") + .replaceAll("<", "<")); + responseXML.append(""); + } + } + responseXML.append("'); + return responseXML.toString(); + } + + private boolean stateIsFinal(BroadcastState state) + { + return state == BroadcastState.ABORTED || state == BroadcastState.CANCELED + || state == BroadcastState.COMPLETED || state == BroadcastState.PURGED; + } + + /** + * If finalState is final, then this state is set, and dispatcher threads are stopped. + * Overriding implementation may release all other resources, like timers. + * @param finalState + */ + public void terminate(BroadcastState finalState) + { + terminate(finalState, null); + } + + /** + * If finalState is final, then this state is set, and dispatcher threads are stopped. + * Overriding implementation may release all other resources, like timers. + * @param finalState + */ + public void terminate(BroadcastState finalState, String reason) + { + if (!stateIsFinal(finalState)) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final"); + setState(finalState, reason, null); + + // Wake up all dispatcher threads waiting on readyQueue so they will all stop + synchronized(readyQueue) + { + readyQueue.notifyAll(); + } + + // Wake up all sleeping dispatcher threads for same reason. + for(Thread t : serviceThreadPool) + { + try + { + t.interrupt(); + } + catch (Exception e) + { + myLogger.warn("Interrupted while waiting for Thread " + t.getName() + " to terminate"); + } + } + + // Quiesce scheduler, and terminate it. + scheduler.shutdownNow(); + } + + /** + * Creates status report. + * @return status report in XML. + */ + protected String mkStatusReport() + { + StringBuffer statusBf = new StringBuffer(); + String topLevelTag = broadcastType; + statusBf.append("<" + topLevelTag + " broadcast_id='" + getBroadcastId() + + "' recipient_count='" + recipientList.size() + "'"); + if (launchRecordId != null) + { + statusBf.append(" launch_record_id='" + launchRecordId + "'"); + } + BroadcastState broadcastState = getState(); + statusBf.append(">\r\n" + broadcastState + "\r\n"); + if (broadcastState == BroadcastState.HALTED + || broadcastState == BroadcastState.ABORTED) + { + if (haltReason != null) + { + statusBf.append("" + haltReason + + "\r\n"); + } + if (stateErrorText != null) + { + statusBf.append("" + stateErrorText + + ""); + } + } + statusBf.append(" -1) statusBf.append(" active='" + activeCount + "'"); + statusBf.append(">\r\n"); + String statusReport = statusBf.toString(); + return statusReport; + } + + protected void onExpire() + { + } + + protected void setExpireTime(long expireTime) + { + this.expireTime = expireTime; + } + + public long getExpireTime() + { + return expireTime; + } + + /** + * + * @return number of active jobs. -1 if there is no concept of being active. + */ + private int getActiveCount() + { + return remainingJobs - readyQueue.size(); + } + + /** + * Parses broadcastId and return if notInService is true. + * Otherwise, continue parsing postBackUrl, expireTime, recipientList, + * and implementation-specific data from request. + * Avoid throwing an exception before parsing and setting broadcastId. + * @param notInService + * @throws EngineException + */ + protected abstract void decode(HttpServletRequest request, boolean notInService) + throws EngineException; + + /** + * Remembers postBack, and + * Creates thread pool of size dictated by broadcast, which determines the size based + * on the chosen service provider. + * + * Overriding implementation must invoke this method at the end, and process information + * contained in the broadcast, in preparation for the invocation of the process + * method. + * + * If there is no error, the overriding implementation must return this base method. + * + * @param commEngine + * + * @throws BroadcastException + */ + protected final void init(PostBack postBack) + { + // Remember postBack + this.postBack = postBack; + + for (Recipient recipient : recipientList) + { + readyQueue.add(mkJob(recipient)); + } + remainingJobs = readyQueue.size(); + } + + protected abstract void initSync(EngineResources resources) throws BroadcastException; + + protected Job mkJob(Recipient recipient) + { + return new Job(recipient); + } + + /** + * Overriding implementation performs time consuming initialization, after returning + * POST http status indicating accepting broadcast for processing. + * + * @throws BroadcastException + */ + protected void initAsync() throws BroadcastException + { + // Do nothing in base class. + } + + public abstract int getServiceThreadPoolSize(); + + public String getId() + { + return broadcastId; + } + + /** + * Sets the stateMachine to CANCEL + */ + protected void cancel() + { + if (this.getActiveCount() == 0) setState(BroadcastState.CANCELED); + // Sets state to CANCELING, which is monitored by its Broadcast.Service threads. + else setState(BroadcastState.CANCELING); + synchronized(resumeFlag) + { + resumeFlag.notifyAll(); + } + } + + protected void pause() + { + // Sets state to HALTED, which is monitored by Broadcast.Service threads. + setState(BroadcastState.HALTING); + } + + protected void resume() + { + synchronized (resumeFlag) + { + if (threadsShouldPause()) + { + setState(BroadcastState.RUNNING); + resumeFlag.notifyAll(); + } + } + } + + abstract protected JobReport mkJobReport(Job job); + + public void addJob(Job job) + { + synchronized(readyQueue) + { + readyQueue.add(job); + readyQueue.notifyAll(); + } + } + + public void startProcessing() throws BroadcastException + { + // Create dispatcher thread pool + int threadPoolSize = getServiceThreadPoolSize(); + for (int i = 0; i < threadPoolSize; i++) + { + String threadName = broadcastId + "_service_thread_" + i; + Service serviceThread = new Service(threadName); + serviceThreadPool.add(serviceThread); + } + + setState(BroadcastState.RUNNING); + + // Start the dispatcher threads + for (Thread thread : serviceThreadPool) + { + thread.start(); + } + } + + private boolean threadsShouldStop() + { + BroadcastState state = getState(); + return state == BroadcastState.CANCELING || stateIsFinal(state); + } + + private boolean threadsShouldPause() + { + BroadcastState state = getState(); + return state == BroadcastState.HALTED || state == BroadcastState.HALTING; + } + + /* + @Override + public void run() + { + for (;;) + { + if (threadsShouldStop()) return; + + if (threadsShouldPause()) + { + try + { + resumeFlag.wait(); + } + catch (InterruptedException e) + { + myLogger.warn("Dispatcher thread interrupted while waiting to resume"); + return; + } + } + + List batch = null; + synchronized(readyQueue) + { + // get a batch of jobs + Job job = readyQueue.poll(); + + if (job == null) + try + { + readyQueue.wait(); + continue; + } + catch (InterruptedException e) + { + return; + } + batch = new ArrayList(); + batch.add(job); + for (int i = 1; i < getJobBatchSize(); i++) + { + job = readyQueue.poll(); + if (job == null) break; + batch.add(job); + } + } + if (batch != null) + { + try + { + processJobs(batch); + } + catch (EngineException e) + { + terminate(BroadcastState.ABORTED, e.getMessage()); + } + } + } + + } + */ + + /** + * job status is reported back to this broadcast, via the logAndQueueForPostBack method. + * @param batch + * @param prerequisites + */ + abstract protected void processJobs(List batch, Object serviceProvider, ServicePrerequisites prerequisites) + throws EngineException; + + /** + * Size of a batch of jobs to be processed together. For email, this may be more than 1, + * and this method should be overridden. + * @return size of a batch of jobs to be processed together. + */ + protected int getJobBatchSize() + { + return 1; + } + + /** + * Sets jobStatus in job, and post job report. + * If no rescheduling, then decrement number of remainingJobs, + * @param job + * @param jobStatus + * @param errorText + */ + public void postJobStatus(Job job) + { + if (postBack != null) + { + JobReport report = mkJobReport(job); + postBack.queueReport(report.toString()); + } + + if (job.jobStatus.isTerminal()) + { + remainingJobs--; + if (remainingJobs == 0) + { + terminate(BroadcastState.COMPLETED); + } + else if (getActiveCount() == 0) + { + if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); + else if (state == BroadcastState.HALTING) setState(BroadcastState.HALTED); + } + } + } + + /** + * Sets jobStatus in job, and post job report. + * If no rescheduling, then decrement number of remainingJobs, + * @param job + * @param jobStatus + * @param errorText + * @param reschedule - reschedule time in milliseconds (-1 means do not reschedule). + */ + protected void postJobStatus(Job job, long rescheduleTimeMS) + { + postJobStatus(job); + if (rescheduleTimeMS == 0) + { + addJob(job); + } + else if (rescheduleTimeMS > 0) + { + rescheduleJob(job, rescheduleTimeMS); + } + } + + public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) + { + Runnable r = new Runnable() { public void run() { addJob(job);}}; + return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); + } + + public BroadcastState getState() + { + return state; + } + + +} diff --git a/src/altk/comm/engine/CommEngine.java b/src/altk/comm/engine/CommEngine.java new file mode 100644 index 0000000..7fd1746 --- /dev/null +++ b/src/altk/comm/engine/CommEngine.java @@ -0,0 +1,455 @@ +package altk.comm.engine; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; + +import altk.comm.engine.Broadcast.BroadcastState; +import altk.comm.engine.exception.BroadcastError; +import altk.comm.engine.exception.BroadcastException; +import altk.comm.engine.exception.PlatformError; +import altk.comm.engine.exception.PlatformException; +import altk.comm.engine.postback.PostBack; + +public abstract class CommEngine extends HttpServlet +{ + /** + * + */ + private static final long serialVersionUID = 6887055442875818654L; + + static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; + + private static final int SCHEDULER_THREAD_POOL_SIZE = 1; + + private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; + + /** + * Maps a broadcastId to a broadcast. + */ + private Map broadcasts; + + protected boolean notInService; + + protected Properties config; + + protected Map postBackMap; + + protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice" + + private long startupTimestamp; + + // Sequencing naming of broadcast that fails to yield its broadcastId + private int unknownBroadcastIdNdx = 1; + + private BroadcastException myException; + + /** + * Used to communicate media-specific platform resources to broadcasts + */ + protected EngineResources resources; + + private static Logger myLogger = Logger.getLogger(CommEngine.class); + + private ScheduledExecutorService scheduler; + + private long deadBroadcastViewingMinutes; + + abstract protected Broadcast mkBroadcast(); + + public CommEngine(String engineName) + { + this.engineName = engineName; + broadcasts = new HashMap(); + startupTimestamp = System.currentTimeMillis(); + myException = null; + } + + /** + * Invoked by servlet container during initialization of servlet. + */ + public final void init() + { + myLogger.info("init() invoked"); + // check init parameters + ServletContext servletContext = getServletContext(); + String propertiesFilePath; + + propertiesFilePath = servletContext.getInitParameter(getPropertiesContextName()); + File propertiesFile = new File(propertiesFilePath); + CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath()); + config = new Properties(); + try + { + config.load(new FileInputStream(propertiesFile)); + } + catch (FileNotFoundException e) + { + CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort"); + notInService = true; + return; + } + catch (IOException e) + { + CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage()); + notInService = true; + return; + } + + postBackMap = new HashMap(); + + // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes + String periodStr = config.getProperty("dead_broadcast_viewing_period", + new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString()); + deadBroadcastViewingMinutes = Long.parseLong(periodStr); + CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); + + scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); + scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}}, + deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES); + + initChild(); + } + + protected void purgeStaleBroadcasts() + { + long now = System.currentTimeMillis(); + for (String id : broadcasts.keySet()) + { + if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000) + { + broadcasts.remove(id); + } + } + + } + + /** + * + * @return name of parameter in Tomcat context file, specifying properties file + * for this SMSEngine. + */ + protected abstract String getPropertiesContextName(); + + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) + { + try + { + Broadcast broadcast = mkBroadcast(); + try + { + broadcast.decode(request, notInService); + if (notInService) + { + throw new PlatformException(PlatformError.RUNTIME_ERROR, + "Not in service"); + } + if (broadcast.recipientList.size() == 0) + { + CommonLogger.activity.info("Broadcast " + broadcast.getBroadcastId() + ": No recipients"); + broadcast.setState(BroadcastState.COMPLETED, "No recipients", null); + return; + } + // Determine postBackUrl + String postBackURL = broadcast.getPostBackURL(); + PostBack postBack = null; + if (postBackURL != null) + { + postBack = postBackMap.get(postBackURL); + if (postBack == null) + { + postBack = new PostBack(postBackURL, broadcast.broadcastType + "_status"); + postBackMap.put(postBackURL, postBack); + } + } + broadcast.initSync(resources); + broadcast.init(postBack); + if (broadcast.getState() == BroadcastState.COMPLETED) return; + } + catch (BroadcastException e) + { + myException = e; + broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); + CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); + myLogger.error("Broadcast aborted", e); + return; + } + catch (Throwable t) + { + // Caught stray unexpected runtime problem + CommonLogger.alarm.error("Broadcast aborted: " + t); + myLogger.error("Broadcast aborted", t); + myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage()); + broadcast.setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText); + } + finally + { + // Put broadcast in broadcasts map. + String broadcastId = broadcast.getBroadcastId(); + if (broadcastId.length() != 0) + { + broadcasts.put(broadcastId, broadcast); + } + else + { + String makeUpId = "Unknown" + unknownBroadcastIdNdx++; + broadcasts.put(makeUpId, broadcast); + } + + // Return regular or error response + String responseXML = broadcast.getResponseXML(myException); + PrintWriter writer = response.getWriter(); + writer.write(responseXML); + writer.close(); + } + try + { + broadcast.initAsync(); + broadcast.startProcessing(); + } + catch (BroadcastException e) + { + broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); + CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); + myLogger.error("Broadcast aborted", e); + } + } + catch (Throwable t) + { + // Caught stray unexpected runtime problem + CommonLogger.alarm.error("Broadcast aborted: " + t.getMessage()); + myLogger.error("Broadcast aborted", t); + } + } + + /** + * Functions covered are + * get=status + * get=cancel_broadcast&broadcast_id=XXX + */ + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + { + PrintWriter out; + try + { + out = response.getWriter(); + } + catch (IOException e) + { + CommonLogger.alarm.error("Cannot write a reply: " + e); + return; + } + String get = (String)request.getParameter("get"); + if (get == null) + { + // Return http status BAD REQUEST + int httpStatus = HttpServletResponse.SC_BAD_REQUEST; + try + { + response.sendError(httpStatus); + } + catch (IOException e) + { + myLogger.warn("Unnable to return HTTP error code " + httpStatus); + } + return; + } + if (get.equalsIgnoreCase("status")) + { + getStatus(request, out); + } + else if (get.equalsIgnoreCase("cancel_broadcast")) + { + cancelBroadcast(request, out); + } + else if (get.equalsIgnoreCase("pause_broadcast")) + { + pauseBroadcast(request, out); + } + else if (get.equalsIgnoreCase("resume_broadcast")) + { + resumeBroadcast(request, out); + } + else + { + out.write(get + " not supported"); + } + out.close(); + } + + private void cancelBroadcast(HttpServletRequest request, PrintWriter out) + { + // Get broadcastId from request + String broadcastId = getBroadcastId(request); + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast == null) + { + out.format("Broadcast %s does not exist", broadcastId); + return; + } + broadcast.cancel(); + } + + protected void pauseBroadcast(HttpServletRequest request, PrintWriter out) + { + // Get broadcastId from request + String broadcastId = getBroadcastId(request); + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast == null) + { + out.format("Broadcast %s does not exist", broadcastId); + return; + } + broadcast.pause(); + } + + + protected void resumeBroadcast(HttpServletRequest request, PrintWriter out) + { + // Get broadcastId from request + String broadcastId = getBroadcastId(request); + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast == null) + { + out.format("Broadcast %s does not exist", broadcastId); + return; + } + broadcast.resume(); + } + + /** + * + * status of each broadcast + * tttnnn + * + */ + private void getStatus(HttpServletRequest request, PrintWriter out) + { + String broadcastId = request.getParameter("broadcast_id"); + if (broadcastId != null) + { + broadcastId = broadcastId.trim(); + if (broadcastId.length() == 0) + { + out.write("broadcast_id request parameter cannot be empty"); + return; + } + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast == null) + { + out.write("No such broadcast"); + } + else + { + out.write(broadcast.mkStatusReport()); + } + return; + } + else + { + String tag = engineName + "_status"; + out.write("<" + tag + ">\r\n"); + + out.write("" + startupTimestamp + + "\r\n"); + + // First get a copy of broadcasts, to avoid mutex deadlock. + Vector broadcastList = new Vector(); + synchronized(broadcasts) + { + for (String key : broadcasts.keySet()) + { + broadcastList.add(broadcasts.get(key)); + } + } + // We have released the lock. + // Then append status of each broadcast to outBuf. + for (Broadcast broadcast : broadcastList) + { + out.write(broadcast.mkStatusReport()); + } + + out.write(""); + } + } + + public void removeBroadcast(String broadcastId) + { + CommonLogger.activity.info("Removing broadcast " + broadcastId); + synchronized(broadcasts) + { + broadcasts.remove(broadcastId); + } + } + + protected int getRemainingJobCount() + { + // TODO + return 0; + } + + public boolean notInService() + { + return notInService; + } + + /** + * Decode http GET request for broadcast_id value + * @param request + * @return broadcast_id + */ + private String getBroadcastId(HttpServletRequest request) + { + return request.getParameter("broadcast_id"); + } + + /** + * Invoked by servlet container when servlet is destroyed. + */ + public final void destroy() + { + System.out.println(engineName + " destroyed"); + + // Kill threads in each PostBack, which is remembered in postBackMap. + for (PostBack postback : postBackMap.values()) + { + postback.terminate(); + } + + for (Broadcast broadcast : broadcasts.values()) + { + broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); + } + + destroyChild(); + } + + /** + * Indirectly invoked by servlet container during servlet initialization. + */ + abstract protected void initChild(); + + /** + * Indirectly invoked by serlet container during destruction of servlet. + */ + abstract protected void destroyChild(); +} diff --git a/src/altk/comm/engine/CommonLogger.java b/src/altk/comm/engine/CommonLogger.java new file mode 100644 index 0000000..089b116 --- /dev/null +++ b/src/altk/comm/engine/CommonLogger.java @@ -0,0 +1,11 @@ +package altk.comm.engine; + +import org.apache.log4j.Logger; + +public class CommonLogger +{ + static public final Logger startup = Logger.getLogger("startup.console"); + static public final Logger alarm = Logger.getLogger("alarm.console"); + static public final Logger activity = Logger.getLogger("activity.console"); + public static final Logger health = Logger.getLogger("health"); +} diff --git a/src/altk/comm/engine/EngineResources.java b/src/altk/comm/engine/EngineResources.java new file mode 100644 index 0000000..91b1d2d --- /dev/null +++ b/src/altk/comm/engine/EngineResources.java @@ -0,0 +1,10 @@ +package altk.comm.engine; + +/** + * Base class to transmit engine resources from engine to broadcast. + * @author Yuk-Ming + * + */ +public class EngineResources +{ +} diff --git a/src/altk/comm/engine/Job.java b/src/altk/comm/engine/Job.java new file mode 100644 index 0000000..02e28b2 --- /dev/null +++ b/src/altk/comm/engine/Job.java @@ -0,0 +1,85 @@ +package altk.comm.engine; + +/** + * Derived classes may add more class attributes, e.g. list of phone numbers, call status. + * @author Yuk-Ming + * + */ +public class Job +{ + /** + * + */ + static public enum JobStatus + { + READY(false), + TRYING(false), + GO_NEXT_PHONE(false), + TRUNK_ERROR(false), + LONG_DURATION(true), + SUCCESS(true), + NO_MORE_RETRY(true), + RECIPIENT_ADDR_ERROR(true), + SERVICE_PROVIDER_PROTOCOL_ERROR(true), + SERVICE_PROVIDER_GENERATED_ERROR(true), + SERVICE_PROVIDER_HTTP_ERROR(true), + SERVICE_PROVIDER_CONNECT_ERROR(true), + SERVICE_PROVIDER_OTHER_ERROR(true), + CANCELED(true), + EXPIRED(true), + MESSAGE_ERROR(true), + SERVICE_ACCESS_BLOCKED(true), + PLATFORM_ERROR(true), + FAILED(true), // for + UNKNOWN_ERROR(true), + ABORT(true); + + private boolean isTerminal; + + private JobStatus(boolean isTerminal) + { + this.isTerminal = isTerminal; + } + + public boolean isTerminal() + { + return isTerminal; + } + + } + + public Recipient recipient; + + public long startTime; + + public JobStatus jobStatus; + + public String errorText; + + public Job(Recipient recipient) + { + this.recipient = recipient; + jobStatus = JobStatus.READY; + } + + public void setStatus(JobStatus jobStatus) + { + this.jobStatus = jobStatus; + } + + public void setStatus(JobStatus jobStatus, String errorText) + { + this.jobStatus = jobStatus; + this.errorText = errorText; + } + + /** + * + * @return error in text form. + */ + public String getErrorText() + { + return errorText; + } + +} diff --git a/src/altk/comm/engine/JobReport.java b/src/altk/comm/engine/JobReport.java new file mode 100644 index 0000000..e4b78b7 --- /dev/null +++ b/src/altk/comm/engine/JobReport.java @@ -0,0 +1,91 @@ +package altk.comm.engine; + +abstract public class JobReport +{ + private static final String ACTIVITY_RECORD_ID_NAME = "activity_record_id"; + public final long reportTime; + public final String broadcastId; + public final String launchRecordId; + public final String contactId; + public final String recordId; // id into table, e.g. call_record, sms_record, etc. + public final String jobStatus; // Note: not enum + public final String errorText; + public long startTime; + + public JobReport(Job job, String broadcastId, String launchRecordId) + { + if (broadcastId == null || broadcastId.length() == 0) + { + throw new IllegalArgumentException("JobReport given null or empty broadcastId"); + } + if (launchRecordId == null || launchRecordId.length() == 0) + { + throw new IllegalArgumentException("JobReport given null or empty launchRecordId"); + } + + this.broadcastId = broadcastId; + this.launchRecordId = launchRecordId; + startTime = job.startTime; + contactId = job.recipient.contact_id; + recordId = job.recipient.activity_record_id; + jobStatus = job.jobStatus.toString(); + errorText = job.errorText; + reportTime = System.currentTimeMillis(); + } + + /** + * @return "email" for example. + */ + abstract protected String getXMLRootNodeName(); + + /** + * @return "email_record_id" for example. + abstract protected String getActivityRecordIdname(); + */ + + public String toString() + { + StringBuffer xml = new StringBuffer(); + appendXML(xml); + return xml.toString(); + } + + public final StringBuffer appendXML(StringBuffer xml) + { + xml.append("<" + getXMLRootNodeName() + " broadcast_id=\"" + broadcastId + + "\" launch_record_id=\"" + launchRecordId + + "\" " + getActivityRecordIdName() + "=\"" + recordId + + "\" contact_id=\"" + contactId + + "\" recipient_status=\"" + jobStatus + "\" >\r\n"); + xml.append("" + startTime/1000 + "\r\n"); + xml = appendSpecificXML(xml); + if (errorText != null && errorText.length() > 0) + { + xml.append(""); + xml.append(errorText.replaceAll("&", "&").replaceAll("<", "<")); + xml.append("\r\n"); + } + xml.append(""); + return xml; + + } + + /** + * Derived class may override this method to re-define name of activity_record_id, + * e.g. email_record_id, sms_record_id, etc. + * @return + */ + protected String getActivityRecordIdName() + { + return ACTIVITY_RECORD_ID_NAME; + } + + /** + * Append data to xml which is specific to the derived class. For example, email + * address for EmailJobReport. + * @param xml + * @return + */ + protected abstract StringBuffer appendSpecificXML(StringBuffer xml); + +} diff --git a/src/altk/comm/engine/Recipient.java b/src/altk/comm/engine/Recipient.java new file mode 100644 index 0000000..9ac23a2 --- /dev/null +++ b/src/altk/comm/engine/Recipient.java @@ -0,0 +1,37 @@ +package altk.comm.engine; + +import java.util.Map; + +public class Recipient +{ + /** + * contact_id in client database + */ + public final String contact_id; + + /** + * E.g. email_record_id, call_record_id, sms_record_id. + */ + public final String activity_record_id; + + /** + * Like names, address, polling places, etc. + */ + public final Map attributes; + + /** + * + * @param contact_id cannot be empty + * @param activity_record_id cannot be empty + * @param attributes may be null + */ + public Recipient(String contact_id, String activity_record_id, Map attributes) + { + if (contact_id == null || (contact_id=contact_id.trim()).length() == 0) throw new IllegalArgumentException("empty contact_id given to Recipient"); + if (activity_record_id == null || (activity_record_id=activity_record_id.trim()).length() == 0) throw new IllegalArgumentException("empty activity_record_id given to Recipient"); + this.contact_id = contact_id; + this.activity_record_id = activity_record_id; + this.attributes = attributes; + } + +} diff --git a/src/altk/comm/engine/ServicePrerequisites.java b/src/altk/comm/engine/ServicePrerequisites.java new file mode 100644 index 0000000..f8b412d --- /dev/null +++ b/src/altk/comm/engine/ServicePrerequisites.java @@ -0,0 +1,11 @@ +package altk.comm.engine; + +/** + * Base class to transmit allocated resources necessary to service a job + * @author Yuk-Ming + * + */ +public class ServicePrerequisites +{ + +} diff --git a/src/altk/comm/engine/ServiceProviderFactoryException.java b/src/altk/comm/engine/ServiceProviderFactoryException.java new file mode 100644 index 0000000..d791b52 --- /dev/null +++ b/src/altk/comm/engine/ServiceProviderFactoryException.java @@ -0,0 +1,11 @@ +package altk.comm.engine; + +@SuppressWarnings("serial") +public class ServiceProviderFactoryException extends Exception +{ + public ServiceProviderFactoryException(String arg0) + { + super(arg0); + } + +} diff --git a/src/altk/comm/engine/Util.java b/src/altk/comm/engine/Util.java new file mode 100644 index 0000000..cec41a6 --- /dev/null +++ b/src/altk/comm/engine/Util.java @@ -0,0 +1,72 @@ +package altk.comm.engine; + +import java.util.Properties; + +public class Util +{ + static public String getStringParameter(String name, Properties config) + { + return getStringParameter(name, config, null); + } + + static public String getStringParameter(String name, Properties config, StringBuffer errorText) + { + String value = config.getProperty(name); + if (value == null || (value=value.trim()).length() == 0) + { + if (errorText == null) return null; + + if (errorText.length() > 0) errorText.append(", "); + errorText.append("Missing parameter " + name); + return null; + } + return value; + } + + static public int getIntegerParameter(String name, Properties config) + { + return getIntegerParameter(name, config, null); + } + + static public int getIntegerParameter(String name, Properties config, StringBuffer errorText) + { + String value = getStringParameter(name, config, errorText); + if (value != null) + { + try + { + return Integer.parseInt(value); + } + catch (Exception e) + { + if (errorText != null) errorText.append("Parameter " + name + " not integer"); + return 0; + } + } + return 0; + } + + static public boolean getBooleanParameter(String name, Properties config) + { + return getBooleanParameter(name, config, null); + } + + static public boolean getBooleanParameter(String name, Properties config, StringBuffer errorText) + { + String value = getStringParameter(name, config, errorText); + if (value != null) + { + try + { + return Boolean.parseBoolean(value); + } + catch (Exception e) + { + errorText.append("Parameter " + name + " not integer"); + return false; + } + } + return false; + } + +} diff --git a/src/altk/comm/engine/XMLDOMBroadcast.java b/src/altk/comm/engine/XMLDOMBroadcast.java new file mode 100644 index 0000000..3938cdd --- /dev/null +++ b/src/altk/comm/engine/XMLDOMBroadcast.java @@ -0,0 +1,416 @@ +package altk.comm.engine; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; +import java.util.zip.ZipInputStream; + +import javax.servlet.http.HttpServletRequest; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NamedNodeMap; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import altk.comm.engine.exception.BroadcastError; +import altk.comm.engine.exception.BroadcastException; +import altk.comm.engine.exception.BroadcastMsgException; +import altk.comm.engine.exception.EngineException; +import altk.comm.engine.exception.InternalErrorException; +import altk.comm.engine.exception.NonExistentException; +import altk.comm.engine.exception.PlatformError; +import altk.comm.engine.exception.PlatformException; + +/** + * Abstract class extending Broadcast by providing an implementation of the decode + * method to parse HTTP POST input into XML DOM, and extracting the required class + * attributes broadcast_id, launch_record_id, expire_time, postback and recipientList. + * @author Yuk-Ming + * + */ +public abstract class XMLDOMBroadcast extends Broadcast +{ + protected XPath xpathEngine; + protected DocumentBuilder builder; + protected Element broadcastNode; + + protected XMLDOMBroadcast(String broadcastType) + { + super(broadcastType); + } + + /** + * Sets up XMLDoc and parses broadcast_id, expire_time, postBackUrl and recipientList. + * Derived class cannot override this method, which is final. + * @throws EngineException + */ + @Override + protected final void decode(HttpServletRequest request, boolean notInService) throws EngineException + { + try + { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + builder = factory.newDocumentBuilder(); + xpathEngine = XPathFactory.newInstance().newXPath(); + } + catch (Exception e) + { + CommonLogger.alarm.error(e.getMessage()); + throw new PlatformException(PlatformError.RUNTIME_ERROR, e.getMessage()); + } + + + // parse request into multiple BroadcastSMS pass then to Dispatcher + Document xmlDoc; + try + { + InputStream inb; + String contentType = request.getContentType(); + if (contentType != null && contentType.split("/")[1].equalsIgnoreCase("zip")) + { + ZipInputStream zip = new ZipInputStream(request.getInputStream()); + zip.getNextEntry(); + inb = zip; + } + else + { + inb = request.getInputStream(); + } + int contentLength = request.getContentLength(); + CommonLogger.activity.info("Receiving " + contentLength + " bytes of data"); + byte[] content = new byte[contentLength]; + int offset = 0; + int length = contentLength; + int read; + while (length > 0) + { + read = inb.read(content, offset, length); + if (read < 0) break; + offset += read; + length -= read; + } + CommonLogger.activity.info("Received: " + new String(content, "UTF-8")); + xmlDoc = builder.parse(new ByteArrayInputStream(content), "UTF-8"); + } + catch (SAXException e) + { + throw new BroadcastException(BroadcastError.BAD_REQUEST_DATA, e.getMessage()); + } + catch (IOException e) + { + throw new BroadcastException(BroadcastError.READ_REQUEST_ERROR, "Problem in reading request"); + } + + //String xpath = "//" + broadcastName; // get all first level elements + NodeList broadcastNodes = null; + broadcastNodes = xmlDoc.getElementsByTagName(broadcastType); + if (broadcastNodes.getLength() == 0) + { + throw new BroadcastException(BroadcastError.BAD_REQUEST_DATA, "No <" + broadcastType + "> tag"); + } + + if (notInService) return; + + // Only one broadcast node is processed. The rest are silently ignored. + broadcastNode = (Element)broadcastNodes.item(0); + if (!broadcastType.equals(broadcastNode.getNodeName())) + { + throw new BroadcastMsgException("Broadcast node name does not match " + broadcastType); + } + + // broadcast_id + String broadcastId = broadcastNode.getAttribute("broadcast_id"); + if (broadcastId.length() == 0) + { + throw new BroadcastMsgException("Missing broadcast_id"); + } + setBroadcastId(broadcastId); + + // expireTime + long now = System.currentTimeMillis(); + try + { + long expireTime = getLongValue("expire_time", broadcastNode); + // defaults to 20 minutes + myLogger.debug("expire_time decoded to be " + expireTime); + if (expireTime == 0) expireTime = now + 20 * 60 * 1000; + myLogger.debug("expire time adjusted to be " + expireTime); + setExpireTime(expireTime); + + setLaunchRecordId(broadcastNode.getAttribute("launch_record_id")); + } + catch (Exception e) + { + throw new BroadcastMsgException("Missing or bad expire_time"); + } + + // Postback + postBackURL = getStringValue("async_status_post_back", broadcastNode); + if (postBackURL != null && (postBackURL=postBackURL.trim()).length() == 0) postBackURL = null; + if (postBackURL == null) + { + CommonLogger.alarm.warn("Missing asyn_status_post_back in POST data"); + } + + NodeList recipientNodes = null; + String xpath = "recipient_list/recipient"; + try + { + recipientNodes = (NodeList) xpathEngine.evaluate(xpath, + broadcastNode, XPathConstants.NODESET); + } + catch (XPathExpressionException e) + { + throw new InternalErrorException("Bad xpath 'recipient_list/recipient", e); + } + Element recipientNode; + String contact_id; + String activity_record_id; + xpath = "email"; + for (int i = 0; i < recipientNodes.getLength(); i++) + { + recipientNode = (Element)recipientNodes.item(i); + contact_id = recipientNode.getAttribute("contact_id"); + if (contact_id == null || (contact_id=contact_id.trim()).length() == 0) + { + myLogger.warn("Missing or empty contact_id for a recipient in broadcast " + getBroadcastId()); + continue; + } + activity_record_id = recipientNode.getAttribute(getActivityRecordIdName()); + if (activity_record_id == null || (activity_record_id = activity_record_id.trim()).length() == 0) + { + throw new BroadcastMsgException("Missing or empty " + getActivityRecordIdName() + " attribute for a recipient in broadcast " + getBroadcastId()); + } + + Map attributes = new HashMap(); + NodeList children = recipientNode.getChildNodes(); + for (int j = 0; j < children.getLength(); j++) + { + Node child = children.item(j); + if (child.getNodeType() != Node.ELEMENT_NODE) continue; + attributes.put(child.getNodeName(), child.getTextContent()); + } + recipientList.add(new Recipient(contact_id, activity_record_id, attributes)); + } + } + + /** + * Gets String value of child with given name. + * @param childNodeName + * @param element + * @return null if no such child exists. + * @throws IllegalArgumentException when element is null + */ + protected String getStringValue(String childNodeName, Element element) + throws IllegalArgumentException + { + if (element == null) + { + throw new IllegalArgumentException("Element cannot be null"); + } + NodeList children = element.getChildNodes(); + for (int i = 0; i < children.getLength(); i++) + { + Node child = children.item(i); + if (child.getNodeType() == Node.ELEMENT_NODE && + child.getNodeName().equals(childNodeName)) + { + return child.getTextContent(); + } + } + return null; + } + + /** + * + * @param childNodeName + * @param element + * @return + * @throws IllegalArgumentException when element is null + */ + protected String getNonNullStringValue(String childNodeName, Element element) + throws IllegalArgumentException + { + String value = getStringValue(childNodeName, element); + return value==null?"":value; + } + + /** + * + * @param childNodeName + * @param element + * @return + * @throws IllegalArgumentException when element is null + */ + public boolean getBooleanValue(String childNodeName, Element element) + throws IllegalArgumentException + { + String str = getStringValue(childNodeName, element); + if (str == null) return false; + return Boolean.parseBoolean(str.trim()); + } + + /** + * + * @param childNodeName + * @param element + * @return + * @throws NonExistentException + * @throws NumberFormatException + * @throws IllegalArgumentException when element is null + */ + protected int getIntValue(String childNodeName, Element element) + throws NonExistentException, NumberFormatException, IllegalArgumentException + { + if (element == null) + { + throw new IllegalArgumentException("Element cannot be null, when invoking getIntValue method"); + } + try + { + String str = getStringValue(childNodeName, element); + if (str == null) + { + throw new NonExistentException("No child \"" + childNodeName + "\" exists for element \"" + xml2Str(element) + "\""); + } + return Integer.parseInt(str); + } + catch (NumberFormatException e) + { + throw new NumberFormatException("Value of child \"" + childNodeName + "\" not integer for element \"" + xml2Str(element) + "\""); + } + } + + /** + * + * @param childNodeName + * @param element + * @return + * @throws NonExistentException + * @throws NumberFormatException + * @throws IllegalArgumentException when element is null + */ + protected Long getLongValue(String childNodeName, Element element) + throws NonExistentException, NumberFormatException, IllegalArgumentException + { + if (element == null) + { + throw new IllegalArgumentException("Element cannot be null, when invoking getIntValue method"); + } + try + { + String str = getStringValue(childNodeName, element); + if (str == null) + { + throw new NonExistentException("No child \"" + childNodeName + "\" exists for element \"" + xml2Str(element) + "\""); + } + return Long.parseLong(str); + } + catch (NumberFormatException e) + { + throw new NumberFormatException("Value of child \"" + childNodeName + "\" not integer for element \"" + xml2Str(element) + "\""); + } + } + + protected NodeList getNodeList(String xpath, Element element) + { + try + { + NodeList nodes = (NodeList) xpathEngine.evaluate(xpath, element, XPathConstants.NODESET); + return nodes; + } + catch (XPathExpressionException e) + { + throw new InternalErrorException("Improper xpath \"" + xpath + "\"", e); + } + } + + protected Node getNode(String xpath, Element element) + { + try + { + Node node = (Node) xpathEngine.evaluate(xpath, element, XPathConstants.NODE); + return node; + } + catch (XPathExpressionException e) + { + throw new InternalErrorException("Improper xpath \"" + xpath + "\"", e); + } + } + + static public void appendXML(Element topElement, StringBuffer buf) + { + // starting tag + buf.append('<'); + String elementName = topElement.getNodeName(); + buf.append(elementName); + + // handle attributes first + NamedNodeMap attributes = topElement.getAttributes(); + for (int i = 0; i < attributes.getLength(); i++) + { + Node attr = attributes.item(i); + buf.append(' '); + buf.append(attr.getNodeName()); + buf.append("=\""); + buf.append(attr.getNodeValue()); + buf.append("\""); + } + buf.append('>'); + + // handling text + Vector childElements = new Vector(); + NodeList children = topElement.getChildNodes(); + for (int i = 0; i < children.getLength(); i++) + { + Node node = children.item(i); + switch (node.getNodeType()) + { + case Node.ELEMENT_NODE: + // defer handling + childElements.add(node); + break; + case Node.TEXT_NODE: + buf.append(node.getNodeValue()); + break; + default: + } + } + // handling children elements + for (Node child : childElements) + { + appendXML((Element)child, buf); + } + // ending tag + buf.append("'); + } + + public String xml2Str(Element element) + { + StringBuffer buf = new StringBuffer(); + appendXML(element, buf); + return buf.toString(); + } + + /** + * Derived class may override this method to redefine the name of activity_record_id, + * e.g. email_record_id, sms_record_id, etc. + * @return name of activity_record_9d. + */ + protected String getActivityRecordIdName() + { + return "activity_record_id"; + } + +} diff --git a/src/altk/comm/engine/exception/BroadcastError.java b/src/altk/comm/engine/exception/BroadcastError.java new file mode 100644 index 0000000..1ca8dbd --- /dev/null +++ b/src/altk/comm/engine/exception/BroadcastError.java @@ -0,0 +1,12 @@ +package altk.comm.engine.exception; + +public enum BroadcastError +{ + EXPIRED, + READ_REQUEST_ERROR, + BAD_REQUEST_DATA, + INTERNAL_ERROR, + CONFIGURAION_ERROR, + PLATFORM_ERROR + +} diff --git a/src/altk/comm/engine/exception/BroadcastException.java b/src/altk/comm/engine/exception/BroadcastException.java new file mode 100644 index 0000000..edc253d --- /dev/null +++ b/src/altk/comm/engine/exception/BroadcastException.java @@ -0,0 +1,76 @@ +package altk.comm.engine.exception; + + +@SuppressWarnings("serial") +public class BroadcastException extends EngineException +{ + public final BroadcastError errorCode; + + public BroadcastException(BroadcastError errorCode, String errorText) + { + this(errorCode, errorText, null); + } + + public BroadcastException(BroadcastError errorCode, String errorText, Throwable t) + { + super(errorCode.toString(), errorText, t); + this.errorCode = errorCode; + } + + /* + public BroadcastException(Broadcast broadcast, BroadcastError errorCode, String errorText) + { + this(broadcast.broadcastName, broadcast.broadcastId, errorCode, errorText); + } + + public BroadcastException(String broadcastName, String broadcastId, + BroadcastError errorCode, String errorText) + { + this(broadcastName, broadcastId, errorCode, errorText, null); + } + + public BroadcastException(Broadcast broadcast, BroadcastError errorCode, String errorText, Throwable t) + { + this(broadcast.broadcastName, broadcast.broadcastId, errorCode, errorText, t); + } + + public BroadcastException(String broadcastName, String broadcastId, + BroadcastError errorCode, String errorText, Throwable t) + { + super(errorCode.toString() + ": " + errorText, t); + this.broadcastName = broadcastName; + this.broadcastId = broadcastId; + this.errorCode = errorCode; + this.errorText = errorText; + } + + public String mkResponseXML() + { + StringBuffer responseXML = new StringBuffer("<" + broadcastName + "_response"); + if (broadcastId != null && broadcastId.length() > 0) + { + responseXML.append(" broadcast_id=\""); + responseXML.append(broadcastId); + responseXML.append("\""); + } + if (errorCode != null) + { + responseXML.append(" error=\""); + responseXML.append(errorCode.toString()); + responseXML.append("\""); + if (errorText != null) + { + responseXML.append(">"); + responseXML.append(errorText); + responseXML.append(""); + } + else + { + responseXML.append("/>"); + } + } + return responseXML.toString(); + } + */ + +} diff --git a/src/altk/comm/engine/exception/BroadcastExpiredException.java b/src/altk/comm/engine/exception/BroadcastExpiredException.java new file mode 100644 index 0000000..2e2f192 --- /dev/null +++ b/src/altk/comm/engine/exception/BroadcastExpiredException.java @@ -0,0 +1,12 @@ +package altk.comm.engine.exception; + + +@SuppressWarnings("serial") +public class BroadcastExpiredException extends BroadcastException +{ + public BroadcastExpiredException(String errorText) + { + super(BroadcastError.EXPIRED, errorText); + } + +} diff --git a/src/altk/comm/engine/exception/BroadcastInternalErrorException.java b/src/altk/comm/engine/exception/BroadcastInternalErrorException.java new file mode 100644 index 0000000..6de7cf8 --- /dev/null +++ b/src/altk/comm/engine/exception/BroadcastInternalErrorException.java @@ -0,0 +1,13 @@ +package altk.comm.engine.exception; + +import altk.comm.engine.Broadcast; + +@SuppressWarnings("serial") +public class BroadcastInternalErrorException extends BroadcastException +{ + public BroadcastInternalErrorException(Broadcast broadcast, String errorText) + { + super(BroadcastError.INTERNAL_ERROR, errorText); + } + +} diff --git a/src/altk/comm/engine/exception/BroadcastMsgException.java b/src/altk/comm/engine/exception/BroadcastMsgException.java new file mode 100644 index 0000000..9c2e5aa --- /dev/null +++ b/src/altk/comm/engine/exception/BroadcastMsgException.java @@ -0,0 +1,30 @@ +package altk.comm.engine.exception; + + +@SuppressWarnings("serial") +public class BroadcastMsgException extends BroadcastException +{ + + public BroadcastMsgException(String errorText) + { + this(errorText, null); + } + + public BroadcastMsgException(String errorText, Throwable t) + { + super(BroadcastError.BAD_REQUEST_DATA, errorText, t); + } + + /* + public BroadcastMsgException(Broadcast broadcast, String errorText) + { + super(broadcast, BroadcastError.BAD_REQUEST_DATA, errorText); + } + + public BroadcastMsgException(Broadcast broadcast, String errorText, Throwable t) + { + super(broadcast, BroadcastError.BAD_REQUEST_DATA, errorText, t); + } + */ + +} diff --git a/src/altk/comm/engine/exception/EngineException.java b/src/altk/comm/engine/exception/EngineException.java new file mode 100644 index 0000000..b2211b4 --- /dev/null +++ b/src/altk/comm/engine/exception/EngineException.java @@ -0,0 +1,28 @@ +package altk.comm.engine.exception; + +/** + * Base class of Exceptions used in this platform. Derived classes use + * their own enum for errorCode. + * + * @author Yuk-Ming + * + */ +@SuppressWarnings("serial") +public abstract class EngineException extends Exception +{ + public final String errorCodeText; + public final String errorText; + + public EngineException(String errorCodeText, String errorText) + { + this(errorCodeText, errorText, null); + } + + public EngineException(String errorCodeText, String errorText, Throwable cause) + { + super(errorCodeText + ": " + errorText, cause); + this.errorCodeText = errorCodeText; + this.errorText = errorText; + } + +} diff --git a/src/altk/comm/engine/exception/InternalErrorException.java b/src/altk/comm/engine/exception/InternalErrorException.java new file mode 100644 index 0000000..5efe1c2 --- /dev/null +++ b/src/altk/comm/engine/exception/InternalErrorException.java @@ -0,0 +1,19 @@ +package altk.comm.engine.exception; + +@SuppressWarnings("serial") +public class InternalErrorException extends RuntimeException +{ + public InternalErrorException(Throwable t) + { + super(t); + } + public InternalErrorException(String message) + { + super(message); + } + + public InternalErrorException(String message, Exception e) + { + super(message, e); + } +} diff --git a/src/altk/comm/engine/exception/NonExistentException.java b/src/altk/comm/engine/exception/NonExistentException.java new file mode 100644 index 0000000..73f975c --- /dev/null +++ b/src/altk/comm/engine/exception/NonExistentException.java @@ -0,0 +1,10 @@ +package altk.comm.engine.exception; + +@SuppressWarnings("serial") +public class NonExistentException extends Exception +{ + public NonExistentException(String msg) + { + super(msg); + } +} diff --git a/src/altk/comm/engine/exception/PlatformError.java b/src/altk/comm/engine/exception/PlatformError.java new file mode 100644 index 0000000..1f54f34 --- /dev/null +++ b/src/altk/comm/engine/exception/PlatformError.java @@ -0,0 +1,8 @@ +package altk.comm.engine.exception; + +public enum PlatformError +{ + INTERNAL_ERROR, + CONFIGURATION_ERROR, + RUNTIME_ERROR +} diff --git a/src/altk/comm/engine/exception/PlatformException.java b/src/altk/comm/engine/exception/PlatformException.java new file mode 100644 index 0000000..682e38a --- /dev/null +++ b/src/altk/comm/engine/exception/PlatformException.java @@ -0,0 +1,18 @@ +package altk.comm.engine.exception; + +@SuppressWarnings("serial") +public class PlatformException extends EngineException +{ + public final PlatformError errorCode; + + public PlatformException(PlatformError errorCode, String errorText) + { + this(errorCode, errorText, null); + } + + public PlatformException(PlatformError errorCode, String errorText, Throwable t) + { + super(errorCode.toString(), errorText, t); + this.errorCode = errorCode; + } +} diff --git a/src/altk/comm/engine/exception/ServiceException.java b/src/altk/comm/engine/exception/ServiceException.java new file mode 100644 index 0000000..eb68ef7 --- /dev/null +++ b/src/altk/comm/engine/exception/ServiceException.java @@ -0,0 +1,21 @@ +package altk.comm.engine.exception; + +import altk.comm.engine.Job.JobStatus; + + +@SuppressWarnings("serial") +public class ServiceException extends EngineException +{ + public final JobStatus status; + + public ServiceException(JobStatus status, String errorText) + { + this(status, errorText, null); + } + + public ServiceException(JobStatus status, String errorText, Throwable t) + { + super(status.toString(), errorText, t); + this.status = status; + } +} diff --git a/src/altk/comm/engine/postback/PostBack.java b/src/altk/comm/engine/postback/PostBack.java new file mode 100644 index 0000000..9e68ec3 --- /dev/null +++ b/src/altk/comm/engine/postback/PostBack.java @@ -0,0 +1,416 @@ +package altk.comm.engine.postback; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.apache.commons.httpclient.ConnectTimeoutException; +import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.apache.commons.httpclient.params.HttpMethodParams; +import org.apache.log4j.Logger; +import org.w3c.dom.Document; +import org.w3c.dom.Node; + +import altk.comm.engine.CommonLogger; + + +/** + * Queues JobReports to be posted back to attribute postBackURL. + * Multiple internal class Sender members consume this postQueue, sending items + * in postQueue to postBackURL. + * + * In the future, if postBackURL has problem, or if + * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing + * everything to backingFile. + * + * @author Kwong + * + */ +public class PostBack +{ + private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; + + private static final int QUEUE_WAIT = 300; // seconds + private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds + private static final int THREADPOOL_SIZE_DEFAULT = 2; + private static final int MAX_QUEUE_SIZE_DEFAULT = 10000; + private static final int MAX_BATCH_SIZE_DEFAULT = 100; + + private final String postBackURL; + private final String xmlTopElement; + private Queue postQueue; + private final int maxQueueSize; + private final int senderPoolSize; + private List senderPool; + private final String myName; + private int maxBatchSize; + + private static Logger myLogger = Logger.getLogger(PostBack.class); + + public enum PostBackStatus + { + SUCCESS, + SERVER_IO_ERROR, + IRRECOVERABLE_ERROR, + HTTP_STATUS_ERROR + } + + class Sender extends Thread + { + private boolean threadShouldStop; + + private Sender(String name) + { + setName(name); + start(); + } + + public void run() + { + threadShouldStop = false; + + myLogger.info(getName() + " started"); + + String report; + + for (;;) // Each iteration sends a batch + { + if (threadShouldStop) + { + myLogger.info(getName() + " terminating"); + System.out.println(getName() + " terminating"); + return; + } + + List reportList = null; + synchronized(postQueue) + { + // Each iteration examines the queue for a batch to send + for (;;) + { + reportList = new ArrayList(); + for (int i = 0; i < maxBatchSize ; i++) + { + report = postQueue.poll(); + if (report == null) break; + reportList.add(report); + } + if (reportList.size() > 0) break; // break out to do the work. + + // Nothing to do, so wait a while, and look at the + // queue again. + try + { + postQueue.wait(QUEUE_WAIT * 1000); + } + catch (InterruptedException e) + { + CommonLogger.alarm.info(getName() + ": Postback queue interrupted while waiting: " + e); + break; + } + CommonLogger.health.info(getName() + " surfacing from wait"); + System.out.println(getName() + " surfacing from wait"); + continue; + } + } // synchronized() + if (reportList != null && reportList.size() > 0) + { + switch (post(reportList)) + { + case IRRECOVERABLE_ERROR: + case SUCCESS: + break; + + case SERVER_IO_ERROR: + // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. + // Re-queue this job + queueReports(reportList); + // Sleep for a while before retrying this PostBack server. + CommonLogger.alarm.warn(getName() + ": Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); + try + { + Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); + } + catch (InterruptedException e) + { + CommonLogger.alarm.warn(getName() + ": Caught while PostBack thread sleeps: " + e); + } + default: + } + } + } + } + + /** + * + * @param reportList + * @return SUCCESS, + * SERVER_IO_ERROR, when postback receiver has problem + * IRRECOVERABLE_ERROR + */ + private PostBackStatus post(List reportList) + { + StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); + xml.append("<"); xml.append(xmlTopElement); xml.append(">"); + for (String report : reportList) + { + xml.append(report); + } + xml.append(""); + + CommonLogger.activity.info(getName() + ": posting " + xml.toString()); + PostMethod post = new PostMethod(postBackURL); + String responseBody = null; + try + { + post.setRequestEntity(new StringRequestEntity(xml.toString(), + "application/xml", "utf-8")); + } + catch (UnsupportedEncodingException e) + { + CommonLogger.alarm.warn(getName() + ": While adding this application/xml content to PostBack: " + xml + " -- " + e); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + post.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + new DefaultHttpMethodRetryHandler(3, false)); + + HttpClient client = new HttpClient(); + try + { + client.getHttpConnectionManager().getParams().setConnectionTimeout(5 * 1000); + int statusCode = client.executeMethod(post); + + if (statusCode != 200) + { + CommonLogger.alarm.warn(getName() + ": Received problem status code " + statusCode + " from posting to \"" + postBackURL + "\": " + xml); + return PostBackStatus.HTTP_STATUS_ERROR; + } + responseBody = post.getResponseBodyAsString().trim(); + CommonLogger.activity.info(getName() + ": Received response: " + (responseBody.length() == 0? "[empty]" : responseBody)); + if (responseBody.trim().length() == 0) return PostBackStatus.SUCCESS; + } + catch (ConnectTimeoutException e) + { + CommonLogger.alarm.warn(getName() + ": IO problem while posting to \"" + postBackURL + "\": " + xml + " -- " + e.getMessage()); + return PostBackStatus.SERVER_IO_ERROR; + } + catch (IOException e) + { + CommonLogger.alarm.warn(getName() + ": IO problem while posting to \"" + postBackURL + "\": " + xml + " -- " + e.getMessage()); + return PostBackStatus.SERVER_IO_ERROR; + } + catch (IllegalArgumentException e) + { + CommonLogger.alarm.warn(getName() + ": When posting to \"" + postBackURL + "\": " + e.getMessage()); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + + // parse into xml doc + Document xmlDoc = null; + try + { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + xmlDoc = builder.parse(new ByteArrayInputStream(responseBody.getBytes())); + } + catch (Exception e) + { + CommonLogger.alarm.warn(getName() + ": xml parse problem on received response from " + postBackURL + ": " + responseBody); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) + { + CommonLogger.alarm.warn(getName() + ": xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + responseBody); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + XPath xpathEngine = XPathFactory.newInstance().newXPath(); + String xpath = null; + try + { + xpath = "@error"; + Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); + if (errorNode != null) + { + String errorCode = errorNode.getNodeValue(); + xpath = "error_text"; + String errorText = (String)xpathEngine.evaluate(xpath, + xmlDoc.getDocumentElement(), XPathConstants.STRING); + CommonLogger.alarm.warn(getName() + ": Error response to <" + xmlTopElement + "> post back to " + + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" + + errorText + "\""); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + } + catch (XPathExpressionException e) + { + CommonLogger.alarm.warn("Bad xpath: " + xpath); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + catch (Exception e) + { + CommonLogger.alarm.warn(getName() + ": While decoding post back response from server: " + e); + return PostBackStatus.IRRECOVERABLE_ERROR; + } + return PostBackStatus.SUCCESS; + } + + public void terminate() + { + if (threadShouldStop) return; + + threadShouldStop = true; + + //Wait for at most 100 ms for thread to stop + interrupt(); + } + + } + + /** + * Constructs a pool of threads doing posting from a common job queue, + * to the supplied postBackURL. The top element of the XML that gets + * posted back has the give name. + * + * Requires these System properties: + * postback_max_queue_size + * postback_threadpool_size + * + * @param postBackURL + * @param xmlTopElementName + * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is + * not supplied nor valid. + */ + public PostBack(String postBackURL, String xmlTopElementName) throws IllegalArgumentException + { + if (postBackURL == null || postBackURL.length() == 0) + { + throw new IllegalArgumentException("PostBack class given null postBackURL"); + } + myName = "Postback-" + postBackURL; + if (xmlTopElementName == null || xmlTopElementName.length() == 0) + { + throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement"); + } + this.postBackURL = postBackURL; + this.xmlTopElement = xmlTopElementName; + postQueue = new LinkedList(); + + int max_queue_size = 0; + String maxQueueSizeStr = System.getProperty("postback_max_queue_size"); + if (maxQueueSizeStr != null && (maxQueueSizeStr=maxQueueSizeStr.trim()).length() > 0) + { + max_queue_size = Integer.parseInt(maxQueueSizeStr); + } + maxQueueSize = max_queue_size > 0? max_queue_size : MAX_QUEUE_SIZE_DEFAULT; + CommonLogger.activity.info("Postback max queue size = " + maxQueueSize); + + int poolSize = 0; + String senderSizeStr = System.getProperty("postback_threadpool_size"); + if (senderSizeStr != null && (senderSizeStr=senderSizeStr.trim()).length() > 0) + { + poolSize = Integer.parseInt(senderSizeStr); + } + senderPoolSize = poolSize > 0? poolSize : THREADPOOL_SIZE_DEFAULT; + CommonLogger.activity.info("Postback threadpool size = " + senderPoolSize); + + int configuredMax = 0; + String maxBatchSizeStr = System.getProperty("postback_max_batch_size"); + if (maxBatchSizeStr != null && (maxBatchSizeStr=maxBatchSizeStr.trim()).length() > 0) + { + configuredMax = Integer.parseInt(maxBatchSizeStr); + } + maxBatchSize = configuredMax > 0? configuredMax: MAX_BATCH_SIZE_DEFAULT; + CommonLogger.activity.info("Postback max batch size = " + maxBatchSize); + + senderPool = new ArrayList(); + for (int i = 0; i < senderPoolSize; i++) + { + Sender sender = new Sender(myName + '-' + i); + senderPool.add(sender); + } + } + + /** + * Queues report to postQueue only if the queue size has not reached the + * maxQueueSize. + * @param report + * @return true if report is added to queue, false otherwise (queue full) + */ + public boolean queueReport(String report) + { + // Log for recovery in case of problem in posting report. + CommonLogger.activity.info(myName + " queing report: " + report); + + myLogger.debug(myName + ": postQueue size: " + postQueue.size()); + synchronized(postQueue) + { + if (postQueue.size() < maxQueueSize) + { + postQueue.add(report); + postQueue.notify(); + return true; + } + } + CommonLogger.alarm.warn(myName + ".queueReport method returning false"); + return false; + } + + /** + * Queues reports to postQueue only if the queue size has not reached the + * maxQueueSize. + * @param reports to be added back to postQueue + * @return true if all jobs have been added to queue, false otherwise (queue full) + */ + public boolean queueReports(List reports) + { + myLogger.debug(myName + ": postQueue size: " + postQueue.size()); + synchronized(postQueue) + { + Iterator iter = reports.iterator(); + int count = 0; // Number of reports added back to postQueue + while (iter.hasNext()) + { + String report = iter.next(); + if (postQueue.size() < maxQueueSize) + { + postQueue.add(report); + count++; + } + } + if (count > 0) postQueue.notify(); + boolean returnValue = (count == reports.size()); + if (!returnValue) + { + CommonLogger.alarm.warn(myName + + ".queueReport method returning false, having queued " + + count + " out of " + reports.size()); + } + return returnValue; + } + } + + public void terminate() + { + for (Sender sender : senderPool) + { + sender.terminate(); + } + + } + +} diff --git a/src/altk/common/engine/util/Base64Coder.java b/src/altk/common/engine/util/Base64Coder.java new file mode 100644 index 0000000..59f181d --- /dev/null +++ b/src/altk/common/engine/util/Base64Coder.java @@ -0,0 +1,145 @@ +package altk.common.engine.util; + +/** +* A Base64 Encoder/Decoder. +* +*

+* This class is used to encode and decode data in Base64 format as described in RFC 1521. +* +*

+* This is "Open Source" software and released under the GNU/LGPL license.
+* It is provided "as is" without warranty of any kind.
+* Copyright 2003: Christian d'Heureuse, Inventec Informatik AG, Switzerland.
+* Home page: www.source-code.biz
+* +*

+* Version history:
+* 2003-07-22 Christian d'Heureuse (chdh): Module created.
+* 2005-08-11 chdh: Lincense changed from GPL to LGPL.
+* 2006-11-21 chdh:
+*   Method encode(String) renamed to encodeString(String).
+*   Method decode(String) renamed to decodeString(String).
+*   New method encode(byte[],int) added.
+*   New method decode(String) added.
+*/ + +public class Base64Coder { + +// Mapping table from 6-bit nibbles to Base64 characters. +private static char[] map1 = new char[64]; + static { + int i=0; + for (char c='A'; c<='Z'; c++) map1[i++] = c; + for (char c='a'; c<='z'; c++) map1[i++] = c; + for (char c='0'; c<='9'; c++) map1[i++] = c; + map1[i++] = '+'; map1[i++] = '/'; } + +// Mapping table from Base64 characters to 6-bit nibbles. +private static byte[] map2 = new byte[128]; + static { + for (int i=0; iin. +* @return A character array with the Base64 encoded data. +*/ +public static char[] encode (byte[] in, int iLen) { + int oDataLen = (iLen*4+2)/3; // output length without padding + int oLen = ((iLen+2)/3)*4; // output length including padding + char[] out = new char[oLen]; + int ip = 0; + int op = 0; + while (ip < iLen) { + int i0 = in[ip++] & 0xff; + int i1 = ip < iLen ? in[ip++] & 0xff : 0; + int i2 = ip < iLen ? in[ip++] & 0xff : 0; + int o0 = i0 >>> 2; + int o1 = ((i0 & 3) << 4) | (i1 >>> 4); + int o2 = ((i1 & 0xf) << 2) | (i2 >>> 6); + int o3 = i2 & 0x3F; + out[op++] = map1[o0]; + out[op++] = map1[o1]; + out[op] = op < oDataLen ? map1[o2] : '='; op++; + out[op] = op < oDataLen ? map1[o3] : '='; op++; } + return out; } + +/** +* Decodes a string from Base64 format. +* @param s a Base64 String to be decoded. +* @return A String containing the decoded data. +* @throws IllegalArgumentException if the input is not valid Base64 encoded data. +*/ +public static String decodeString (String s) { + return new String(decode(s)); } + +/** +* Decodes a byte array from Base64 format. +* @param s a Base64 String to be decoded. +* @return An array containing the decoded data bytes. +* @throws IllegalArgumentException if the input is not valid Base64 encoded data. +*/ +public static byte[] decode (String s) { + return decode(s.toCharArray()); } + +/** +* Decodes a byte array from Base64 format. +* No blanks or line breaks are allowed within the Base64 encoded data. +* @param in a character array containing the Base64 encoded data. +* @return An array containing the decoded data bytes. +* @throws IllegalArgumentException if the input is not valid Base64 encoded data. +*/ +public static byte[] decode (char[] in) { + int iLen = in.length; + if (iLen%4 != 0) throw new IllegalArgumentException ("Length of Base64 encoded input string is not a multiple of 4."); + while (iLen > 0 && in[iLen-1] == '=') iLen--; + int oLen = (iLen*3) / 4; + byte[] out = new byte[oLen]; + int ip = 0; + int op = 0; + while (ip < iLen) { + int i0 = in[ip++]; + int i1 = in[ip++]; + int i2 = ip < iLen ? in[ip++] : 'A'; + int i3 = ip < iLen ? in[ip++] : 'A'; + if (i0 > 127 || i1 > 127 || i2 > 127 || i3 > 127) + throw new IllegalArgumentException ("Illegal character in Base64 encoded data."); + int b0 = map2[i0]; + int b1 = map2[i1]; + int b2 = map2[i2]; + int b3 = map2[i3]; + if (b0 < 0 || b1 < 0 || b2 < 0 || b3 < 0) + throw new IllegalArgumentException ("Illegal character in Base64 encoded data."); + int o0 = ( b0 <<2) | (b1>>>4); + int o1 = ((b1 & 0xf)<<4) | (b2>>>2); + int o2 = ((b2 & 3)<<6) | b3; + out[op++] = (byte)o0; + if (op