package altk.comm.engine;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;

import altk.comm.engine.Broadcast.BroadcastState;
import altk.comm.engine.exception.BroadcastError;
import altk.comm.engine.exception.BroadcastException;
import altk.comm.engine.exception.PlatformError;
import altk.comm.engine.exception.PlatformException;
import altk.comm.engine.postback.PostBack;

/**
 * Base servlet for a broadcast engine.
 * <p>
 * A POST creates a broadcast through {@link #mkBroadcast()}, registers it and
 * starts it. A GET reports status or cancels, pauses or resumes a registered
 * broadcast. Media-specific subclasses supply the broadcast factory, the name
 * of the context parameter that points at the properties file, and the
 * {@link #initChild()} / {@link #destroyChild()} hooks.
 * <p>
 * Thread-safety: {@link #broadcasts} is guarded by its own monitor, and
 * {@link #postBackMap} is guarded by its own monitor wherever this class
 * touches it.
 */
public abstract class CommEngine extends HttpServlet
{
    private static final long serialVersionUID = 6887055442875818654L;

    static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";

    private static final int SCHEDULER_THREAD_POOL_SIZE = 1;

    /** Default value, in minutes, of the dead_broadcast_viewing_period property. */
    private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;

    private static final Logger myLogger = Logger.getLogger(CommEngine.class);

    /**
     * Maps a broadcastId to a broadcast. Every access synchronizes on this map.
     */
    private final Map<String, Broadcast> broadcasts;

    protected boolean notInService;

    protected Properties config;

    /** Maps a post-back URL to the PostBack serving it. */
    protected Map<String, PostBack> postBackMap;

    protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice"

    private long startupTimestamp;

    // Sequences the made-up names of broadcasts that fail to yield a broadcastId.
    // Guarded by the monitor of broadcasts.
    private int unknownBroadcastIdNdx = 1;

    /**
     * Used to communicate media-specific platform resources to broadcasts
     */
    protected EngineResources resources;

    private ScheduledExecutorService scheduler;

    private long deadBroadcastViewingMinutes;

    // Completed jobs of broadcasts already purged. Guarded by the monitor of broadcasts.
    private int completedJobCount = 0;

    /**
     * @return a new, not yet decoded broadcast of the engine's media type
     */
    abstract protected Broadcast mkBroadcast();

    /**
     * @param engineName name of this engine; also used to build the tag of the
     *                   engine status report
     */
    public CommEngine(String engineName)
    {
        this.engineName = engineName;
        broadcasts = new HashMap<String, Broadcast>();
        startupTimestamp = System.currentTimeMillis();
    }

    /**
     * Invoked by servlet container during initialization of servlet.
     * <p>
     * Loads the properties file named by the context parameter, schedules the
     * periodic purge of stale broadcasts and finally calls {@link #initChild()}.
     * When the properties cannot be loaded the engine is marked not in service
     * and neither the purge nor {@link #initChild()} is set up.
     */
    @Override
    public final void init()
    {
        myLogger.info("init() invoked");

        // Created before anything can fail, so that destroy() never sees a null map.
        postBackMap = new HashMap<String, PostBack>();

        ServletContext servletContext = getServletContext();
        String propertiesFilePath = servletContext.getInitParameter(getPropertiesContextName());
        if (propertiesFilePath == null)
        {
            CommonLogger.alarm.fatal("Context parameter " + getPropertiesContextName()
                    + " naming the properties file is not set -- abort");
            notInService = true;
            return;
        }

        File propertiesFile = new File(propertiesFilePath);
        CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath());
        config = new Properties();
        FileInputStream propertiesStream = null;
        try
        {
            propertiesStream = new FileInputStream(propertiesFile);
            config.load(propertiesStream);
        }
        catch (FileNotFoundException e)
        {
            CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort");
            notInService = true;
            return;
        }
        catch (IOException e)
        {
            CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage());
            notInService = true;
            return;
        }
        finally
        {
            if (propertiesStream != null)
            {
                try
                {
                    propertiesStream.close();
                }
                catch (IOException ignored)
                {
                    // The properties are already loaded or already reported as unreadable.
                }
            }
        }

        // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
        deadBroadcastViewingMinutes = readDeadBroadcastViewingMinutes();
        CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));

        scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
        scheduler.scheduleAtFixedRate(new Runnable()
        {
            public void run()
            {
                // An exception escaping a fixed-rate task suppresses all later runs,
                // so it is caught and logged here.
                try
                {
                    purgeStaleBroadcasts();
                }
                catch (RuntimeException e)
                {
                    myLogger.error("Purge of stale broadcasts failed", e);
                }
            }
        }, deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);

        initChild();
    }

    /**
     * Reads the dead_broadcast_viewing_period property.
     *
     * @return the configured number of minutes, or the default when the property
     *         is absent, not a number, or not positive
     */
    private long readDeadBroadcastViewingMinutes()
    {
        String periodStr = config.getProperty("dead_broadcast_viewing_period");
        if (periodStr == null)
        {
            return DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT;
        }
        try
        {
            long minutes = Long.parseLong(periodStr.trim());
            if (minutes > 0)
            {
                return minutes;
            }
        }
        catch (NumberFormatException e)
        {
            // Reported below together with the non-positive case.
        }
        CommonLogger.alarm.error("Invalid dead_broadcast_viewing_period '" + periodStr
                + "' -- using default of " + DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT + " minutes");
        return DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT;
    }

    /**
     * Removes every broadcast whose last state change is older than the dead
     * broadcast viewing period, after adding its completed job count to the
     * running total kept by this engine.
     */
    protected void purgeStaleBroadcasts()
    {
        long now = System.currentTimeMillis();
        long maxAgeMillis = deadBroadcastViewingMinutes * 60 * 1000;
        synchronized (broadcasts)
        {
            // NOTE(review): staleness is judged on changeStateTime alone, as before;
            // confirm whether broadcasts in a non-terminal state must be exempt.
            Iterator<Map.Entry<String, Broadcast>> entries = broadcasts.entrySet().iterator();
            while (entries.hasNext())
            {
                Broadcast broadcast = entries.next().getValue();
                if (now - broadcast.changeStateTime > maxAgeMillis)
                {
                    completedJobCount += broadcast.getCompletedJobCount();
                    // Removal goes through the iterator; removing from the map
                    // directly would invalidate the iteration.
                    entries.remove();
                }
            }
        }
    }

    /**
     *
     * @return name of parameter in Tomcat context file, specifying properties file
     * for this engine.
     */
    protected abstract String getPropertiesContextName();

    /**
     * Creates, registers and starts a broadcast from the posted request.
     * <p>
     * The broadcast is registered and the response is written whether or not
     * decoding and synchronous initialization succeed. Asynchronous
     * initialization and processing start only when they succeeded and the
     * broadcast is not already completed.
     */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
    {
        try
        {
            Broadcast broadcast = mkBroadcast();
            // Failure of this request only; it must not leak into other requests.
            BroadcastException failure = null;
            try
            {
                broadcast.decode(request, notInService);
                if (notInService)
                {
                    throw new PlatformException(PlatformError.RUNTIME_ERROR,
                            "Not in service");
                }
                if (broadcast.recipientList.size() == 0)
                {
                    CommonLogger.activity.info("Broadcast " + broadcast.getBroadcastId() + ": No recipients");
                    broadcast.setState(BroadcastState.COMPLETED, "No recipients", null);
                    return;
                }

                // Determine postBackUrl; PostBacks are shared between broadcasts by URL.
                String postBackURL = broadcast.getPostBackURL();
                PostBack postBack = null;
                if (postBackURL != null)
                {
                    synchronized (postBackMap)
                    {
                        postBack = postBackMap.get(postBackURL);
                        if (postBack == null)
                        {
                            postBack = new PostBack(postBackURL, broadcast.broadcastType + "_status");
                            postBackMap.put(postBackURL, postBack);
                        }
                    }
                }
                broadcast.initSync(resources);
                broadcast.init(postBack);
                if (broadcast.getState() == BroadcastState.COMPLETED) return;
            }
            catch (BroadcastException e)
            {
                failure = e;
                broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
                CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
                myLogger.error("Broadcast aborted", e);
                return;
            }
            catch (Throwable t)
            {
                // Caught stray unexpected runtime problem
                CommonLogger.alarm.error("Broadcast aborted: " + t);
                myLogger.error("Broadcast aborted", t);
                failure = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage());
                broadcast.setState(BroadcastState.ABORTED, failure.errorCodeText, failure.errorText);
                // An aborted broadcast must not go on to asynchronous processing.
                return;
            }
            finally
            {
                // Put broadcast in broadcasts map.
                registerBroadcast(broadcast.getBroadcastId(), broadcast);

                // Return regular or error response
                String responseXML = broadcast.getResponseXML(failure);
                PrintWriter writer = response.getWriter();
                writer.write(responseXML);
                writer.close();
            }

            try
            {
                broadcast.initAsync();
                broadcast.startProcessing();
            }
            catch (BroadcastException e)
            {
                broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
                CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
                myLogger.error("Broadcast aborted", e);
            }
        }
        catch (Throwable t)
        {
            // Caught stray unexpected runtime problem
            CommonLogger.alarm.error("Broadcast aborted: " + t.getMessage());
            myLogger.error("Broadcast aborted", t);
        }
    }

    /**
     * Adds a broadcast to the broadcasts map.
     *
     * @param broadcastId id to register under; when null or empty a sequenced
     *                    "UnknownN" name is made up instead
     * @param broadcast   broadcast to register
     */
    private void registerBroadcast(String broadcastId, Broadcast broadcast)
    {
        synchronized (broadcasts)
        {
            if (broadcastId != null && broadcastId.length() != 0)
            {
                broadcasts.put(broadcastId, broadcast);
            }
            else
            {
                String makeUpId = "Unknown" + unknownBroadcastIdNdx++;
                broadcasts.put(makeUpId, broadcast);
            }
        }
    }

    /**
     * Functions covered are
     * get=status
     * get=status&broadcast_id=XXX
     * get=cancel_broadcast&broadcast_id=XXX
     * get=pause_broadcast&broadcast_id=XXX
     * get=resume_broadcast&broadcast_id=XXX
     * <p>
     * A request without the get parameter is answered with HTTP status
     * BAD REQUEST.
     */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
    {
        PrintWriter out;
        try
        {
            out = response.getWriter();
        }
        catch (IOException e)
        {
            CommonLogger.alarm.error("Cannot write a reply: " + e);
            return;
        }
        String get = request.getParameter("get");
        if (get == null)
        {
            // Return http status BAD REQUEST
            int httpStatus = HttpServletResponse.SC_BAD_REQUEST;
            try
            {
                response.sendError(httpStatus);
            }
            catch (IOException e)
            {
                myLogger.warn("Unnable to return HTTP error code " + httpStatus);
            }
            return;
        }
        if (get.equalsIgnoreCase("status"))
        {
            getStatus(request, out);
        }
        else if (get.equalsIgnoreCase("cancel_broadcast"))
        {
            cancelBroadcast(request, out);
        }
        else if (get.equalsIgnoreCase("pause_broadcast"))
        {
            pauseBroadcast(request, out);
        }
        else if (get.equalsIgnoreCase("resume_broadcast"))
        {
            resumeBroadcast(request, out);
        }
        else
        {
            out.write(get + " not supported");
        }
        out.close();
    }

    /**
     * Cancels the broadcast named by the broadcast_id request parameter, or
     * reports on out that it does not exist.
     */
    private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
    {
        Broadcast broadcast = findRequestedBroadcast(request, out);
        if (broadcast != null)
        {
            broadcast.cancel();
        }
    }

    /**
     * Pauses the broadcast named by the broadcast_id request parameter, or
     * reports on out that it does not exist.
     */
    protected void pauseBroadcast(HttpServletRequest request, PrintWriter out)
    {
        Broadcast broadcast = findRequestedBroadcast(request, out);
        if (broadcast != null)
        {
            broadcast.pause();
        }
    }

    /**
     * Resumes the broadcast named by the broadcast_id request parameter, or
     * reports on out that it does not exist.
     */
    protected void resumeBroadcast(HttpServletRequest request, PrintWriter out)
    {
        Broadcast broadcast = findRequestedBroadcast(request, out);
        if (broadcast != null)
        {
            broadcast.resume();
        }
    }

    /**
     * Looks up the broadcast named by the broadcast_id request parameter.
     *
     * @return the broadcast, or null after writing "Broadcast X does not exist"
     *         to out
     */
    private Broadcast findRequestedBroadcast(HttpServletRequest request, PrintWriter out)
    {
        // Get broadcastId from request
        String broadcastId = getBroadcastId(request);
        Broadcast broadcast = findBroadcast(broadcastId);
        if (broadcast == null)
        {
            out.format("Broadcast %s does not exist", broadcastId);
        }
        return broadcast;
    }

    /**
     * @return the broadcast registered under broadcastId, or null if there is none
     */
    private Broadcast findBroadcast(String broadcastId)
    {
        synchronized (broadcasts)
        {
            return broadcasts.get(broadcastId);
        }
    }

    /**
     * @return a copy of the currently registered broadcasts, taken under the
     *         lock so that callers can work on them without holding it
     */
    private List<Broadcast> snapshotBroadcasts()
    {
        synchronized (broadcasts)
        {
            return new ArrayList<Broadcast>(broadcasts.values());
        }
    }

    /**
     * Writes a status report to out.
     * <p>
     * With a broadcast_id request parameter, the report of that broadcast alone
     * is written, or a plain-text message when the id is empty or unknown.
     * Without it, an element named after the engine is written, containing the
     * startup timestamp followed by the report of every registered broadcast.
     */
    private void getStatus(HttpServletRequest request, PrintWriter out)
    {
        String broadcastId = request.getParameter("broadcast_id");
        if (broadcastId != null)
        {
            broadcastId = broadcastId.trim();
            if (broadcastId.length() == 0)
            {
                out.write("broadcast_id request parameter cannot be empty");
                return;
            }
            Broadcast broadcast = findBroadcast(broadcastId);
            if (broadcast == null)
            {
                out.write("No such broadcast");
            }
            else
            {
                out.write(broadcast.mkStatusReport());
            }
            return;
        }

        String tag = engineName + "_status";
        out.write("<" + tag + ">\r\n");
        out.write("" + startupTimestamp + "\r\n");

        // Work on a copy of broadcasts, so that the lock is not held while
        // the reports are built; this avoids mutex deadlock.
        for (Broadcast broadcast : snapshotBroadcasts())
        {
            out.write(broadcast.mkStatusReport());
        }

        // Close the element opened above.
        out.write("</" + tag + ">");
    }

    /**
     * @return sum of the ready job counts of all registered broadcasts
     */
    public int getReadyJobCount()
    {
        int readyCount = 0;
        synchronized (broadcasts)
        {
            for (Broadcast broadcast : broadcasts.values())
            {
                readyCount += broadcast.getReadyJobCount();
            }
        }
        return readyCount;
    }

    /**
     * @return sum of the active job counts of all registered broadcasts
     */
    public int getActiveJobCount()
    {
        int activeCount = 0;
        synchronized (broadcasts)
        {
            for (Broadcast broadcast : broadcasts.values())
            {
                activeCount += broadcast.getActiveJobCount();
            }
        }
        return activeCount;
    }

    /**
     * @return completed jobs of all registered broadcasts plus those of the
     *         broadcasts already purged
     */
    public int getCompletedJobCount()
    {
        synchronized (broadcasts)
        {
            // The purged total is read under the same lock that the purge holds,
            // so a broadcast is never counted both as purged and as registered.
            int total = completedJobCount;
            for (Broadcast broadcast : broadcasts.values())
            {
                total += broadcast.getCompletedJobCount();
            }
            return total;
        }
    }

    /**
     * Removes the broadcast registered under broadcastId, if any.
     */
    public void removeBroadcast(String broadcastId)
    {
        CommonLogger.activity.info("Removing broadcast " + broadcastId);
        synchronized (broadcasts)
        {
            broadcasts.remove(broadcastId);
        }
    }

    /**
     * @return true when initialization failed to load the engine's properties
     */
    public boolean notInService()
    {
        return notInService;
    }

    /**
     * Decode http GET request for broadcast_id value
     * @param request
     * @return broadcast_id
     */
    private String getBroadcastId(HttpServletRequest request)
    {
        return request.getParameter("broadcast_id");
    }

    /**
     * Invoked by servlet container when servlet is destroyed.
     * <p>
     * Stops the purge scheduler, terminates every PostBack and every registered
     * broadcast, then calls {@link #destroyChild()}.
     */
    @Override
    public final void destroy()
    {
        System.out.println(engineName + " destroyed");

        // Null when init() stopped before scheduling the purge.
        if (scheduler != null)
        {
            scheduler.shutdownNow();
        }

        // Kill threads in each PostBack, which is remembered in postBackMap.
        List<PostBack> postBacks = new ArrayList<PostBack>();
        if (postBackMap != null)
        {
            synchronized (postBackMap)
            {
                postBacks.addAll(postBackMap.values());
            }
        }
        for (PostBack postback : postBacks)
        {
            postback.terminate();
        }

        // Iterate over a copy: the live map must not be iterated while
        // broadcasts are being terminated.
        for (Broadcast broadcast : snapshotBroadcasts())
        {
            broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
        }

        destroyChild();
    }

    /**
     * Indirectly invoked by servlet container during servlet initialization.
     */
    abstract protected void initChild();

    /**
     * Indirectly invoked by servlet container during destruction of servlet.
     */
    abstract protected void destroyChild();
}