diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 2044885..0320f6b 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -7,12 +7,11 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.Vector; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import javax.servlet.ServletContext; import javax.servlet.http.HttpServlet; @@ -195,10 +194,6 @@ public abstract class CommEngine extends HttpServlet CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); - scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); - scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}}, - deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES); - initChild(); } @@ -261,13 +256,14 @@ public abstract class CommEngine extends HttpServlet long now = System.currentTimeMillis(); synchronized (broadcasts) { - for (String id : broadcasts.keySet()) + Iterator iter = broadcasts.keySet().iterator(); + while (iter.hasNext()) { - if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) + Broadcast broadcast = broadcasts.get(iter.next()); + if (now - broadcast.changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) { - Broadcast broadcast = broadcasts.get(id); completedJobCount += broadcast.getCompletedJobCount(); - broadcasts.remove(id); + iter.remove(); } } } @@ -393,6 +389,7 @@ public abstract class CommEngine extends HttpServlet */ private void getStatus(HttpServletRequest request, PrintWriter out) { + purgeStaleBroadcasts(); String broadcastId = request.getParameter("broadcast_id"); if (broadcastId != null) { @@ -515,11 +512,13 @@ public abstract class CommEngine extends HttpServlet // Shutdown threads that periodically purge stale broadcasts. scheduler.shutdownNow(); - for (Broadcast broadcast : broadcasts.values()) + synchronized(broadcasts) { - broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); + for (Broadcast broadcast : broadcasts.values()) + { + broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); + } } - destroyChild(); super.destroy(); } @@ -542,8 +541,10 @@ public abstract class CommEngine extends HttpServlet public void addBroadcast(String broadcastId, Broadcast broadcast) { if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; - broadcasts.put(broadcastId, broadcast); - + synchronized (broadcasts) + { + broadcasts.put(broadcastId, broadcast); + } } /** @@ -554,7 +555,10 @@ public abstract class CommEngine extends HttpServlet { String broadcastId = broadcast.getBroadcastId(); if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; - broadcasts.put(broadcastId, broadcast); + synchronized (broadcasts) + { + broadcasts.put(broadcastId, broadcast); + } } }