package altk.comm.engine; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Vector; import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import altk.comm.engine.Broadcast.BroadcastState; @SuppressWarnings("serial") public abstract class CommEngine extends HttpServlet { static final String DAILY_PAUSE_KEY = "daily_pause"; static final String DAILY_RESUME_KEY = "daily_resume"; static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size"; private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1; private static final String POSTBACK_THREADPOOL_SIZE_KEY = "postback_threadpool_size"; private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 20; private static final String POSTBACK_MAX_QUEUE_SIZE_KEY = "postback_max_queue_size"; private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000; private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size"; private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100; private static final String PAUSE_THRESHOLD_KEY = "pause_threshold"; private static final int PAUSE_THRESHOLD_DEFAULT = 0; /** * Maps a broadcastId to a broadcast. */ private Map broadcasts; protected boolean notInService; protected Properties config; protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice" private long startupTimestamp; // Sequencing naming of broadcast that fails to yield its broadcastId private int unknownBroadcastIdNdx = 1; /** * Used to communicate media-specific platform resources to broadcasts */ protected EngineResources resources; protected static Logger myLogger; private ScheduledExecutorService scheduler; private long deadBroadcastViewingMinutes; private int completedJobCount = 0; protected String runtimeDirPath; protected String confDirPath; /** Daily resume all broadcasts time. No action if "" */ protected String daily_resume = ""; /** Daily pause all broadcasts time. No action if "" */ protected String daily_pause = ""; private DailyClock dailyClock; protected class DailyClock extends Thread { private boolean threadShouldStop = false; public void run() { while (!threadShouldStop) { String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm")); // Check for pause for (Broadcast broadcast : broadcasts.values()) { // Check for pause if (broadcast.daily_pause.length() > 0) { if (timeOfDay.equals(broadcast.daily_pause)) broadcast.pause("clock", null); } else if (timeOfDay.equals(daily_pause)) broadcast.pause("clock", null); // Check for resume if (broadcast.daily_resume.length() > 0) { if (timeOfDay.equals(broadcast.daily_resume)) broadcast.resume("clock", null); } else if (timeOfDay.equals(daily_resume)) broadcast.resume("clock", null); } long currentTime = System.currentTimeMillis(); long sleepTime = 60000 + 30000 - currentTime % 60000; if (sleepTime > 0) { try { Thread.sleep(sleepTime); } catch (Exception e) { myLogger.error("DailyClock thread caught: " + e.getMessage(), e); return; } } } } public void terminate() { threadShouldStop = true; } } abstract protected Broadcast mkBroadcast(); public CommEngine(String engineName) { this.engineName = engineName; broadcasts = new HashMap(); startupTimestamp = System.currentTimeMillis(); } /** * Relocates a filepath relative to runtime directory if filepath is not absolute. * @param filepath * @return */ public String relocateToRuntimeDir(String filepath) { if (filepath.startsWith("/")) return filepath; // no change to absolute path String relocated = filepath; // The next 2 lines take care of pre-git era meaning convention of filepath in properties. // Then, the runtime is relative to the current working directory of tomcat. // Now they are relative to the runtimDirPath obtained from the tomcat tomcat context. // These 2 lines an be deleted when all CommEngines in production are in the git era. String unwanted_prefix = engineName + "/"; if (filepath.startsWith(unwanted_prefix)) relocated = filepath.substring(unwanted_prefix.length()); relocated = runtimeDirPath + "/" + relocated; return relocated; } /** * Invoked by servlet container during initialization of servlet. * @throws ServletException */ public final void init() throws ServletException { // check init parameters ServletContext servletContext = getServletContext(); confDirPath = servletContext.getInitParameter(getConfDirContextName()); System.out.println("Config directory is configured to be '" + confDirPath + "'. Make sure it and its content are readable by user 'tomcat'"); runtimeDirPath = servletContext.getInitParameter(getRunTimeDirContextName()); System.out.println("Runtime directory is configured to be '" + runtimeDirPath + "'. Make sure it and its content are readable by user 'tomcat'"); File propertiesFile = new File(confDirPath + "/properties"); // Configure log4j using log4j.properties file, \ // sibling to the engine properties file. // This change is backward compatible with placing the log4j.properties file in // the class path. String log4j_properties = confDirPath + "/log4j.properties"; // Relocate file property offetting it by the runtimeDirPath try { Properties prop = new Properties(); prop.load(new FileInputStream(log4j_properties)); Enumeration e = prop.keys(); while (e.hasMoreElements()) { String key = (String)e.nextElement(); if (key.toLowerCase().endsWith(".file")) { String filepath = prop.getProperty(key); String relocate = relocateToRuntimeDir(filepath); prop.setProperty(key, relocate); System.out.println(key + "=" + relocate); } } PropertyConfigurator.configure(prop); } catch (Exception e) { System.out.println("Failed to configure log4: " + e); // Do nothing, assuming the exception is FileNotFoundException. // Remaining log4j initialization will look for log4j.properties // file in the class path. } // This activates Logger class instantiation. At this point, if lo4j // is not yet configured, // it will look for the log4j.properties file in the class path. myLogger = Logger.getLogger(CommEngine.class); myLogger.info("init() invoked"); CommonLogger.startup.info("Using lo4j properites file " + log4j_properties); CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath()); config = new Properties(); try { config.load(new FileInputStream(propertiesFile)); } catch (FileNotFoundException e) { CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort"); notInService = true; return; } catch (IOException e) { CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage()); notInService = true; return; } String timeOfDay, timeOfDayStr, key; key = DAILY_RESUME_KEY; timeOfDayStr = config.getProperty(key , ""); timeOfDay = checkTimeOfDay(timeOfDayStr); if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); daily_resume = timeOfDay; key = DAILY_PAUSE_KEY; timeOfDayStr = config.getProperty(key, ""); timeOfDay = checkTimeOfDay(timeOfDayStr); if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); daily_pause = timeOfDay; Thread dailyClock; CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize())); CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); CommonLogger.activity.info("daily_resume = " + daily_resume); CommonLogger.activity.info("daily_pause = " + daily_pause); dailyClock = new DailyClock(); dailyClock.start(); try { // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT)); deadBroadcastViewingMinutes = Long.parseLong(periodStr); initChild(); } catch (Exception e) { throw new ServletException(e.getMessage(), e); } } public int getPauseThreshold() { return getPauseThreshold(config); } public int getPauseThreshold(Properties properties) { String str = properties. getProperty(PAUSE_THRESHOLD_KEY, String.valueOf(PAUSE_THRESHOLD_DEFAULT)); int pauseThreshold = Integer.valueOf(str); return pauseThreshold; } public int getServiceThreadPoolSize() { return getServiceThreadPoolSize(config); } public int getServiceThreadPoolSize(Properties properties) { String str = properties. getProperty(SERVICE_THREADPOOL_SIZE_KEY, String.valueOf(SERVICE_THREADPOOL_SIZE_DEFAULT)); int size = Integer.valueOf(str); return size; } public int getPostbackSenderPoolSize() { return getPostbackSenderPoolSize(config); } public int getPostbackSenderPoolSize(Properties properties) { String str = properties. getProperty(POSTBACK_THREADPOOL_SIZE_KEY, String.valueOf(POSTBACK_THREADPOOL_SIZE_DEFAULT)); int size = Integer.valueOf(str); return size; } public int getPostbackMaxQueueSize() { return getPostbackMaxQueueSize(config); } public int getPostbackMaxQueueSize(Properties properties) { String str = properties. getProperty(POSTBACK_MAX_QUEUE_SIZE_KEY, String.valueOf(POSTBACK_MAX_QUEUE_SIZE_DEFAULT)); int size = Integer.valueOf(str); return size; } public int getPostbackMaxBatchSize() { return getPostbackMaxBatchSize(config); } public int getPostbackMaxBatchSize(Properties properties) { String str = properties. getProperty(POSTBACK_MAX_BATCH_SIZE_KEY, String.valueOf(POSTBACK_MAX_BATCH_SIZE_DEFAULT)); int size = Integer.valueOf(str); return size; } protected void purgeStaleBroadcasts() { long now = System.currentTimeMillis(); synchronized (broadcasts) { Iterator iter = broadcasts.keySet().iterator(); while (iter.hasNext()) { Broadcast broadcast = broadcasts.get(iter.next()); if (broadcast.getState().isFinal && now - broadcast.changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) { completedJobCount += broadcast.getCompletedJobCount(); iter.remove(); } } } } /** * * @return name of parameter in Tomcat context file, specifying properties file * for this SMSEngine. */ protected abstract String getConfDirContextName(); protected abstract String getRunTimeDirContextName(); @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) { Broadcast broadcast = mkBroadcast(); broadcast.doPost(request, response, this); } /** * Functions covered are * get=status * get=cancel_broadcast&broadcast_id=XXX */ @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) { PrintWriter out; try { out = response.getWriter(); } catch (IOException e) { CommonLogger.alarm.error("Cannot write a reply: " + e); return; } String get = (String)request.getParameter("get"); if (get == null) { // Return http status BAD REQUEST int httpStatus = HttpServletResponse.SC_BAD_REQUEST; try { response.sendError(httpStatus); } catch (IOException e) { myLogger.warn("Unnable to return HTTP error code " + httpStatus); } return; } if (get.equalsIgnoreCase("status")) { getStatus(request, out); } else if (get.equalsIgnoreCase("cancel_broadcast")) { cancelBroadcast(request, out); } else if (get.equalsIgnoreCase("pause_broadcast")) { pauseBroadcast(request, out); } else if (get.equalsIgnoreCase("resume_broadcast")) { resumeBroadcast(request, out); } else if (get.equalsIgnoreCase("configuration")) { getConfiguration(request, out); } else if (get.equalsIgnoreCase("configure")) { configure(request, out); } else { out.write(get + " not supported"); } out.close(); } /** * Writes configuration in JSON string to out * @param request * @param out */ protected void getConfiguration(HttpServletRequest request, PrintWriter out) { JSONObject configuration = getConfigJSON(); out.print(configuration); } @SuppressWarnings("unchecked") private JSONObject getConfigJSON() { JSONObject config = new JSONObject(); // engine configuration JSONObject EngineConfig = new JSONObject(); config.put("engine", EngineConfig); EngineConfig.put(DAILY_PAUSE_KEY, daily_pause); EngineConfig.put(DAILY_RESUME_KEY, daily_resume); // broadcast configuration JSONObject broadcastsConfig = new JSONObject(); synchronized (broadcasts) { for (String broadcastId : broadcasts.keySet()) { Broadcast broadcast = broadcasts.get(broadcastId); if (broadcast.getState().isFinal) continue; broadcastsConfig.put(broadcastId, broadcast.getConfigJSON()); } } if (broadcastsConfig.size() > 0) config.put("broadcasts", broadcastsConfig); childAddConfig(config); return config; } /** * Derived class may add to configMap * @param configMap */ protected void childAddConfig(JSONObject config) { } private void cancelBroadcast(HttpServletRequest request, PrintWriter out) { // Get broadcastId from request String broadcastId = getBroadcastId(request); Broadcast broadcast = broadcasts.get(broadcastId); String reason = request.getParameter("reason"); if (broadcast == null) { out.format("Broadcast %s does not exist", broadcastId); return; } broadcast.cancel(reason, out); } protected void pauseBroadcast(HttpServletRequest request, PrintWriter out) { // Get broadcastId from request String broadcastId = getBroadcastId(request); Broadcast broadcast = broadcasts.get(broadcastId); String reason = request.getParameter("reason"); if (broadcast == null) { out.format("Broadcast %s does not exist", broadcastId); return; } broadcast.pause(reason, out); } protected void resumeBroadcast(HttpServletRequest request, PrintWriter out) { // Get broadcastId from request String broadcastId = getBroadcastId(request); Broadcast broadcast = broadcasts.get(broadcastId); String reason = request.getParameter("reason"); if (broadcast == null) { out.format("Broadcast %s does not exist", broadcastId); return; } broadcast.resume(reason, out); } /** * Check if timeOfDay is of the form "HH::mm" * @param timeOfDay * @return timeOfDay if valid, otherwise null */ protected static String checkTimeOfDay(String timeOfDay) { timeOfDay = timeOfDay.trim(); if (timeOfDay.length() == 0) return timeOfDay; // pattern hh:mm Pattern pattern = Pattern.compile("^(\\d\\d):[0-5]\\d$"); Matcher matcher = pattern.matcher(timeOfDay); if (!matcher.find()) return null; // Check hour in range String hh = matcher.group(1); if (Integer.parseInt(hh) > 23) return null; return timeOfDay; } /** * Writes error message to out. Otherwise writes nothing to out. * @param request * @param out */ protected void configure(HttpServletRequest request, PrintWriter out) { // save original configuration for roll back in case of error JSONObject origConfigJSON = getConfigJSON(); String jsonString = request.getParameter("data"); try { JSONParser parser = new JSONParser(); JSONObject configuration = (JSONObject) parser.parse(jsonString); configure(configuration); } catch (Exception e) { myLogger.error(e); out.write("Error - " + e.getMessage()); // restore current confiuration try { configure(origConfigJSON); } catch (Exception e1) { out.write("\nInternal error in restoring original configuration: " + e1.getMessage()); } } } private void configureEngine(JSONObject configuration) throws Exception { String value, timeOfDay, key; key = DAILY_PAUSE_KEY; value = (String)configuration.get(key); if (value != null) { timeOfDay = checkTimeOfDay(value); if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); daily_pause = timeOfDay; } key = DAILY_RESUME_KEY; value = (String)configuration.get(key); if (value != null) { timeOfDay = checkTimeOfDay(value); if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); daily_resume = timeOfDay; } } private void configure(JSONObject configuration) throws Exception { // emgine JSONObject engineConfig = (JSONObject)configuration.get("engine"); configureEngine(engineConfig); // broadcasts JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts"); if (broadcastsConfig != null) { for (Object broadcastId : broadcastsConfig.keySet()) { JSONObject broadcastConfig = (JSONObject)broadcastsConfig.get(broadcastId); Broadcast broadcast = broadcasts.get(broadcastId); if (broadcast == null) continue; broadcast.configure(broadcastConfig); } } // derived class configureChild(configuration); } /** * Derived class updates itself from given configuration. * @param configuration */ protected void configureChild(JSONObject configuration) { } /** * * status of each broadcast * tttnnn * */ private void getStatus(HttpServletRequest request, PrintWriter out) { purgeStaleBroadcasts(); String broadcastId = request.getParameter("broadcast_id"); if (broadcastId != null) { broadcastId = broadcastId.trim(); if (broadcastId.length() == 0) { out.write("broadcast_id request parameter cannot be empty"); return; } Broadcast broadcast = broadcasts.get(broadcastId); if (broadcast == null) { out.write("No such broadcast"); } else { out.write(broadcast.mkStatusReport()); } return; } else { String tag = engineName + "_status"; out.write("<" + tag + ">\r\n"); out.write("" + startupTimestamp + "\r\n"); // First get a copy of broadcasts, to avoid mutex deadlock. Vector broadcastList = new Vector(); synchronized(broadcasts) { for (String key : broadcasts.keySet()) { broadcastList.add(broadcasts.get(key)); } } // We have released the lock. // Then append status of each broadcast to outBuf. for (Broadcast broadcast : broadcastList) { out.write(broadcast.mkStatusReport() + "\n"); } out.write("\n"); out.write("" + daily_pause + "" + daily_resume + "\n"); out.write(""); } } public int getPendingJobCount() { int readyCount = 0; synchronized(broadcasts) { for (Broadcast broadcast : broadcasts.values()) { readyCount += broadcast.getPendingJobCount(); } } return readyCount; } public int getActiveJobCount() { int activeCount = 0; synchronized(broadcasts) { for (Broadcast broadcast : broadcasts.values()) { activeCount += broadcast.getActiveJobCount(); } } return activeCount; } public int getCompletedJobCount() { int additionalCompletedJobCount = 0; synchronized(broadcasts) { for (Broadcast broadcast : broadcasts.values()) { additionalCompletedJobCount += broadcast.getCompletedJobCount(); } } return completedJobCount + additionalCompletedJobCount; } public void removeBroadcast(String broadcastId) { CommonLogger.activity.info("Removing broadcast " + broadcastId); synchronized(broadcasts) { broadcasts.remove(broadcastId); } } public boolean notInService() { return notInService; } /** * Decode http GET request for broadcast_id value * @param request * @return broadcast_id */ private String getBroadcastId(HttpServletRequest request) { return request.getParameter("broadcast_id"); } /** * Invoked by servlet container when servlet is destroyed. */ public final void destroy() { System.out.println("Destroying " + engineName); // Shutdown threads that periodically purge stale broadcasts. scheduler.shutdownNow(); synchronized(broadcasts) { for (Broadcast broadcast : broadcasts.values()) { broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); } } // Destroy dailyClock thread try { dailyClock.terminate(); dailyClock.join(); } catch (InterruptedException e) { // TODO nothing } destroyChild(); super.destroy(); } /** * Indirectly invoked by servlet container during servlet initialization. */ abstract protected void initChild(); /** * Indirectly invoked by servlet container during destruction of servlet. */ abstract protected void destroyChild(); public EngineResources getResources() { return resources; } public void addBroadcast(String broadcastId, Broadcast broadcast) { if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; synchronized (broadcasts) { broadcasts.put(broadcastId, broadcast); } } /** * If broadcast has no id, one will be created for it. * @param broadcast */ public void installBroadcast(Broadcast broadcast) { String broadcastId = broadcast.getBroadcastId(); if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; synchronized (broadcasts) { broadcasts.put(broadcastId, broadcast); } } }