Browse Source

Purging stale broadcasts moved to getStatus

rather than periodically on schedule.  This change has
no impact on user experience, and has improved logic,
and slight improvement in performance.

Also made all changes to the broadcasts structure under
synchronized control.
tags/1.0.4
Yuk-Ming Lam 4 years ago
parent
commit
3d0b89a5bf
1 changed files with 20 additions and 16 deletions
  1. +20
    -16
      src/main/java/altk/comm/engine/CommEngine.java

+ 20
- 16
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -7,12 +7,11 @@ import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
@@ -195,10 +194,6 @@ public abstract class CommEngine extends HttpServlet
CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize());
CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize());
scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}},
deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);

initChild(); initChild();
} }
@@ -261,13 +256,14 @@ public abstract class CommEngine extends HttpServlet
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
synchronized (broadcasts) synchronized (broadcasts)
{ {
for (String id : broadcasts.keySet())
Iterator<String> iter = broadcasts.keySet().iterator();
while (iter.hasNext())
{ {
if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000)
Broadcast broadcast = broadcasts.get(iter.next());
if (now - broadcast.changeStateTime > deadBroadcastViewingMinutes * 60 * 1000)
{ {
Broadcast broadcast = broadcasts.get(id);
completedJobCount += broadcast.getCompletedJobCount(); completedJobCount += broadcast.getCompletedJobCount();
broadcasts.remove(id);
iter.remove();
} }
} }
} }
@@ -393,6 +389,7 @@ public abstract class CommEngine extends HttpServlet
*/ */
private void getStatus(HttpServletRequest request, PrintWriter out) private void getStatus(HttpServletRequest request, PrintWriter out)
{ {
purgeStaleBroadcasts();
String broadcastId = request.getParameter("broadcast_id"); String broadcastId = request.getParameter("broadcast_id");
if (broadcastId != null) if (broadcastId != null)
{ {
@@ -515,11 +512,13 @@ public abstract class CommEngine extends HttpServlet
// Shutdown threads that periodically purge stale broadcasts. // Shutdown threads that periodically purge stale broadcasts.
scheduler.shutdownNow(); scheduler.shutdownNow();
for (Broadcast broadcast : broadcasts.values())
synchronized(broadcasts)
{ {
broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
for (Broadcast broadcast : broadcasts.values())
{
broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
}
} }
destroyChild(); destroyChild();
super.destroy(); super.destroy();
} }
@@ -542,8 +541,10 @@ public abstract class CommEngine extends HttpServlet
public void addBroadcast(String broadcastId, Broadcast broadcast) public void addBroadcast(String broadcastId, Broadcast broadcast)
{ {
if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
broadcasts.put(broadcastId, broadcast);
synchronized (broadcasts)
{
broadcasts.put(broadcastId, broadcast);
}
} }


/** /**
@@ -554,7 +555,10 @@ public abstract class CommEngine extends HttpServlet
{ {
String broadcastId = broadcast.getBroadcastId(); String broadcastId = broadcast.getBroadcastId();
if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++; if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
broadcasts.put(broadcastId, broadcast);
synchronized (broadcasts)
{
broadcasts.put(broadcastId, broadcast);
}
} }


} }

Loading…
Cancel
Save