diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 28c3fdc..ed97912 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -19,6 +19,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.json.simple.JSONObject; import altk.comm.engine.Job.JobStatus; import altk.comm.engine.exception.BroadcastError; @@ -38,6 +39,7 @@ public abstract class Broadcast private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; + public final String broadcastType; private String broadcastId; @@ -63,8 +65,8 @@ public abstract class Broadcast protected String postbackURL; private Postback postback; public long expireTime; - protected String clock_24hr_resume; - protected String clock_24hr_pause; + protected String daily_resume = ""; + protected String daily_pause = ""; /** * Sleep time in milliseconds between consecutive job processing (actualliy batch) @@ -238,7 +240,7 @@ public abstract class Broadcast BroadcastState.RUNNING, // User action BroadcastState.COMPLETED, BroadcastState.ABORTING, - BroadcastState.CANCELED, // User action + BroadcastState.CANCELING, // User action BroadcastState.PAUSED, BroadcastState.PURGED // User action )); @@ -888,6 +890,8 @@ public abstract class Broadcast "' ready='" + getPendingJobCount() + "'"); statusBf.append(" active='" + getActiveJobCount() + "'"); statusBf.append(">"); + statusBf.append("" + daily_pause + "\n"); + statusBf.append("" + daily_resume + "\n"); statusBf.append(additionalStatusXML()); statusBf.append(""); String statusReport = statusBf.toString(); @@ -1405,4 +1409,42 @@ public abstract class Broadcast } } + /** + * @return null or configuration in XML + */ + public String getConfigXML() + { + StringBuffer configBuf = new StringBuffer(); + configBuf.append(""); + configBuf.append("<" + CommEngine.DAILY_PAUSE_KEY + ">" + daily_pause + ""); + configBuf.append("<" + CommEngine.DAILY_RESUME_KEY + ">" + daily_resume + ""); + configBuf.append(""); + return configBuf.toString(); + } + + public void configure(JSONObject configuration) throws Exception { + String value, timeOfDay, key; + key = CommEngine.DAILY_PAUSE_KEY; + value = (String)configuration.get(key); + if (value != null) { + timeOfDay = CommEngine.checkTimeOfDay(value); + if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); + daily_pause = timeOfDay; + } + key = CommEngine.DAILY_RESUME_KEY; + value = (String)configuration.get(key); + if (value != null) { + timeOfDay = CommEngine.checkTimeOfDay(value); + if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); + daily_resume = timeOfDay; + } + } + + public JSONObject getConfigJSON() { + Map dataMap = new HashMap(); + dataMap.put(CommEngine.DAILY_PAUSE_KEY, daily_pause); + dataMap.put(CommEngine.DAILY_RESUME_KEY, daily_resume); + return new JSONObject(dataMap); + } + } diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 7d34b97..43cc163 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -5,7 +5,6 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; -import java.io.StringReader; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.Enumeration; @@ -17,12 +16,14 @@ import java.util.Vector; import java.util.concurrent.ScheduledExecutorService; import javax.servlet.ServletContext; +import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; +import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -32,9 +33,9 @@ import altk.comm.engine.Broadcast.BroadcastState; public abstract class CommEngine extends HttpServlet { - private static final String CLOCK_24HR_PAUSE_KEY = "clock_24hr_pause"; + static final String DAILY_PAUSE_KEY = "daily_pause"; - private static final String CLOCK_24HR_RESUME_KEY = "clock_24hr_resume"; + static final String DAILY_RESUME_KEY = "daily_resume"; static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; @@ -87,13 +88,13 @@ public abstract class CommEngine extends HttpServlet protected String runtimeDirPath; protected String confDirPath; /** Daily resume all broadcasts time. No action if "" */ - protected String clock_24hr_resume; + protected String daily_resume = ""; /** Daily pause all broadcasts time. No action if "" */ - protected String clock_24hr_pause; + protected String daily_pause = ""; - private Clock_24hr clock_24hr; + private DailyClock dailyClock; - protected class Clock_24hr extends Thread + protected class DailyClock extends Thread { private boolean threadShouldStop = false; @@ -104,23 +105,22 @@ public abstract class CommEngine extends HttpServlet long startTime = System.currentTimeMillis();; String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm")); - //System.out.println(String.format("timeOfDay %s, pause %s, resume %s", timeOfDay, clock_24hr_pause, clock_24hr_resume)); // Check for pause for (Broadcast broadcast : broadcasts.values()) { // Check for pause - if (broadcast.clock_24hr_pause != null && broadcast.clock_24hr_pause.length() > 0) + if (broadcast.daily_pause.length() > 0) { - if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause("clock", null); + if (timeOfDay.equals(broadcast.daily_pause)) broadcast.pause("clock", null); } - else if (timeOfDay.equals(clock_24hr_pause)) broadcast.pause("clock", null); + else if (timeOfDay.equals(daily_pause)) broadcast.pause("clock", null); // Check for resume - if (broadcast.clock_24hr_resume != null && broadcast.clock_24hr_resume.length() > 0) + if (broadcast.daily_resume.length() > 0) { - if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume("clock", null); + if (timeOfDay.equals(broadcast.daily_resume)) broadcast.resume("clock", null); } - else if (timeOfDay.equals(clock_24hr_resume)) broadcast.resume("clock", null); + else if (timeOfDay.equals(daily_resume)) broadcast.resume("clock", null); } // Wakes up every 1/2 minute to provide minute resolution @@ -134,7 +134,7 @@ public abstract class CommEngine extends HttpServlet } catch (Exception e) { - myLogger.error("Clock_24hr thread caught: " + e.getMessage(), e); + myLogger.error("DailyClock thread caught: " + e.getMessage(), e); return; } } @@ -180,8 +180,9 @@ public abstract class CommEngine extends HttpServlet /** * Invoked by servlet container during initialization of servlet. + * @throws ServletException */ - public final void init() + public final void init() throws ServletException { // check init parameters ServletContext servletContext = getServletContext(); @@ -254,9 +255,21 @@ public abstract class CommEngine extends HttpServlet // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT)); deadBroadcastViewingMinutes = Long.parseLong(periodStr); - clock_24hr_resume = config.getProperty(CLOCK_24HR_RESUME_KEY, ""); - clock_24hr_pause = config.getProperty(CLOCK_24HR_PAUSE_KEY, ""); - Thread clock_24hr; + + String timeOfDay, timeOfDayStr, key; + key = DAILY_RESUME_KEY; + timeOfDayStr = config.getProperty(key , ""); + timeOfDay = checkTimeOfDay(timeOfDayStr); + if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); + daily_resume = timeOfDay; + + key = DAILY_PAUSE_KEY; + timeOfDayStr = config.getProperty(key, ""); + timeOfDay = checkTimeOfDay(timeOfDayStr); + if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); + daily_pause = timeOfDay; + + Thread dailyClock; CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); @@ -264,15 +277,15 @@ public abstract class CommEngine extends HttpServlet CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); - CommonLogger.activity.info("clock_24hr_resume = " + clock_24hr_resume); - CommonLogger.activity.info("clock_24hr_pause = " + clock_24hr_pause); + CommonLogger.activity.info("daily_resume = " + daily_resume); + CommonLogger.activity.info("daily_pause = " + daily_pause); - clock_24hr = new Clock_24hr(); - clock_24hr.start(); + dailyClock = new DailyClock(); + dailyClock.start(); initChild(); } - + public int getPauseThreshold() { return getPauseThreshold(config); @@ -424,6 +437,10 @@ public abstract class CommEngine extends HttpServlet { resumeBroadcast(request, out); } + else if (get.equalsIgnoreCase("configuration")) + { + getConfiguration(request, out); + } else if (get.equalsIgnoreCase("configure")) { configure(request, out); @@ -435,7 +452,43 @@ public abstract class CommEngine extends HttpServlet out.close(); } - private void cancelBroadcast(HttpServletRequest request, PrintWriter out) + /** + * Writes configuration in JSON string to out + * @param request + * @param out + */ + protected void getConfiguration(HttpServletRequest request, PrintWriter out) + { + JSONObject configuration = getConfigJSON(); + out.print(configuration); + } + + private JSONObject getConfigJSON() + { + Map dataMap = new HashMap(); + + // engine configuration + Map EngineMap = new HashMap(); + dataMap.put("engine", EngineMap); + EngineMap.put(DAILY_PAUSE_KEY, daily_pause); + EngineMap.put(DAILY_RESUME_KEY, daily_resume); + + // broadcast configuration + Map broadcastsMap = new HashMap(); + synchronized (broadcasts) { + for (String broadcastId : broadcasts.keySet()) + { + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast.getState().isFinal) continue; + JSONObject configJSON = broadcast.getConfigJSON(); + broadcastsMap.put(broadcastId, configJSON); + } + } + if (broadcastsMap.size() > 0) dataMap.put("broadcasts", broadcastsMap); + return new JSONObject(dataMap); + } + + private void cancelBroadcast(HttpServletRequest request, PrintWriter out) { // Get broadcastId from request String broadcastId = getBroadcastId(request); @@ -477,35 +530,78 @@ public abstract class CommEngine extends HttpServlet broadcast.resume(reason, out); } + /** + * Check if timeOfDay is of the form "HH::mm" + * @param timeOfDay + * @return timeOfDay if valid, otherwise null + */ + protected static String checkTimeOfDay(String timeOfDay) { + timeOfDay = timeOfDay.trim(); + if (timeOfDay.length() == 0) return timeOfDay; + if (timeOfDay.length() != 5) return null; + return timeOfDay; + } + + /** + * Writes error message to out. Otherwise writes nothing to out. + * @param request + * @param out + */ protected void configure(HttpServletRequest request, PrintWriter out) { - String jsonData = request.getParameter("data"); - System.out.println("jsonData: " + jsonData); - JSONParser parser = new JSONParser(); + JSONObject currConfig = getConfigJSON(); + + String jsonString = request.getParameter("data"); try { - JSONObject jsonObject = (JSONObject) parser.parse(new StringReader(jsonData)); - System.out.println(jsonObject); - - String valueStr; - - // clock_24hr_pause - valueStr = (String) jsonObject.get(CLOCK_24HR_PAUSE_KEY); - if (valueStr != null && valueStr.length() == 5) { - clock_24hr_pause = valueStr; - out.println("clock_24hr_pause updated."); - } - - // clock_24hr_resume - valueStr = (String) jsonObject.get(CLOCK_24HR_RESUME_KEY); - if (valueStr != null && valueStr.length() == 5) { - clock_24hr_resume = valueStr; - out.println("clock_24hr_resume updated."); - } - + JSONParser parser = new JSONParser(); + JSONObject configuration = (JSONObject) parser.parse(jsonString); + configure(configuration); } catch (Exception e) { - out.println("Error - " + e.getMessage()); myLogger.error(e); + out.write("Error - " + e.getMessage()); + // restore to original confiuration + try { + configure(currConfig); + } catch (Exception e1) { + out.write("\nInternal error in restoring original configuration: " + e1.getMessage()); + } } } + + private void configureEngine(JSONObject configuration) throws Exception { + String value, timeOfDay, key; + key = DAILY_PAUSE_KEY; + value = (String)configuration.get(key); + if (value != null) { + timeOfDay = checkTimeOfDay(value); + if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); + daily_pause = timeOfDay; + } + key = DAILY_RESUME_KEY; + value = (String)configuration.get(key); + if (value != null) { + timeOfDay = checkTimeOfDay(value); + if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); + daily_resume = timeOfDay; + } + } + + private void configure(JSONObject configuration) throws Exception { + // emgine + JSONObject engineConfig = (JSONObject)configuration.get("engine"); + configureEngine(engineConfig); + + // broadcasts + JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts"); + if (broadcastsConfig != null) { + for (Object broadcastId : broadcastsConfig.keySet()) + { + JSONObject broadcastConfig = (JSONObject)broadcastsConfig.get(broadcastId); + Broadcast broadcast = broadcasts.get(broadcastId); + if (broadcast == null) continue; + broadcast.configure(broadcastConfig); + } + } + } /** * @@ -557,13 +653,9 @@ public abstract class CommEngine extends HttpServlet // Then append status of each broadcast to outBuf. for (Broadcast broadcast : broadcastList) { - out.write(broadcast.mkStatusReport()); + out.write(broadcast.mkStatusReport() + "\n"); } out.write("\n"); - out.write("\n"); - out.write("<" + CLOCK_24HR_PAUSE_KEY + ">" + clock_24hr_pause + "\n"); - out.write("<" + CLOCK_24HR_RESUME_KEY + ">" + clock_24hr_resume + "\n"); - out.write("\n"); out.write(""); } } @@ -650,11 +742,11 @@ public abstract class CommEngine extends HttpServlet } } - // Destroy clock_24hr thread + // Destroy dailyClock thread try { - clock_24hr.terminate(); - clock_24hr.join(); + dailyClock.terminate(); + dailyClock.join(); } catch (InterruptedException e) {