Browse Source

Add Auto Pause feature

tags/1.0.9
ymlam 3 years ago
parent
commit
4d95efe3e9
2 changed files with 28 additions and 1 deletions
  1. +11
    -1
      src/main/java/altk/comm/engine/Broadcast.java
  2. +17
    -0
      src/main/java/altk/comm/engine/CommEngine.java

+ 11
- 1
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -90,6 +90,8 @@ public abstract class Broadcast
/** Running count of successful jobs */ /** Running count of successful jobs */
private AtomicInteger successCount; private AtomicInteger successCount;
private int pauseThreshold;
private int lastPauseCount;


public static enum BroadcastState public static enum BroadcastState
@@ -414,6 +416,7 @@ public abstract class Broadcast
receiveTime = System.currentTimeMillis(); receiveTime = System.currentTimeMillis();
serviceActivityCount = Integer.valueOf(0); serviceActivityCount = Integer.valueOf(0);
transactions = Integer.valueOf(0); transactions = Integer.valueOf(0);
lastPauseCount = 0;
} }
private void incrementTransactions(int delta) private void incrementTransactions(int delta)
@@ -421,7 +424,12 @@ public abstract class Broadcast
synchronized (transactions) synchronized (transactions)
{ {
transactions += delta; transactions += delta;
lastPauseCount += delta;
} }
if (pauseThreshold > 0 && lastPauseCount >= pauseThreshold)
{
pause(null);
}
} }
/** /**
@@ -459,6 +467,7 @@ public abstract class Broadcast
myLogger.debug("Entering Broadcast.doPost method"); myLogger.debug("Entering Broadcast.doPost method");
BroadcastException myException = null; BroadcastException myException = null;
this.commEngine = commEngine; this.commEngine = commEngine;
pauseThreshold = commEngine.getPauseThreshold();
try try
{ {
boolean notInService = commEngine.notInService(); boolean notInService = commEngine.notInService();
@@ -989,7 +998,8 @@ public abstract class Broadcast
// Sets state to PAUSING, which is monitored by Broadcast.Service threads. // Sets state to PAUSING, which is monitored by Broadcast.Service threads.
// EVentually, when all service activity ends, the state transitions to PAUSED // EVentually, when all service activity ends, the state transitions to PAUSED
setState(BroadcastState.PAUSING); setState(BroadcastState.PAUSING);
out.write("Broadcast paused");
lastPauseCount = 0;
if (out != null) out.write("Broadcast paused");
} }
protected void resume(PrintWriter out) protected void resume(PrintWriter out)


+ 17
- 0
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -42,6 +42,10 @@ public abstract class CommEngine extends HttpServlet
private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size"; private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size";
private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100; private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;
private static final String PAUSE_THRESHOLD_KEY = "pause_threshold";
private static final int PAUSE_THRESHOLD_DEFAULT = 0;
/** /**
* Maps a broadcastId to a broadcast. * Maps a broadcastId to a broadcast.
*/ */
@@ -191,6 +195,19 @@ public abstract class CommEngine extends HttpServlet
initChild(); initChild();
} }
public int getPauseThreshold()
{
return getPauseThreshold(config);
}

public int getPauseThreshold(Properties properties)
{
String str = properties.
getProperty(PAUSE_THRESHOLD_KEY, String.valueOf(PAUSE_THRESHOLD_DEFAULT));
int pauseThreshold = Integer.valueOf(str);
return pauseThreshold;
}
public int getServiceThreadPoolSize() public int getServiceThreadPoolSize()
{ {
return getServiceThreadPoolSize(config); return getServiceThreadPoolSize(config);


Loading…
Cancel
Save