From b8d29b99d3bac7449a6b5dabbcfd09e60c6cd011 Mon Sep 17 00:00:00 2001 From: ymlam Date: Fri, 9 Sep 2022 10:22:18 -0400 Subject: [PATCH] Added daily pause and resume feature. Improved reponse text to pause and resume commands. --- src/main/java/altk/comm/engine/Broadcast.java | 25 ++++-- .../java/altk/comm/engine/CommEngine.java | 83 +++++++++++++++++++ 2 files changed, 101 insertions(+), 7 deletions(-) diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index c7f5c4c..244831c 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -63,6 +63,8 @@ public abstract class Broadcast protected String postbackURL; private Postback postback; public long expireTime; + protected String clock_24hr_resume; + protected String clock_24hr_pause; /** * Sleep time in milliseconds between consecutive job processing (actualliy batch) @@ -1008,8 +1010,10 @@ public abstract class Broadcast break; case SUCCESS: lastPauseCount = 0; + if (out != null) out.write("Broadcast is being PAUSED"); + break; case NO_CHANGE: - if (out != null) out.write("Broadcast is being paused"); + if (out != null) out.write("Broadcast is already RUNNING"); } } @@ -1017,13 +1021,20 @@ public abstract class Broadcast { synchronized (resumeFlag) { - if (serviceThreadsShouldPause()) - { - setState(BroadcastState.RUNNING); - resumeFlag.notifyAll(); - } + StateChangeResult result = setState(BroadcastState.RUNNING); + switch (result.stateChangeStatus) + { + case FORBIDDEN: + if (out != null) out.write("resume not allowed"); + break; + case SUCCESS: + if (out != null) out.write("Broadcast resumed"); + resumeFlag.notifyAll(); + break; + default: + break; + } } - out.write("Broadcast resumed"); } /** * Derived class may make its own Implementation of JobReport diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 5282254..7a5bbc4 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -5,6 +5,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; @@ -77,7 +79,67 @@ public abstract class CommEngine extends HttpServlet protected String runtimeDirPath; protected String confDirPath; + /** Daily resume all broadcasts time. No action if "" */ + protected String clock_24hr_resume; + /** Daily pause all broadcasts time. No action if "" */ + protected String clock_24hr_pause; + private Clock_24hr clock_24hr; + + protected class Clock_24hr extends Thread + { + private boolean threadShouldStop = false; + + public void run() + { + while (!threadShouldStop) + { + long startTime = System.currentTimeMillis();; + + String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm")); + //System.out.println(String.format("timeOfDay %s, pause %s, resume %s", timeOfDay, clock_24hr_pause, clock_24hr_resume)); + // Check for pause + for (Broadcast broadcast : broadcasts.values()) + { + // Check for pause + if (broadcast.clock_24hr_pause != null && broadcast.clock_24hr_pause.length() > 0) + { + if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause(null); + } + else if (clock_24hr_pause.equals(timeOfDay)) broadcast.pause(null); + + // Check for resume + if (broadcast.clock_24hr_resume != null && broadcast.clock_24hr_resume.length() > 0) + { + if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume(null); + } + else if (clock_24hr_resume.equals(timeOfDay)) broadcast.resume(null); + } + + // Wakes up every 1/2 minute to provide minute resolution + long currentTime = System.currentTimeMillis(); + long sleepTime = startTime + 30*1000 - currentTime; + if (sleepTime > 0) + { + try + { + Thread.sleep(1000); //sleepTime); + } + catch (Exception e) + { + myLogger.error("Clock_24hr thread caught: " + e.getMessage(), e); + return; + } + } + } + } + + public void terminate() + { + threadShouldStop = true; + } + } + abstract protected Broadcast mkBroadcast(); public CommEngine(String engineName) @@ -185,12 +247,21 @@ public abstract class CommEngine extends HttpServlet // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT)); deadBroadcastViewingMinutes = Long.parseLong(periodStr); + clock_24hr_resume = config.getProperty("clock_24hr_resume", ""); + clock_24hr_pause = config.getProperty("clock_24hr_pause", ""); + Thread clock_24hr; + CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize())); CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); + CommonLogger.activity.info("clock_24hr_resume = " + clock_24hr_resume); + CommonLogger.activity.info("clock_24hr_pause = " + clock_24hr_pause); + + clock_24hr = new Clock_24hr(); + clock_24hr.start(); initChild(); } @@ -531,6 +602,18 @@ public abstract class CommEngine extends HttpServlet broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); } } + + // Destroy clock_24hr thread + try + { + clock_24hr.terminate(); + clock_24hr.join(); + } + catch (InterruptedException e) + { + // TODO nothing + } + destroyChild(); super.destroy(); }