package altk.comm.engine;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import altk.comm.engine.Broadcast.BroadcastState;
import altk.comm.engine.exception.BroadcastError;
import altk.comm.engine.exception.BroadcastException;
import altk.comm.engine.exception.PlatformError;
import altk.comm.engine.exception.PlatformException;
import altk.comm.engine.postback.PostBack;

/**
 * Base servlet of a broadcast engine.
 *
 * <p>POST requests are handed to a fresh {@link Broadcast} created by
 * {@link #mkBroadcast()}; GET requests implement the management functions
 * (status, cancel, pause, resume). The engine keeps a registry of broadcasts
 * keyed by broadcast id and a registry of {@link PostBack} instances keyed by
 * post-back URL.
 *
 * <p>Thread-safety: every access to the broadcast registry made by this class is
 * guarded by the monitor of the registry map itself; every access to
 * {@code postBackMap} made by this class is guarded by a private lock.
 */
@SuppressWarnings("serial")
public abstract class CommEngine extends HttpServlet {

    static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";

    private static final int SCHEDULER_THREAD_POOL_SIZE = 1;
    private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
    private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1;
    private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2;
    private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000;
    private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;

    /**
     * Maps a broadcastId to a broadcast. Also serves as the lock guarding itself,
     * {@link #completedJobCount} and {@link #unknownBroadcastIdNdx}.
     */
    private final Map<String, Broadcast> broadcasts;

    /** Guards {@link #postBackMap} so that at most one PostBack is created per URL. */
    private final Object postBackLock = new Object();

    protected boolean notInService;

    protected Properties config;

    /** Maps a post-back URL to the PostBack serving it. */
    protected Map<String, PostBack> postBackMap;

    protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice"

    private long startupTimestamp;

    // Sequencing naming of broadcast that fails to yield its broadcastId
    private int unknownBroadcastIdNdx = 1;

    // Only used by the deprecated doPost_good().
    // NOTE(review): being an instance field, it is shared by all request threads.
    private BroadcastException myException;

    /**
     * Used to communicate media-specific platform resources to broadcasts
     */
    protected EngineResources resources;

    private static Logger myLogger;

    private ScheduledExecutorService scheduler;

    private long deadBroadcastViewingMinutes;

    /** Completed job count accumulated from broadcasts already purged from the registry. */
    private int completedJobCount = 0;

    private int serviceThreadPoolSize;

    private int postbackMaxQueueSize;

    private int postbackSenderPoolSize;

    private int postbackMaxBatchSize;

    protected String runtimeDirPath;

    protected String confDirPath;

    /**
     * @return a new, engine-specific broadcast instance
     */
    abstract protected Broadcast mkBroadcast();

    /**
     * @param engineName name of this engine; used in the status report tag and as
     *                   the legacy path prefix removed by {@link #relocateToRuntimeDir(String)}
     */
    public CommEngine(String engineName) {
        this.engineName = engineName;
        broadcasts = new HashMap<String, Broadcast>();
        startupTimestamp = System.currentTimeMillis();
        myException = null;
    }

    /**
     * Relocates a filepath relative to runtime directory if filepath is not absolute.
     *
     * @param filepath path to relocate; a path starting with "/" is returned unchanged
     * @return the absolute path as given, or the path prefixed with the runtime directory
     */
    public String relocateToRuntimeDir(String filepath) {
        if (filepath.startsWith("/")) {
            return filepath; // no change to absolute path
        }

        // The prefix stripping below takes care of the pre-git era convention for
        // file paths in properties. Then, paths were relative to the current working
        // directory of tomcat; now they are relative to the runtimeDirPath obtained
        // from the tomcat context. It can be deleted when all CommEngines in
        // production are in the git era.
        String relocated = filepath;
        String unwantedPrefix = engineName + "/";
        if (filepath.startsWith(unwantedPrefix)) {
            relocated = filepath.substring(unwantedPrefix.length());
        }
        return runtimeDirPath + "/" + relocated;
    }

    /**
     * Invoked by servlet container during initialization of servlet.
     *
     * <p>Reads the configuration and runtime directories from the servlet context,
     * configures log4j, loads the engine properties, starts the periodic purge of
     * stale broadcasts and finally calls {@link #initChild()}. If the engine
     * properties file cannot be read, the engine is flagged as not in service and
     * neither the purge scheduler nor {@link #initChild()} is started.
     *
     * @throws NumberFormatException if a numeric property does not hold a number
     */
    public final void init() {
        // Created first so that it exists even when initialization is abandoned below.
        synchronized (postBackLock) {
            postBackMap = new HashMap<String, PostBack>();
        }

        // check init parameters
        ServletContext servletContext = getServletContext();
        confDirPath = servletContext.getInitParameter(getConfDirContextName());
        System.out.println("Config directory is configured to be '" + confDirPath + "'. Make sure it and its content are readable by user 'tomcat'");
        runtimeDirPath = servletContext.getInitParameter(getRunTimeDirContextName());
        System.out.println("Runtime directory is configured to be '" + runtimeDirPath + "'. Make sure it and its content are readable by user 'tomcat'");

        File propertiesFile = new File(confDirPath + "/properties");

        // Configure log4j using log4j.properties file,
        // sibling to the engine properties file.
        // This is backward compatible with placing the log4j.properties file in
        // the class path.
        String log4jProperties = confDirPath + "/log4j.properties";
        configureLog4j(log4jProperties);

        // This activates Logger class instantiation. At this point, if log4j
        // is not yet configured,
        // it will look for the log4j.properties file in the class path.
        myLogger = Logger.getLogger(CommEngine.class);
        myLogger.info("init() invoked");

        CommonLogger.startup.info("Using lo4j properites file " + log4jProperties);
        CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath());

        config = new Properties();
        try (FileInputStream in = new FileInputStream(propertiesFile)) {
            config.load(in);
        } catch (FileNotFoundException e) {
            CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort");
            notInService = true;
            return;
        } catch (IOException e) {
            CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage());
            notInService = true;
            return;
        }

        deadBroadcastViewingMinutes =
                longProperty("dead_broadcast_viewing_period", DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT);
        CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));

        serviceThreadPoolSize = intProperty("service_threadpool_size", SERVICE_THREADPOOL_SIZE_DEFAULT);
        CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize));

        postbackMaxQueueSize = intProperty("postback_max_queue_size", POSTBACK_MAX_QUEUE_SIZE_DEFAULT);
        CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize);

        postbackSenderPoolSize = intProperty("postback_threadpool_size", POSTBACK_THREADPOOL_SIZE_DEFAULT);
        CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize);

        postbackMaxBatchSize = intProperty("postback_max_batch_size", POSTBACK_MAX_BATCH_SIZE_DEFAULT);
        CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize);

        // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
        scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    purgeStaleBroadcasts();
                } catch (RuntimeException e) {
                    // An exception escaping from here would silently cancel every
                    // subsequent run of this periodic task.
                    myLogger.error("Purge of stale broadcasts failed", e);
                }
            }
        }, deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);

        initChild();
    }

    /**
     * Configures log4j from the given properties file, after relocating every
     * property whose key ends with ".file" (case-insensitive) to the runtime directory.
     * Any failure is only printed: log4j then falls back to looking for
     * log4j.properties in the class path.
     */
    private void configureLog4j(String log4jPropertiesPath) {
        try (FileInputStream in = new FileInputStream(log4jPropertiesPath)) {
            Properties prop = new Properties();
            prop.load(in);
            // stringPropertyNames() is a snapshot, so prop may be updated while looping.
            for (String key : prop.stringPropertyNames()) {
                if (key.toLowerCase(Locale.ROOT).endsWith(".file")) {
                    String relocated = relocateToRuntimeDir(prop.getProperty(key));
                    prop.setProperty(key, relocated);
                    System.out.println(key + "=" + relocated);
                }
            }
            PropertyConfigurator.configure(prop);
        } catch (Exception e) {
            System.out.println("Failed to configure log4: " + e);
            // Do nothing else, assuming the exception is FileNotFoundException.
        }
    }

    /** Reads an int property from config, surrounding whitespace ignored, with a default. */
    private int intProperty(String key, int defaultValue) {
        return Integer.parseInt(config.getProperty(key, Integer.toString(defaultValue)).trim());
    }

    /** Reads a long property from config, surrounding whitespace ignored, with a default. */
    private long longProperty(String key, long defaultValue) {
        return Long.parseLong(config.getProperty(key, Long.toString(defaultValue)).trim());
    }

    /**
     * Removes from the registry every broadcast whose last state change is older
     * than the dead-broadcast viewing period, after adding its completed job count
     * to the engine's running total.
     */
    protected void purgeStaleBroadcasts() {
        long now = System.currentTimeMillis();
        long maxAgeMillis = TimeUnit.MINUTES.toMillis(deadBroadcastViewingMinutes);
        synchronized (broadcasts) {
            // NOTE(review): only the age of the last state change is tested, not the
            // state itself -- confirm that a live broadcast cannot stay in one state
            // for longer than the viewing period.
            // Removal goes through the iterator: removing from the map directly
            // while iterating it throws ConcurrentModificationException.
            Iterator<Broadcast> it = broadcasts.values().iterator();
            while (it.hasNext()) {
                Broadcast broadcast = it.next();
                if (now - broadcast.changeStateTime > maxAgeMillis) {
                    completedJobCount += broadcast.getCompletedJobCount();
                    it.remove();
                }
            }
        }
    }

    /**
     * @return name of the servlet context init parameter that holds the
     *         configuration directory of this engine
     */
    protected abstract String getConfDirContextName();

    /**
     * @return name of the servlet context init parameter that holds the
     *         runtime directory of this engine
     */
    protected abstract String getRunTimeDirContextName();

    /**
     * Delegates the whole handling of a POST request to a new broadcast.
     */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) {
        Broadcast broadcast = mkBroadcast();
        broadcast.doPost(request, response, this);
    }

    /**
     * Former POST handler, kept for reference.
     *
     * @deprecated TODO Not used - delete
     */
    //@Override
    @Deprecated
    protected void doPost_good(HttpServletRequest request, HttpServletResponse response) {
        myException = null;
        try {
            Broadcast broadcast = mkBroadcast();
            broadcast.setServiceThreadPoolsize(serviceThreadPoolSize);
            try {
                broadcast.decode(request, notInService);
                if (notInService) {
                    throw new PlatformException(PlatformError.RUNTIME_ERROR, "Not in service");
                }
                if (broadcast.recipientList.size() == 0) {
                    CommonLogger.activity.info("Broadcast " + broadcast.getBroadcastId() + ": No recipients");
                    broadcast.setState(BroadcastState.COMPLETED, "No recipients", null);
                    return;
                }

                // Determine postBackUrl
                String postBackURL = broadcast.getPostBackURL();
                PostBack postBack = null;
                if (postBackURL != null) {
                    synchronized (postBackLock) {
                        postBack = postBackMap.get(postBackURL);
                        if (postBack == null) {
                            postBack = new PostBack(postBackURL, broadcast.broadcastType + "_status",
                                    postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize);
                            postBackMap.put(postBackURL, postBack);
                        }
                    }
                }
                broadcast.initSync(resources);
                broadcast.init(postBack);
                if (broadcast.getState() == BroadcastState.COMPLETED) {
                    return;
                }
            } catch (BroadcastException e) {
                myException = e;
                broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
                CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
                myLogger.error("Broadcast aborted", e);
                return;
            } catch (Throwable t) {
                // Caught stray unexpected runtime problem
                CommonLogger.alarm.error("Broadcast aborted: " + t);
                myLogger.error("Broadcast aborted", t);
                myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage());
                broadcast.setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText);
                // Do not go on to start processing a broadcast that was just aborted.
                return;
            } finally {
                // Put broadcast in broadcasts map; an empty id counts as unknown here.
                String broadcastId = broadcast.getBroadcastId();
                boolean hasId = broadcastId != null && broadcastId.length() != 0;
                putBroadcast(hasId ? broadcastId : null, broadcast);

                // Return regular or error response
                String responseXML = broadcast.getResponseXML(myException);
                PrintWriter writer = response.getWriter();
                writer.write(responseXML);
                writer.close();
            }

            try {
                broadcast.initAsync();
                broadcast.startProcessing();
            } catch (BroadcastException e) {
                broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
                CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
                myLogger.error("Broadcast aborted", e);
            }
        } catch (Throwable t) {
            // Caught stray unexpected runtime problem
            CommonLogger.alarm.error("Broadcast aborted: " + t.getMessage());
            myLogger.error("Broadcast aborted", t);
        }
    }

    /**
     * Functions covered are
     *   get=status                                   (optionally with broadcast_id=XXX)
     *   get=cancel_broadcast&broadcast_id=XXX
     *   get=pause_broadcast&broadcast_id=XXX
     *   get=resume_broadcast&broadcast_id=XXX
     * A request without the "get" parameter is answered with HTTP 400.
     */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
        PrintWriter out;
        try {
            out = response.getWriter();
        } catch (IOException e) {
            CommonLogger.alarm.error("Cannot write a reply: " + e);
            return;
        }

        String get = request.getParameter("get");
        if (get == null) {
            // Return http status BAD REQUEST
            int httpStatus = HttpServletResponse.SC_BAD_REQUEST;
            try {
                response.sendError(httpStatus);
            } catch (IOException e) {
                myLogger.warn("Unnable to return HTTP error code " + httpStatus);
            }
            return;
        }

        if (get.equalsIgnoreCase("status")) {
            getStatus(request, out);
        } else if (get.equalsIgnoreCase("cancel_broadcast")) {
            cancelBroadcast(request, out);
        } else if (get.equalsIgnoreCase("pause_broadcast")) {
            pauseBroadcast(request, out);
        } else if (get.equalsIgnoreCase("resume_broadcast")) {
            resumeBroadcast(request, out);
        } else {
            out.write(get + " not supported");
        }
        out.close();
    }

    /** Cancels the broadcast named by the request, or reports that it does not exist. */
    private void cancelBroadcast(HttpServletRequest request, PrintWriter out) {
        Broadcast broadcast = lookupRequestedBroadcast(request, out);
        if (broadcast != null) {
            broadcast.cancel(out);
        }
    }

    /** Pauses the broadcast named by the request, or reports that it does not exist. */
    protected void pauseBroadcast(HttpServletRequest request, PrintWriter out) {
        Broadcast broadcast = lookupRequestedBroadcast(request, out);
        if (broadcast != null) {
            broadcast.pause();
        }
    }

    /** Resumes the broadcast named by the request, or reports that it does not exist. */
    protected void resumeBroadcast(HttpServletRequest request, PrintWriter out) {
        Broadcast broadcast = lookupRequestedBroadcast(request, out);
        if (broadcast != null) {
            broadcast.resume();
        }
    }

    /**
     * Finds the broadcast named by the request's broadcast_id parameter.
     * Writes a "does not exist" message to out when there is none.
     *
     * @return the broadcast, or null if it is not registered
     */
    private Broadcast lookupRequestedBroadcast(HttpServletRequest request, PrintWriter out) {
        String broadcastId = getBroadcastId(request);
        Broadcast broadcast = findBroadcast(broadcastId);
        if (broadcast == null) {
            out.format("Broadcast %s does not exist", broadcastId);
        }
        return broadcast;
    }

    /** Registry lookup under the registry lock. */
    private Broadcast findBroadcast(String broadcastId) {
        synchronized (broadcasts) {
            return broadcasts.get(broadcastId);
        }
    }

    /**
     * Registers a broadcast under the registry lock; a null id is replaced by a
     * made-up sequential "UnknownN" id.
     */
    private void putBroadcast(String broadcastId, Broadcast broadcast) {
        synchronized (broadcasts) {
            String key = (broadcastId != null) ? broadcastId : "Unknown" + unknownBroadcastIdNdx++;
            broadcasts.put(key, broadcast);
        }
    }

    /**
     * Writes a status report to out.
     * With a broadcast_id request parameter: the status report of that broadcast
     * (or an error message). Without it: an engine status element containing the
     * startup timestamp followed by the status report of each registered broadcast.
     */
    private void getStatus(HttpServletRequest request, PrintWriter out) {
        String broadcastId = request.getParameter("broadcast_id");
        if (broadcastId != null) {
            broadcastId = broadcastId.trim();
            if (broadcastId.length() == 0) {
                out.write("broadcast_id request parameter cannot be empty");
                return;
            }
            Broadcast broadcast = findBroadcast(broadcastId);
            if (broadcast == null) {
                out.write("No such broadcast");
            } else {
                out.write(broadcast.mkStatusReport());
            }
            return;
        }

        String tag = engineName + "_status";
        out.write("<" + tag + ">\r\n");
        out.write("" + startupTimestamp + "\r\n");

        // First get a copy of broadcasts, to avoid mutex deadlock.
        List<Broadcast> broadcastList;
        synchronized (broadcasts) {
            broadcastList = new ArrayList<Broadcast>(broadcasts.values());
        }
        // We have released the lock.
        // Then append status of each broadcast.
        for (Broadcast broadcast : broadcastList) {
            out.write(broadcast.mkStatusReport());
        }
        // NOTE(review): the element opened above is never closed and the two writes
        // below emit empty strings -- looks like closing markup went missing; confirm
        // the intended output before changing it.
        out.write("");
        out.write("");
    }

    /**
     * @return sum of the ready job counts of all registered broadcasts
     */
    public int getReadyJobCount() {
        int readyCount = 0;
        synchronized (broadcasts) {
            for (Broadcast broadcast : broadcasts.values()) {
                readyCount += broadcast.getReadyJobCount();
            }
        }
        return readyCount;
    }

    /**
     * @return sum of the active job counts of all registered broadcasts
     */
    public int getActiveJobCount() {
        int activeCount = 0;
        synchronized (broadcasts) {
            for (Broadcast broadcast : broadcasts.values()) {
                activeCount += broadcast.getActiveJobCount();
            }
        }
        return activeCount;
    }

    /**
     * @return completed job count of all registered broadcasts plus that
     *         accumulated from broadcasts already purged
     */
    public int getCompletedJobCount() {
        synchronized (broadcasts) {
            // completedJobCount is updated under this lock, so it is read under it too.
            int total = completedJobCount;
            for (Broadcast broadcast : broadcasts.values()) {
                total += broadcast.getCompletedJobCount();
            }
            return total;
        }
    }

    /**
     * Removes a broadcast from the registry.
     *
     * @param broadcastId id the broadcast is registered under
     */
    public void removeBroadcast(String broadcastId) {
        CommonLogger.activity.info("Removing broadcast " + broadcastId);
        synchronized (broadcasts) {
            broadcasts.remove(broadcastId);
        }
    }

    /**
     * @return true if the engine flagged itself as not in service
     */
    public boolean notInService() {
        return notInService;
    }

    /**
     * Decode http GET request for broadcast_id value
     * @param request
     * @return broadcast_id, or null if the parameter is absent
     */
    private String getBroadcastId(HttpServletRequest request) {
        return request.getParameter("broadcast_id");
    }

    /**
     * Invoked by servlet container when servlet is destroyed.
     * Safe to call even when {@link #init()} was abandoned or never ran.
     */
    public final void destroy() {
        System.out.println("Destroying " + engineName);

        // Shutdown threads that periodically purge stale broadcasts.
        // The scheduler does not exist if init() was abandoned.
        if (scheduler != null) {
            scheduler.shutdownNow();
        }

        // Kill threads in each PostBack, which is remembered in postBackMap.
        List<PostBack> postBacks = new ArrayList<PostBack>();
        synchronized (postBackLock) {
            if (postBackMap != null) {
                postBacks.addAll(postBackMap.values());
            }
        }
        for (PostBack postback : postBacks) {
            postback.terminate();
        }

        // Terminate from a snapshot, so that the registry is neither locked nor
        // iterated while broadcasts are being terminated.
        List<Broadcast> toTerminate;
        synchronized (broadcasts) {
            toTerminate = new ArrayList<Broadcast>(broadcasts.values());
        }
        for (Broadcast broadcast : toTerminate) {
            broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
        }

        destroyChild();
        super.destroy();
    }

    /**
     * Indirectly invoked by servlet container during servlet initialization.
     */
    abstract protected void initChild();

    /**
     * Indirectly invoked by servlet container during destruction of servlet.
     */
    abstract protected void destroyChild();

    /**
     * Returns the PostBack serving the given URL, creating and remembering it on
     * first use.
     *
     * @param postBackURL   URL to post back to; may be null
     * @param broadcastType broadcast type; suffixed with "_status" when creating the PostBack
     * @return the PostBack for the URL, or null if postBackURL is null
     * @throws BroadcastException with PLATFORM_ERROR if the PostBack cannot be created
     */
    public PostBack getPostBack(String postBackURL, String broadcastType) throws BroadcastException {
        if (postBackURL == null) {
            return null;
        }
        // Look-up and creation happen under one lock so that concurrent requests
        // for the same URL cannot each create their own PostBack.
        synchronized (postBackLock) {
            PostBack postBack = postBackMap.get(postBackURL);
            if (postBack != null) {
                return postBack;
            }
            try {
                postBack = new PostBack(postBackURL, broadcastType + "_status",
                        postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize);
            } catch (KeyManagementException | IllegalArgumentException | NoSuchAlgorithmException | KeyStoreException e) {
                throw new BroadcastException(BroadcastError.PLATFORM_ERROR, e.getMessage(), e);
            }
            postBackMap.put(postBackURL, postBack);
            return postBack;
        }
    }

    /**
     * @return the media-specific platform resources of this engine
     */
    public EngineResources getResources() {
        return resources;
    }

    /**
     * Registers a broadcast under the given id.
     *
     * @param broadcastId id to register under; if null, one will be created
     * @param broadcast   broadcast to register
     */
    public void addBroadcast(String broadcastId, Broadcast broadcast) {
        putBroadcast(broadcastId, broadcast);
    }

    /**
     * Registers a broadcast under its own id.
     * If broadcast has no id, one will be created for it.
     * @param broadcast
     */
    public void installBroadcast(Broadcast broadcast) {
        putBroadcast(broadcast.getBroadcastId(), broadcast);
    }

    /**
     * @return configured size of the service thread pool
     */
    public int getServiceThreadPoolSize() {
        return serviceThreadPoolSize;
    }
}