Переглянути джерело

Time of day clock based pause and resume.

Use reason field in Broadcast to record reason for pause, cancel etc.
tags/1.0.13
ymlam 3 роки тому
джерело
коміт
1234de3189
3 змінених файлів з 89 додано та 41 видалено
  1. +6
    -0
      pom.xml
  2. +25
    -31
      src/main/java/altk/comm/engine/Broadcast.java
  3. +58
    -10
      src/main/java/altk/comm/engine/CommEngine.java

+ 6
- 0
pom.xml Переглянути файл

@@ -64,6 +64,11 @@
<version>4.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
@@ -75,6 +80,7 @@
<url>sftp://repo.link2tek.net/home/snake/maven_repo</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<uniqueVersion>false</uniqueVersion>


+ 25
- 31
src/main/java/altk/comm/engine/Broadcast.java Переглянути файл

@@ -44,7 +44,7 @@ public abstract class Broadcast
private BroadcastState state = BroadcastState.ACCEPTED;
private Object stateSemaphore = new Object();

String haltReason;
String reason;
String stateErrorText;
public CommEngine commEngine;
public final long receiveTime;
@@ -433,7 +433,7 @@ public abstract class Broadcast
}
if (pauseThreshold > 0 && lastPauseCount >= pauseThreshold)
{
pause(null);
pause(null, null);
}
}
@@ -641,7 +641,6 @@ public abstract class Broadcast
*/
public StateChangeResult setState(BroadcastState newState)
{
if (newState == BroadcastState.ABORTED) return setState(BroadcastState.ABORTED, haltReason, stateErrorText);
return setState(newState, null, null);
}
@@ -652,7 +651,7 @@ public abstract class Broadcast
* @return StateChangeResult
*/
public synchronized StateChangeResult setState(BroadcastState newState,
String haltReason, String stateErrorText)
String reason, String stateErrorText)
{
boolean isLegal;
BroadcastState prev = null;
@@ -664,7 +663,7 @@ public abstract class Broadcast
prev = state;
if (isLegal)
{
this.haltReason = haltReason;
this.reason = reason;
this.stateErrorText = stateErrorText;
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
@@ -743,9 +742,9 @@ public abstract class Broadcast
responseXML.append("'");
if (e == null)
{
if (haltReason != null && haltReason.length() > 0)
if (reason != null && reason.length() > 0)
{
responseXML.append(" error='" + haltReason + "'");
responseXML.append(" error='" + reason + "'");
}
responseXML.append('>');
}
@@ -879,21 +878,10 @@ public abstract class Broadcast
+ "</service_end_time>");
statusBf.append("<state>" + state + "</state><state_change_time>" + changeStateTime
+ "</state_change_time>");
if (state == BroadcastState.PAUSED
|| state == BroadcastState.ABORTED || state == BroadcastState.ABORTING)
{
if (haltReason != null)
{
// Escaping '&' and '<' in haltReason before enclosing it in <reason> tag
statusBf.append("<reason>" + Util.xmlEscape(haltReason)
+ "</reason>");
}
if (stateErrorText != null)
{
statusBf.append("<error_text>" + Util.xmlEscape(stateErrorText)
+ "</error_text>");
}
}
statusBf.append("<reason>" + (reason == null? "" : Util.xmlEscape(reason))
+ "</reason>");
statusBf.append("<error_text>" + (stateErrorText == null? "" : Util.xmlEscape(stateErrorText))
+ "</error_text>");
statusBf.append("<transactions>" + transactions + "</transactions>");
statusBf.append("<success>" + successCount.intValue() + "</success>");
statusBf.append("<job_summary completed='" + getCompletedJobCount() +
@@ -975,12 +963,13 @@ public abstract class Broadcast

/**
* Sets the stateMachine to CANCEL
* @param reasons - may be null.
*/
protected void cancel(PrintWriter out)
protected void cancel(String reason, PrintWriter out)
{
BroadcastState targetState = getActiveJobCount() == 0?
BroadcastState.CANCELED : BroadcastState.CANCELING;
StateChangeResult result = setState(targetState);
StateChangeResult result = setState(targetState, reason, null);
String responseContent = null;
switch (result.stateChangeStatus)
{
@@ -998,11 +987,16 @@ public abstract class Broadcast
wakeUpServiceThreads();
}

protected void pause(PrintWriter out)
/**
*
* @param reason
* @param out
*/
protected void pause(String reason, PrintWriter out)
{
// Sets state to PAUSING, which is monitored by Broadcast.Service threads.
// EVentually, when all service activity ends, the state transitions to PAUSED
StateChangeResult result = setState(BroadcastState.PAUSING);
StateChangeResult result = setState(BroadcastState.PAUSING, reason, null);
switch (result.stateChangeStatus)
{
case FORBIDDEN:
@@ -1017,11 +1011,11 @@ public abstract class Broadcast
}
}
protected void resume(PrintWriter out)
protected void resume(String reason, PrintWriter out)
{
synchronized (resumeFlag)
{
StateChangeResult result = setState(BroadcastState.RUNNING);
StateChangeResult result = setState(BroadcastState.RUNNING, reason, null);
switch (result.stateChangeStatus)
{
case FORBIDDEN:
@@ -1391,15 +1385,15 @@ public abstract class Broadcast
}
if (state == BroadcastState.PAUSING) {
return setState(BroadcastState.PAUSED).stateChangeStatus == StateChangeStatus.SUCCESS?
return setState(BroadcastState.PAUSED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.WAIT;
}
if (state == BroadcastState.CANCELING) {
return setState(BroadcastState.CANCELED).stateChangeStatus == StateChangeStatus.SUCCESS?
return setState(BroadcastState.CANCELED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.ABORTING) {
return setState(BroadcastState.ABORTED).stateChangeStatus == StateChangeStatus.SUCCESS?
return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.COMPLETED) {


+ 58
- 10
src/main/java/altk/comm/engine/CommEngine.java Переглянути файл

@@ -5,6 +5,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Enumeration;
@@ -22,6 +23,8 @@ import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

import altk.comm.engine.Broadcast.BroadcastState;

@@ -29,6 +32,10 @@ import altk.comm.engine.Broadcast.BroadcastState;
public abstract class CommEngine extends HttpServlet
{

private static final String CLOCK_24HR_PAUSE_KEY = "clock_24hr_pause";

private static final String CLOCK_24HR_RESUME_KEY = "clock_24hr_resume";

static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";

private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
@@ -104,16 +111,16 @@ public abstract class CommEngine extends HttpServlet
// Check for pause
if (broadcast.clock_24hr_pause != null && broadcast.clock_24hr_pause.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause(null);
if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause("clock", null);
}
else if (clock_24hr_pause.equals(timeOfDay)) broadcast.pause(null);
else if (clock_24hr_pause.equals(timeOfDay)) broadcast.pause("clock", null);

// Check for resume
if (broadcast.clock_24hr_resume != null && broadcast.clock_24hr_resume.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume(null);
if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume("clock", null);
}
else if (clock_24hr_resume.equals(timeOfDay)) broadcast.resume(null);
else if (clock_24hr_resume.equals(timeOfDay)) broadcast.resume("clock", null);
}

// Wakes up every 1/2 minute to provide minute resolution
@@ -247,8 +254,8 @@ public abstract class CommEngine extends HttpServlet
// Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT));
deadBroadcastViewingMinutes = Long.parseLong(periodStr);
clock_24hr_resume = config.getProperty("clock_24hr_resume", "");
clock_24hr_pause = config.getProperty("clock_24hr_pause", "");
clock_24hr_resume = config.getProperty(CLOCK_24HR_RESUME_KEY, "");
clock_24hr_pause = config.getProperty(CLOCK_24HR_PAUSE_KEY, "");
Thread clock_24hr;
CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));
@@ -417,6 +424,10 @@ public abstract class CommEngine extends HttpServlet
{
resumeBroadcast(request, out);
}
else if (get.equalsIgnoreCase("configure"))
{
configure(request, out);
}
else
{
out.write(get + " not supported");
@@ -429,12 +440,13 @@ public abstract class CommEngine extends HttpServlet
// Get broadcastId from request
String broadcastId = getBroadcastId(request);
Broadcast broadcast = broadcasts.get(broadcastId);
String reason = request.getParameter("reason");
if (broadcast == null)
{
out.format("Broadcast %s does not exist", broadcastId);
return;
}
broadcast.cancel(out);
broadcast.cancel(reason, out);
}
protected void pauseBroadcast(HttpServletRequest request, PrintWriter out)
@@ -442,12 +454,13 @@ public abstract class CommEngine extends HttpServlet
// Get broadcastId from request
String broadcastId = getBroadcastId(request);
Broadcast broadcast = broadcasts.get(broadcastId);
String reason = request.getParameter("reason");
if (broadcast == null)
{
out.format("Broadcast %s does not exist", broadcastId);
return;
}
broadcast.pause(out);
broadcast.pause(reason, out);
}


@@ -456,12 +469,43 @@ public abstract class CommEngine extends HttpServlet
// Get broadcastId from request
String broadcastId = getBroadcastId(request);
Broadcast broadcast = broadcasts.get(broadcastId);
String reason = request.getParameter("reason");
if (broadcast == null)
{
out.format("Broadcast %s does not exist", broadcastId);
return;
}
broadcast.resume(out);
broadcast.resume(reason, out);
}
protected void configure(HttpServletRequest request, PrintWriter out) {
String jsonData = request.getParameter("data");
System.out.println("jsonData: " + jsonData);
JSONParser parser = new JSONParser();
try {
JSONObject jsonObject = (JSONObject) parser.parse(new StringReader(jsonData));
System.out.println(jsonObject);

String valueStr;
// clock_24hr_pause
valueStr = (String) jsonObject.get(CLOCK_24HR_PAUSE_KEY);
if (valueStr != null && valueStr.length() == 5) {
clock_24hr_pause = valueStr;
out.println("clock_24hr_pause updated.");
}
// clock_24hr_pause
valueStr = (String) jsonObject.get(CLOCK_24HR_RESUME_KEY);
if (valueStr != null && valueStr.length() == 5) {
clock_24hr_resume = valueStr;
out.println("clock_24hr_resume updated.");
}
} catch (Exception e) {
out.println("Error - " + e.getMessage());
myLogger.error(e);
}
}

/**
@@ -516,7 +560,11 @@ public abstract class CommEngine extends HttpServlet
{
out.write(broadcast.mkStatusReport());
}
out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>");
out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>\n");
out.write("<configuration>\n");
out.write("<" + CLOCK_24HR_PAUSE_KEY + ">" + clock_24hr_pause + "</" + CLOCK_24HR_PAUSE_KEY + ">\n");
out.write("<" + CLOCK_24HR_RESUME_KEY + ">" + clock_24hr_resume + "</" + CLOCK_24HR_RESUME_KEY + ">\n");
out.write("</configuration>\n");
out.write("</" + tag + ">");
}
}


Завантаження…
Відмінити
Зберегти