Browse Source

Allow derived class to add resume conditions.

tags/1.0.31
ymlam 2 years ago
parent
commit
1952dce39c
1 changed files with 31 additions and 23 deletions
  1. +31
    -23
      src/main/java/altk/comm/engine/Broadcast.java

+ 31
- 23
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -10,6 +10,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Vector;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@@ -20,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;


import org.apache.log4j.Category;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;


@@ -41,6 +41,7 @@ import altk.comm.engine.exception.PlatformException;
*/ */
public abstract class Broadcast public abstract class Broadcast
{ {
protected static final String ACTION_BY_CLOCK = "clock";
private static final int SCHEDULER_THREAD_POOL_SIZE = 5; private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id";
private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0; private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0;
@@ -52,10 +53,10 @@ public abstract class Broadcast
public final String broadcastType; public final String broadcastType;
private String broadcastId; private String broadcastId;
private BroadcastState state = BroadcastState.ACCEPTED;
protected BroadcastState state = BroadcastState.ACCEPTED;
private Object stateSemaphore = new Object(); private Object stateSemaphore = new Object();


String reason;
protected String reason;
String stateErrorText; String stateErrorText;
public CommEngine commEngine; public CommEngine commEngine;
public final long receiveTime; public final long receiveTime;
@@ -699,11 +700,6 @@ public abstract class Broadcast
} }
else else
{ {
// log illegal state transition with call trace
Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState));
myLogger.error(e.getMessage());
myLogger.debug("This exception is not thrown -- only for debugging information", e);
return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
} }
} }
@@ -1017,29 +1013,43 @@ public abstract class Broadcast
protected void resume(String reason, PrintWriter out) protected void resume(String reason, PrintWriter out)
{ {
if (state == BroadcastState.ACCEPTED || state.isFinal) return; if (state == BroadcastState.ACCEPTED || state.isFinal) return;
if (reason == "clock" && state == BroadcastState.PAUSED && this.reason != "clock") return;
if (!withinOperatingHours())
{
if (out != null) out.write("Cannot resume outside operating hours");
return;
String actionStatus = resume(reason);
if (out != null && actionStatus != null) out.write(actionStatus);
}
protected String resume(String reason) {
if (reason == ACTION_BY_CLOCK) {
boolean inAcceptqble = false;
for (String acceptable : getPauseModesResumableByClock()) {
if (this.reason.equalsIgnoreCase(acceptable)) inAcceptqble = true;
}
if (!inAcceptqble) return null;
} }
if (!withinOperatingHours()) return "Cannot resume outside operating hours";
synchronized (resumeFlag) synchronized (resumeFlag)
{ {
StateChangeResult result = setState(BroadcastState.RUNNING, reason, null); StateChangeResult result = setState(BroadcastState.RUNNING, reason, null);
switch (result.stateChangeStatus) switch (result.stateChangeStatus)
{ {
case FORBIDDEN: case FORBIDDEN:
if (out != null) out.write("resume not allowed");
break;
return "resume not allowed";
case SUCCESS: case SUCCESS:
if (out != null) out.write("Broadcast resumed");
resumeFlag.notifyAll(); resumeFlag.notifyAll();
break;
return "Broadcast resumed";
default: default:
break;
return null;
} }
} }
} }
/** derived class may override by adding more modes */
protected List<String> getPauseModesResumableByClock() {
Vector<String> modes = new Vector<String>();
modes.add(ACTION_BY_CLOCK);
return modes;
}
/** /**
* Derived class may make its own Implementation of JobReport * Derived class may make its own Implementation of JobReport
* @return * @return
@@ -1075,7 +1085,7 @@ public abstract class Broadcast
} }
else else
{ {
setState(BroadcastState.PAUSED, "clock", null);
setState(BroadcastState.PAUSED, ACTION_BY_CLOCK, null);
} }
// Start the dispatcher threads // Start the dispatcher threads
@@ -1473,10 +1483,8 @@ public abstract class Broadcast
*/ */
void enforceOperationHours() { void enforceOperationHours() {
if (state == BroadcastState.ABORTED) return; if (state == BroadcastState.ABORTED) return;
if (withinOperatingHours()) {
// resume("clock", null);
} else {
pause("clock", null);
if (!withinOperatingHours()) {
pause(ACTION_BY_CLOCK, null);
} }
} }




Loading…
Cancel
Save