From 3d0b89a5bf4f3328cfb6deb8be2d5d533153f91e Mon Sep 17 00:00:00 2001 From: Yuk-Ming Lam Date: Thu, 18 Feb 2021 01:24:43 -0500 Subject: [PATCH] Purging stale broadcasts moved to getStatus rather than periodically on schedule. This change has no impact on user experience, and has improved logic, and slight improvement in performance. Also made all changes to the broadcasts structure under synchronized control. --- .../java/altk/comm/engine/CommEngine.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 2044885..0320f6b 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -7,12 +7,11 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Vector; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import javax.servlet.ServletContext; import javax.servlet.http.HttpServlet; @@ -195,10 +194,6 @@ public abstract class CommEngine extends HttpServlet CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); - scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); - scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}}, - deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES); - initChild(); } @@ -261,13 +256,14 @@ public abstract class CommEngine extends HttpServlet long now = System.currentTimeMillis(); synchronized (broadcasts) { - for (String id : broadcasts.keySet()) + Iterator iter = broadcasts.keySet().iterator(); + while (iter.hasNext()) { - if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) + Broadcast broadcast = broadcasts.get(iter.next()); + if (now - broadcast.changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) { - Broadcast broadcast = broadcasts.get(id); completedJobCount += broadcast.getCompletedJobCount(); - broadcasts.remove(id); + iter.remove(); } } } @@ -393,6 +389,7 @@ public abstract class CommEngine extends HttpServlet */ private void getStatus(HttpServletRequest request, PrintWriter out) { + purgeStaleBroadcasts(); String broadcastId = request.getParameter("broadcast_id"); if (broadcastId != null) { @@ -515,11 +512,13 @@ public abstract class CommEngine extends HttpServlet // Shutdown threads that periodically purge stale broadcasts. scheduler.shutdownNow(); - for (Broadcast broadcast : broadcasts.values()) + synchronized(broadcasts) { - broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); + for (Broadcast broadcast : broadcasts.values()) + { + broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); + } } - destroyChild(); super.destroy(); } @@ -542,8 +541,10 @@ public abstract class CommEngine extends HttpServlet public void addBroadcast(String broadcastId, Broadcast broadcast) { if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; - broadcasts.put(broadcastId, broadcast); - + synchronized (broadcasts) + { + broadcasts.put(broadcastId, broadcast); + } } /** @@ -554,7 +555,10 @@ public abstract class CommEngine extends HttpServlet { String broadcastId = broadcast.getBroadcastId(); if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; - broadcasts.put(broadcastId, broadcast); + synchronized (broadcasts) + { + broadcasts.put(broadcastId, broadcast); + } } }