Browse Source

Added daily pause and resume feature.

Improved reponse text to pause and resume commands.
tags/1.0.13
ymlam 3 years ago
parent
commit
b8d29b99d3
2 changed files with 101 additions and 7 deletions
  1. +18
    -7
      src/main/java/altk/comm/engine/Broadcast.java
  2. +83
    -0
      src/main/java/altk/comm/engine/CommEngine.java

+ 18
- 7
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -63,6 +63,8 @@ public abstract class Broadcast
protected String postbackURL; protected String postbackURL;
private Postback postback; private Postback postback;
public long expireTime; public long expireTime;
protected String clock_24hr_resume;
protected String clock_24hr_pause;
/** /**
* Sleep time in milliseconds between consecutive job processing (actualliy batch) * Sleep time in milliseconds between consecutive job processing (actualliy batch)
@@ -1008,8 +1010,10 @@ public abstract class Broadcast
break; break;
case SUCCESS: case SUCCESS:
lastPauseCount = 0; lastPauseCount = 0;
if (out != null) out.write("Broadcast is being PAUSED");
break;
case NO_CHANGE: case NO_CHANGE:
if (out != null) out.write("Broadcast is being paused");
if (out != null) out.write("Broadcast is already RUNNING");
} }
} }
@@ -1017,13 +1021,20 @@ public abstract class Broadcast
{ {
synchronized (resumeFlag) synchronized (resumeFlag)
{ {
if (serviceThreadsShouldPause())
{
setState(BroadcastState.RUNNING);
resumeFlag.notifyAll();
}
StateChangeResult result = setState(BroadcastState.RUNNING);
switch (result.stateChangeStatus)
{
case FORBIDDEN:
if (out != null) out.write("resume not allowed");
break;
case SUCCESS:
if (out != null) out.write("Broadcast resumed");
resumeFlag.notifyAll();
break;
default:
break;
}
} }
out.write("Broadcast resumed");
} }
/** /**
* Derived class may make its own Implementation of JobReport * Derived class may make its own Implementation of JobReport


+ 83
- 0
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -5,6 +5,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@@ -77,7 +79,67 @@ public abstract class CommEngine extends HttpServlet


protected String runtimeDirPath; protected String runtimeDirPath;
protected String confDirPath; protected String confDirPath;
/** Daily resume all broadcasts time. No action if "" */
protected String clock_24hr_resume;
/** Daily pause all broadcasts time. No action if "" */
protected String clock_24hr_pause;


private Clock_24hr clock_24hr;
protected class Clock_24hr extends Thread
{
private boolean threadShouldStop = false;
public void run()
{
while (!threadShouldStop)
{
long startTime = System.currentTimeMillis();;

String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm"));
//System.out.println(String.format("timeOfDay %s, pause %s, resume %s", timeOfDay, clock_24hr_pause, clock_24hr_resume));
// Check for pause
for (Broadcast broadcast : broadcasts.values())
{
// Check for pause
if (broadcast.clock_24hr_pause != null && broadcast.clock_24hr_pause.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause(null);
}
else if (clock_24hr_pause.equals(timeOfDay)) broadcast.pause(null);

// Check for resume
if (broadcast.clock_24hr_resume != null && broadcast.clock_24hr_resume.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume(null);
}
else if (clock_24hr_resume.equals(timeOfDay)) broadcast.resume(null);
}

// Wakes up every 1/2 minute to provide minute resolution
long currentTime = System.currentTimeMillis();
long sleepTime = startTime + 30*1000 - currentTime;
if (sleepTime > 0)
{
try
{
Thread.sleep(1000); //sleepTime);
}
catch (Exception e)
{
myLogger.error("Clock_24hr thread caught: " + e.getMessage(), e);
return;
}
}
}
}

public void terminate()
{
threadShouldStop = true;
}
}
abstract protected Broadcast mkBroadcast(); abstract protected Broadcast mkBroadcast();
public CommEngine(String engineName) public CommEngine(String engineName)
@@ -185,12 +247,21 @@ public abstract class CommEngine extends HttpServlet
// Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT)); String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT));
deadBroadcastViewingMinutes = Long.parseLong(periodStr); deadBroadcastViewingMinutes = Long.parseLong(periodStr);
clock_24hr_resume = config.getProperty("clock_24hr_resume", "");
clock_24hr_pause = config.getProperty("clock_24hr_pause", "");
Thread clock_24hr;
CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));


CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize())); CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize()));
CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize());
CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize());
CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize());
CommonLogger.activity.info("clock_24hr_resume = " + clock_24hr_resume);
CommonLogger.activity.info("clock_24hr_pause = " + clock_24hr_pause);
clock_24hr = new Clock_24hr();
clock_24hr.start();
initChild(); initChild();
} }
@@ -531,6 +602,18 @@ public abstract class CommEngine extends HttpServlet
broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
} }
} }
// Destroy clock_24hr thread
try
{
clock_24hr.terminate();
clock_24hr.join();
}
catch (InterruptedException e)
{
// TODO nothing
}
destroyChild(); destroyChild();
super.destroy(); super.destroy();
} }


Loading…
Cancel
Save