diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 50775a5..47dc97d 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -2,6 +2,7 @@ package altk.comm.engine; import java.io.IOException; import java.io.PrintWriter; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -39,6 +40,8 @@ public abstract class Broadcast private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; + static final String DAILY_STOP_KEY = "daily_stop"; + static final String DAILY_START_KEY = "daily_start"; public final String broadcastType; private String broadcastId; @@ -65,8 +68,8 @@ public abstract class Broadcast protected String postbackURL; private Postback postback; public long expireTime; - protected String daily_resume = ""; - protected String daily_pause = ""; + protected String daily_start = ""; + protected String daily_stop = ""; /** * Sleep time in milliseconds between consecutive job processing (actualliy batch) @@ -478,11 +481,15 @@ public abstract class Broadcast pauseThreshold = commEngine.getPauseThreshold(); try { + // Check validity of operating hours parameters + boolean notInService = commEngine.notInService(); decode(request, notInService); // Now that have decoded the id of this broadcast, // ask CommEngine to install it with its id. commEngine.installBroadcast(this); + setOperatingHours(DAILY_START_KEY, daily_start); + setOperatingHours(DAILY_STOP_KEY, daily_stop); if (notInService) { @@ -891,8 +898,8 @@ public abstract class Broadcast "' ready='" + getPendingJobCount() + "'"); statusBf.append(" active='" + getActiveJobCount() + "'"); statusBf.append(">\n"); - statusBf.append("" + daily_pause + ""); - statusBf.append("" + daily_resume + "\n"); + statusBf.append("" + daily_stop + ""); + statusBf.append("" + daily_start + "\n"); statusBf.append(additionalStatusXML()); statusBf.append(""); String statusReport = statusBf.toString(); @@ -1018,6 +1025,11 @@ public abstract class Broadcast protected void resume(String reason, PrintWriter out) { + if (!withinOperatingHours()) + { + if (out != null) out.write("Cannot resume outside operating hours"); + return; + } synchronized (resumeFlag) { StateChangeResult result = setState(BroadcastState.RUNNING, reason, null); @@ -1064,7 +1076,14 @@ public abstract class Broadcast changeStateTime = System.currentTimeMillis(); if (!serviceThreadsShouldStop()) { - setState(BroadcastState.RUNNING); + if (withinOperatingHours()) + { + setState(BroadcastState.RUNNING); + } + else + { + setState(BroadcastState.PAUSED, "clock", null); + } // Start the dispatcher threads for (Service thread : serviceThreadPool) @@ -1088,6 +1107,25 @@ public abstract class Broadcast } } + private boolean withinOperatingHours() { + int dailyStartMin = convert2Min(daily_start); + int dailyStopMin = convert2Min(daily_stop); + // Ensure daily stop > daily start + if (dailyStopMin < dailyStartMin) dailyStopMin += 24 * 60; + LocalTime now = LocalTime.now(); + int nowMin = now.getHour() * 60 + now.getMinute(); + if (nowMin < dailyStartMin) nowMin += 24 * 60; + boolean within = nowMin >= dailyStartMin && nowMin < dailyStopMin; + return within; + } + + private int convert2Min(String hhmm) { + String[] parts = hhmm.split(":"); + int hh = Integer.parseInt(parts[0]); + int mm = Integer.parseInt(parts[1]); + return hh * 60 + mm; + } + /** * Derived class should wait for end of service before returning. * At this point all service threads have already ended. If the derived @@ -1401,35 +1439,60 @@ public abstract class Broadcast { StringBuffer configBuf = new StringBuffer(); configBuf.append(""); - configBuf.append("<" + CommEngine.DAILY_PAUSE_KEY + ">" + daily_pause + ""); - configBuf.append("<" + CommEngine.DAILY_RESUME_KEY + ">" + daily_resume + ""); + configBuf.append("<" + DAILY_STOP_KEY + ">" + daily_stop + ""); + configBuf.append("<" + DAILY_START_KEY + ">" + daily_start + ""); configBuf.append(""); return configBuf.toString(); } public void configure(JSONObject configuration) throws Exception { - String value, timeOfDay, key; - key = CommEngine.DAILY_PAUSE_KEY; - value = (String)configuration.get(key); - if (value != null) { - timeOfDay = CommEngine.checkTimeOfDay(value); - if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); - daily_pause = timeOfDay; + boolean timeChanged = false; + for (String key : new String[] {DAILY_STOP_KEY, DAILY_START_KEY}) { + String value = (String)configuration.get(key); + if (value != null) { + timeChanged = timeChanged || setOperatingHours(key, value); + } } - key = CommEngine.DAILY_RESUME_KEY; - value = (String)configuration.get(key); - if (value != null) { - timeOfDay = CommEngine.checkTimeOfDay(value); - if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); - daily_resume = timeOfDay; + if (timeChanged) enforceOperationHours(); + } + + void enforceOperationHours() { + if (state == BroadcastState.ABORTED) return; + if (withinOperatingHours()) { + resume("clock", null); + } else { + pause("clock", null); + } + } + + /** + * Sets timeParam to value + * @param timeParam + * @param value + * @return false if no change + */ + private boolean setOperatingHours(String timeParam, String value) { + String timeOfDay = CommEngine.checkTimeOfDay(value); + if (timeOfDay == null) throw new RuntimeException(String.format("Invalid value for %s: %s", timeParam, value)); + switch (timeParam) { + case DAILY_STOP_KEY: + if (daily_stop == timeOfDay) return false; + daily_stop = timeOfDay; + return true; + case DAILY_START_KEY: + if (daily_start == timeOfDay) return false; + daily_start = timeOfDay; + return true; + default: + throw new RuntimeException("Unknown parameter name: " + timeParam); } } @SuppressWarnings("unchecked") public JSONObject getConfigJSON() { JSONObject dataMap = new JSONObject(); - dataMap.put(CommEngine.DAILY_PAUSE_KEY, daily_pause); - dataMap.put(CommEngine.DAILY_RESUME_KEY, daily_resume); + dataMap.put(DAILY_START_KEY, daily_start); + dataMap.put(DAILY_STOP_KEY, daily_stop); childAddConfigJSON(dataMap); return dataMap; } diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 69cf12d..292e482 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -5,8 +5,6 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; -import java.time.LocalTime; -import java.time.format.DateTimeFormatter; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; @@ -33,11 +31,6 @@ import altk.comm.engine.Broadcast.BroadcastState; @SuppressWarnings("serial") public abstract class CommEngine extends HttpServlet { - - static final String DAILY_PAUSE_KEY = "daily_pause"; - - static final String DAILY_RESUME_KEY = "daily_resume"; - static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; @@ -88,10 +81,6 @@ public abstract class CommEngine extends HttpServlet protected String runtimeDirPath; protected String confDirPath; - /** Daily resume all broadcasts time. No action if "" */ - protected String daily_resume = ""; - /** Daily pause all broadcasts time. No action if "" */ - protected String daily_pause = ""; private DailyClock dailyClock; @@ -103,27 +92,13 @@ public abstract class CommEngine extends HttpServlet { while (!threadShouldStop) { - String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm")); - // Check for pause for (Broadcast broadcast : broadcasts.values()) { - // Check for pause - if (broadcast.daily_pause.length() > 0) - { - if (timeOfDay.equals(broadcast.daily_pause)) broadcast.pause("clock", null); - } - else if (timeOfDay.equals(daily_pause)) broadcast.pause("clock", null); - - // Check for resume - if (broadcast.daily_resume.length() > 0) - { - if (timeOfDay.equals(broadcast.daily_resume)) broadcast.resume("clock", null); - } - else if (timeOfDay.equals(daily_resume)) broadcast.resume("clock", null); + broadcast.enforceOperationHours(); } long currentTime = System.currentTimeMillis(); - long sleepTime = 60000 + 30000 - currentTime % 60000; + long sleepTime = 60000 - currentTime % 60000; if (sleepTime > 0) { try @@ -249,20 +224,7 @@ public abstract class CommEngine extends HttpServlet notInService = true; return; } - - String timeOfDay, timeOfDayStr, key; - key = DAILY_RESUME_KEY; - timeOfDayStr = config.getProperty(key , ""); - timeOfDay = checkTimeOfDay(timeOfDayStr); - if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); - daily_resume = timeOfDay; - - key = DAILY_PAUSE_KEY; - timeOfDayStr = config.getProperty(key, ""); - timeOfDay = checkTimeOfDay(timeOfDayStr); - if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr)); - daily_pause = timeOfDay; - + Thread dailyClock; CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); @@ -271,8 +233,6 @@ public abstract class CommEngine extends HttpServlet CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); - CommonLogger.activity.info("daily_resume = " + daily_resume); - CommonLogger.activity.info("daily_pause = " + daily_pause); dailyClock = new DailyClock(); dailyClock.start(); @@ -472,13 +432,7 @@ public abstract class CommEngine extends HttpServlet private JSONObject getConfigJSON() { JSONObject config = new JSONObject(); - - // engine configuration - JSONObject EngineConfig = new JSONObject(); - config.put("engine", EngineConfig); - EngineConfig.put(DAILY_PAUSE_KEY, daily_pause); - EngineConfig.put(DAILY_RESUME_KEY, daily_resume); - + // broadcast configuration JSONObject broadcastsConfig = new JSONObject(); synchronized (broadcasts) { @@ -553,7 +507,6 @@ public abstract class CommEngine extends HttpServlet */ protected static String checkTimeOfDay(String timeOfDay) { timeOfDay = timeOfDay.trim(); - if (timeOfDay.length() == 0) return timeOfDay; // pattern hh:mm Pattern pattern = Pattern.compile("^(\\d\\d):[0-5]\\d$"); Matcher matcher = pattern.matcher(timeOfDay); @@ -589,30 +542,8 @@ public abstract class CommEngine extends HttpServlet } } } - - private void configureEngine(JSONObject configuration) throws Exception { - String value, timeOfDay, key; - key = DAILY_PAUSE_KEY; - value = (String)configuration.get(key); - if (value != null) { - timeOfDay = checkTimeOfDay(value); - if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); - daily_pause = timeOfDay; - } - key = DAILY_RESUME_KEY; - value = (String)configuration.get(key); - if (value != null) { - timeOfDay = checkTimeOfDay(value); - if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value)); - daily_resume = timeOfDay; - } - } - + private void configure(JSONObject configuration) throws Exception { - // emgine - JSONObject engineConfig = (JSONObject)configuration.get("engine"); - configureEngine(engineConfig); - // broadcasts JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts"); if (broadcastsConfig != null) { @@ -689,7 +620,6 @@ public abstract class CommEngine extends HttpServlet out.write(broadcast.mkStatusReport() + "\n"); } out.write("\n"); - out.write("" + daily_pause + "" + daily_resume + "\n"); out.write(""); } } diff --git a/src/main/java/altk/comm/engine/XMLSAXBroadcast.java b/src/main/java/altk/comm/engine/XMLSAXBroadcast.java index 62f2098..73e137c 100644 --- a/src/main/java/altk/comm/engine/XMLSAXBroadcast.java +++ b/src/main/java/altk/comm/engine/XMLSAXBroadcast.java @@ -113,13 +113,13 @@ public abstract class XMLSAXBroadcast extends Broadcast myLogger.debug("expire time adjusted to be " + expireTime); } } - else if (qName.equals("daily_pause")) + else if (qName.equals(DAILY_STOP_KEY)) { - daily_pause = getTrimmedText(); + daily_stop = getTrimmedText(); } - else if (qName.equals("daily_resume")) + else if (qName.equals(DAILY_START_KEY)) { - daily_resume = getTrimmedText(); + daily_start = getTrimmedText(); } else if (inRecipientProp) {