From 4d95efe3e939b23e73eb7977ae945a42e8f478da Mon Sep 17 00:00:00 2001 From: ymlam Date: Tue, 7 Jun 2022 12:12:33 -0400 Subject: [PATCH] Add Auto Pause feature --- src/main/java/altk/comm/engine/Broadcast.java | 12 +++++++++++- src/main/java/altk/comm/engine/CommEngine.java | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 21fcbdd..8ae901a 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -90,6 +90,8 @@ public abstract class Broadcast /** Running count of successful jobs */ private AtomicInteger successCount; + private int pauseThreshold; + private int lastPauseCount; public static enum BroadcastState @@ -414,6 +416,7 @@ public abstract class Broadcast receiveTime = System.currentTimeMillis(); serviceActivityCount = Integer.valueOf(0); transactions = Integer.valueOf(0); + lastPauseCount = 0; } private void incrementTransactions(int delta) @@ -421,7 +424,12 @@ public abstract class Broadcast synchronized (transactions) { transactions += delta; + lastPauseCount += delta; } + if (pauseThreshold > 0 && lastPauseCount >= pauseThreshold) + { + pause(null); + } } /** @@ -459,6 +467,7 @@ public abstract class Broadcast myLogger.debug("Entering Broadcast.doPost method"); BroadcastException myException = null; this.commEngine = commEngine; + pauseThreshold = commEngine.getPauseThreshold(); try { boolean notInService = commEngine.notInService(); @@ -989,7 +998,8 @@ public abstract class Broadcast // Sets state to PAUSING, which is monitored by Broadcast.Service threads. // EVentually, when all service activity ends, the state transitions to PAUSED setState(BroadcastState.PAUSING); - out.write("Broadcast paused"); + lastPauseCount = 0; + if (out != null) out.write("Broadcast paused"); } protected void resume(PrintWriter out) diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 89a2147..5282254 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -42,6 +42,10 @@ public abstract class CommEngine extends HttpServlet private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size"; private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100; + + private static final String PAUSE_THRESHOLD_KEY = "pause_threshold"; + private static final int PAUSE_THRESHOLD_DEFAULT = 0; + /** * Maps a broadcastId to a broadcast. */ @@ -191,6 +195,19 @@ public abstract class CommEngine extends HttpServlet initChild(); } + public int getPauseThreshold() + { + return getPauseThreshold(config); + } + + public int getPauseThreshold(Properties properties) + { + String str = properties. + getProperty(PAUSE_THRESHOLD_KEY, String.valueOf(PAUSE_THRESHOLD_DEFAULT)); + int pauseThreshold = Integer.valueOf(str); + return pauseThreshold; + } + public int getServiceThreadPoolSize() { return getServiceThreadPoolSize(config);