Browse Source

Replace daily pause/resume with daily operating hours

tags/1.0.16
ymlam 3 years ago
parent
commit
b26988bc59
3 changed files with 94 additions and 101 deletions
  1. +85
    -22
      src/main/java/altk/comm/engine/Broadcast.java
  2. +5
    -75
      src/main/java/altk/comm/engine/CommEngine.java
  3. +4
    -4
      src/main/java/altk/comm/engine/XMLSAXBroadcast.java

+ 85
- 22
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -2,6 +2,7 @@ package altk.comm.engine;


import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.time.LocalTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@@ -39,6 +40,8 @@ public abstract class Broadcast
private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id";
private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0;
static final String DAILY_STOP_KEY = "daily_stop";
static final String DAILY_START_KEY = "daily_start";


public final String broadcastType; public final String broadcastType;
private String broadcastId; private String broadcastId;
@@ -65,8 +68,8 @@ public abstract class Broadcast
protected String postbackURL; protected String postbackURL;
private Postback postback; private Postback postback;
public long expireTime; public long expireTime;
protected String daily_resume = "";
protected String daily_pause = "";
protected String daily_start = "";
protected String daily_stop = "";
/** /**
* Sleep time in milliseconds between consecutive job processing (actualliy batch) * Sleep time in milliseconds between consecutive job processing (actualliy batch)
@@ -478,11 +481,15 @@ public abstract class Broadcast
pauseThreshold = commEngine.getPauseThreshold(); pauseThreshold = commEngine.getPauseThreshold();
try try
{ {
// Check validity of operating hours parameters
boolean notInService = commEngine.notInService(); boolean notInService = commEngine.notInService();
decode(request, notInService); decode(request, notInService);
// Now that have decoded the id of this broadcast, // Now that have decoded the id of this broadcast,
// ask CommEngine to install it with its id. // ask CommEngine to install it with its id.
commEngine.installBroadcast(this); commEngine.installBroadcast(this);
setOperatingHours(DAILY_START_KEY, daily_start);
setOperatingHours(DAILY_STOP_KEY, daily_stop);
if (notInService) if (notInService)
{ {
@@ -891,8 +898,8 @@ public abstract class Broadcast
"' ready='" + getPendingJobCount() + "'"); "' ready='" + getPendingJobCount() + "'");
statusBf.append(" active='" + getActiveJobCount() + "'"); statusBf.append(" active='" + getActiveJobCount() + "'");
statusBf.append("></job_summary>\n"); statusBf.append("></job_summary>\n");
statusBf.append("<daily_pause>" + daily_pause + "</daily_pause>");
statusBf.append("<daily_resume>" + daily_resume + "</daily_resume>\n");
statusBf.append("<daily_stop>" + daily_stop + "</daily_stop>");
statusBf.append("<daily_start>" + daily_start + "</daily_start>\n");
statusBf.append(additionalStatusXML()); statusBf.append(additionalStatusXML());
statusBf.append("</" + topLevelTag + ">"); statusBf.append("</" + topLevelTag + ">");
String statusReport = statusBf.toString(); String statusReport = statusBf.toString();
@@ -1018,6 +1025,11 @@ public abstract class Broadcast
protected void resume(String reason, PrintWriter out) protected void resume(String reason, PrintWriter out)
{ {
if (!withinOperatingHours())
{
if (out != null) out.write("Cannot resume outside operating hours");
return;
}
synchronized (resumeFlag) synchronized (resumeFlag)
{ {
StateChangeResult result = setState(BroadcastState.RUNNING, reason, null); StateChangeResult result = setState(BroadcastState.RUNNING, reason, null);
@@ -1064,7 +1076,14 @@ public abstract class Broadcast
changeStateTime = System.currentTimeMillis(); changeStateTime = System.currentTimeMillis();
if (!serviceThreadsShouldStop()) if (!serviceThreadsShouldStop())
{ {
setState(BroadcastState.RUNNING);
if (withinOperatingHours())
{
setState(BroadcastState.RUNNING);
}
else
{
setState(BroadcastState.PAUSED, "clock", null);
}
// Start the dispatcher threads // Start the dispatcher threads
for (Service thread : serviceThreadPool) for (Service thread : serviceThreadPool)
@@ -1088,6 +1107,25 @@ public abstract class Broadcast
} }
} }
private boolean withinOperatingHours() {
int dailyStartMin = convert2Min(daily_start);
int dailyStopMin = convert2Min(daily_stop);
// Ensure daily stop > daily start
if (dailyStopMin < dailyStartMin) dailyStopMin += 24 * 60;
LocalTime now = LocalTime.now();
int nowMin = now.getHour() * 60 + now.getMinute();
if (nowMin < dailyStartMin) nowMin += 24 * 60;
boolean within = nowMin >= dailyStartMin && nowMin < dailyStopMin;
return within;
}

private int convert2Min(String hhmm) {
String[] parts = hhmm.split(":");
int hh = Integer.parseInt(parts[0]);
int mm = Integer.parseInt(parts[1]);
return hh * 60 + mm;
}

/** /**
* Derived class should wait for end of service before returning. * Derived class should wait for end of service before returning.
* At this point all service threads have already ended. If the derived * At this point all service threads have already ended. If the derived
@@ -1401,35 +1439,60 @@ public abstract class Broadcast
{ {
StringBuffer configBuf = new StringBuffer(); StringBuffer configBuf = new StringBuffer();
configBuf.append("<broadcast_configuration broadcast_id='" + broadcastId + "'>"); configBuf.append("<broadcast_configuration broadcast_id='" + broadcastId + "'>");
configBuf.append("<" + CommEngine.DAILY_PAUSE_KEY + ">" + daily_pause + "</" + CommEngine.DAILY_PAUSE_KEY + ">");
configBuf.append("<" + CommEngine.DAILY_RESUME_KEY + ">" + daily_resume + "</" + CommEngine.DAILY_RESUME_KEY + ">");
configBuf.append("<" + DAILY_STOP_KEY + ">" + daily_stop + "</" + DAILY_STOP_KEY + ">");
configBuf.append("<" + DAILY_START_KEY + ">" + daily_start + "</" + DAILY_START_KEY + ">");
configBuf.append("</broadcast_configuration>"); configBuf.append("</broadcast_configuration>");
return configBuf.toString(); return configBuf.toString();
} }


public void configure(JSONObject configuration) throws Exception { public void configure(JSONObject configuration) throws Exception {
String value, timeOfDay, key;
key = CommEngine.DAILY_PAUSE_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = CommEngine.checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_pause = timeOfDay;
boolean timeChanged = false;
for (String key : new String[] {DAILY_STOP_KEY, DAILY_START_KEY}) {
String value = (String)configuration.get(key);
if (value != null) {
timeChanged = timeChanged || setOperatingHours(key, value);
}
} }
key = CommEngine.DAILY_RESUME_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = CommEngine.checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_resume = timeOfDay;
if (timeChanged) enforceOperationHours();
}

void enforceOperationHours() {
if (state == BroadcastState.ABORTED) return;
if (withinOperatingHours()) {
resume("clock", null);
} else {
pause("clock", null);
}
}

/**
* Sets timeParam to value
* @param timeParam
* @param value
* @return false if no change
*/
private boolean setOperatingHours(String timeParam, String value) {
String timeOfDay = CommEngine.checkTimeOfDay(value);
if (timeOfDay == null) throw new RuntimeException(String.format("Invalid value for %s: %s", timeParam, value));
switch (timeParam) {
case DAILY_STOP_KEY:
if (daily_stop == timeOfDay) return false;
daily_stop = timeOfDay;
return true;
case DAILY_START_KEY:
if (daily_start == timeOfDay) return false;
daily_start = timeOfDay;
return true;
default:
throw new RuntimeException("Unknown parameter name: " + timeParam);
} }
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public JSONObject getConfigJSON() { public JSONObject getConfigJSON() {
JSONObject dataMap = new JSONObject(); JSONObject dataMap = new JSONObject();
dataMap.put(CommEngine.DAILY_PAUSE_KEY, daily_pause);
dataMap.put(CommEngine.DAILY_RESUME_KEY, daily_resume);
dataMap.put(DAILY_START_KEY, daily_start);
dataMap.put(DAILY_STOP_KEY, daily_stop);
childAddConfigJSON(dataMap); childAddConfigJSON(dataMap);
return dataMap; return dataMap;
} }


+ 5
- 75
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -5,8 +5,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@@ -33,11 +31,6 @@ import altk.comm.engine.Broadcast.BroadcastState;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class CommEngine extends HttpServlet public abstract class CommEngine extends HttpServlet
{ {

static final String DAILY_PAUSE_KEY = "daily_pause";

static final String DAILY_RESUME_KEY = "daily_resume";

static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";


private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
@@ -88,10 +81,6 @@ public abstract class CommEngine extends HttpServlet


protected String runtimeDirPath; protected String runtimeDirPath;
protected String confDirPath; protected String confDirPath;
/** Daily resume all broadcasts time. No action if "" */
protected String daily_resume = "";
/** Daily pause all broadcasts time. No action if "" */
protected String daily_pause = "";


private DailyClock dailyClock; private DailyClock dailyClock;
@@ -103,27 +92,13 @@ public abstract class CommEngine extends HttpServlet
{ {
while (!threadShouldStop) while (!threadShouldStop)
{ {
String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm"));
// Check for pause
for (Broadcast broadcast : broadcasts.values()) for (Broadcast broadcast : broadcasts.values())
{ {
// Check for pause
if (broadcast.daily_pause.length() > 0)
{
if (timeOfDay.equals(broadcast.daily_pause)) broadcast.pause("clock", null);
}
else if (timeOfDay.equals(daily_pause)) broadcast.pause("clock", null);

// Check for resume
if (broadcast.daily_resume.length() > 0)
{
if (timeOfDay.equals(broadcast.daily_resume)) broadcast.resume("clock", null);
}
else if (timeOfDay.equals(daily_resume)) broadcast.resume("clock", null);
broadcast.enforceOperationHours();
} }


long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long sleepTime = 60000 + 30000 - currentTime % 60000;
long sleepTime = 60000 - currentTime % 60000;
if (sleepTime > 0) if (sleepTime > 0)
{ {
try try
@@ -249,20 +224,7 @@ public abstract class CommEngine extends HttpServlet
notInService = true; notInService = true;
return; return;
} }
String timeOfDay, timeOfDayStr, key;
key = DAILY_RESUME_KEY;
timeOfDayStr = config.getProperty(key , "");
timeOfDay = checkTimeOfDay(timeOfDayStr);
if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr));
daily_resume = timeOfDay;
key = DAILY_PAUSE_KEY;
timeOfDayStr = config.getProperty(key, "");
timeOfDay = checkTimeOfDay(timeOfDayStr);
if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr));
daily_pause = timeOfDay;
Thread dailyClock; Thread dailyClock;
CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));
@@ -271,8 +233,6 @@ public abstract class CommEngine extends HttpServlet
CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize());
CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize());
CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize());
CommonLogger.activity.info("daily_resume = " + daily_resume);
CommonLogger.activity.info("daily_pause = " + daily_pause);
dailyClock = new DailyClock(); dailyClock = new DailyClock();
dailyClock.start(); dailyClock.start();
@@ -472,13 +432,7 @@ public abstract class CommEngine extends HttpServlet
private JSONObject getConfigJSON() private JSONObject getConfigJSON()
{ {
JSONObject config = new JSONObject(); JSONObject config = new JSONObject();
// engine configuration
JSONObject EngineConfig = new JSONObject();
config.put("engine", EngineConfig);
EngineConfig.put(DAILY_PAUSE_KEY, daily_pause);
EngineConfig.put(DAILY_RESUME_KEY, daily_resume);
// broadcast configuration // broadcast configuration
JSONObject broadcastsConfig = new JSONObject(); JSONObject broadcastsConfig = new JSONObject();
synchronized (broadcasts) { synchronized (broadcasts) {
@@ -553,7 +507,6 @@ public abstract class CommEngine extends HttpServlet
*/ */
protected static String checkTimeOfDay(String timeOfDay) { protected static String checkTimeOfDay(String timeOfDay) {
timeOfDay = timeOfDay.trim(); timeOfDay = timeOfDay.trim();
if (timeOfDay.length() == 0) return timeOfDay;
// pattern hh:mm // pattern hh:mm
Pattern pattern = Pattern.compile("^(\\d\\d):[0-5]\\d$"); Pattern pattern = Pattern.compile("^(\\d\\d):[0-5]\\d$");
Matcher matcher = pattern.matcher(timeOfDay); Matcher matcher = pattern.matcher(timeOfDay);
@@ -589,30 +542,8 @@ public abstract class CommEngine extends HttpServlet
} }
} }
} }
private void configureEngine(JSONObject configuration) throws Exception {
String value, timeOfDay, key;
key = DAILY_PAUSE_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_pause = timeOfDay;
}
key = DAILY_RESUME_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_resume = timeOfDay;
}
}
private void configure(JSONObject configuration) throws Exception { private void configure(JSONObject configuration) throws Exception {
// emgine
JSONObject engineConfig = (JSONObject)configuration.get("engine");
configureEngine(engineConfig);
// broadcasts // broadcasts
JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts"); JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts");
if (broadcastsConfig != null) { if (broadcastsConfig != null) {
@@ -689,7 +620,6 @@ public abstract class CommEngine extends HttpServlet
out.write(broadcast.mkStatusReport() + "\n"); out.write(broadcast.mkStatusReport() + "\n");
} }
out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>\n"); out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>\n");
out.write("<engine_clock><daily_pause>" + daily_pause + "</daily_pause><daily_resume>" + daily_resume + "</daily_resume></engine_clock>\n");
out.write("</" + tag + ">"); out.write("</" + tag + ">");
} }
} }


+ 4
- 4
src/main/java/altk/comm/engine/XMLSAXBroadcast.java View File

@@ -113,13 +113,13 @@ public abstract class XMLSAXBroadcast extends Broadcast
myLogger.debug("expire time adjusted to be " + expireTime); myLogger.debug("expire time adjusted to be " + expireTime);
} }
} }
else if (qName.equals("daily_pause"))
else if (qName.equals(DAILY_STOP_KEY))
{ {
daily_pause = getTrimmedText();
daily_stop = getTrimmedText();
} }
else if (qName.equals("daily_resume"))
else if (qName.equals(DAILY_START_KEY))
{ {
daily_resume = getTrimmedText();
daily_start = getTrimmedText();
} }
else if (inRecipientProp) else if (inRecipientProp)
{ {


Loading…
Cancel
Save