diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index e099d66..db8193b 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -43,6 +43,7 @@ public abstract class Broadcast String haltReason; String stateErrorText; + public CommEngine commEngine; public final long receiveTime; public long serviceStartTime; public long serviceEndTime; @@ -78,6 +79,7 @@ public abstract class Broadcast private Integer transactions; private Integer serviceActivityCount; + public static enum BroadcastState { @@ -435,6 +437,7 @@ public abstract class Broadcast { myLogger.debug("Entering Broadcast.doPost method"); BroadcastException myException = null; + this.commEngine = commEngine; try { boolean notInService = commEngine.notInService(); @@ -450,7 +453,6 @@ public abstract class Broadcast } if (recipientList.size() == 0) { - // TODO: Got to return HTTP content before returning. CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients"); setState(BroadcastState.COMPLETED, "No recipients", null); return; @@ -464,7 +466,6 @@ public abstract class Broadcast } catch (BroadcastException e) { - // TODO: Got to return HTTP content before returning. myException = e; setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); @@ -510,9 +511,9 @@ public abstract class Broadcast jobsTotal = recipientList.size(); postback = new Postback(this, - commEngine.getPostbackMaxQueueSize(), + getPostbackMaxQueueSize(), getPostbackSenderPoolSize(), - commEngine.getPostbackMaxBatchSize()); + getPostbackMaxBatchSize()); // Create service thread pool to dispatch jobs, // at the same time, setting up a list of service thread names @@ -544,9 +545,25 @@ public abstract class Broadcast } } - protected abstract int getServiceThreadPoolSize(); + protected int getPostbackMaxQueueSize() + { + return commEngine.getPostbackMaxQueueSize(); + } + + protected int getPostbackMaxBatchSize() + { + return commEngine.getPostbackMaxBatchSize(); + } - protected abstract int getPostbackSenderPoolSize(); + protected int getServiceThreadPoolSize() + { + return commEngine.getServiceThreadPoolSize(); + } + + protected int getPostbackSenderPoolSize() + { + return commEngine.getPostbackSenderPoolSize(); + } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); @@ -607,7 +624,8 @@ public abstract class Broadcast if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime; if (postback != null) { - postback.queueReportFirst(mkStatusReport()); + if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport()); + else postback.queueReportFirst(mkStatusReport()); } return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); } diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 8e864cc..2044885 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -27,7 +27,6 @@ import altk.comm.engine.Broadcast.BroadcastState; @SuppressWarnings("serial") public abstract class CommEngine extends HttpServlet { - public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size"; static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; @@ -35,10 +34,16 @@ public abstract class CommEngine extends HttpServlet private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; + public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size"; private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1; - private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2; + private static final String POSTBACK_THREADPOOL_SIZE_KEY = "postback_threadpool_size"; + private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 20; + + private static final String POSTBACK_MAX_QUEUE_SIZE_KEY = "postback_max_queue_size"; private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000; + + private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size"; private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100; /** * Maps a broadcastId to a broadcast. @@ -69,14 +74,6 @@ public abstract class CommEngine extends HttpServlet private int completedJobCount = 0; - private int serviceThreadPoolSize; - - private int postbackMaxQueueSize; - - private int postbackSenderPoolSize; - - private int postbackMaxBatchSize; - protected String runtimeDirPath; protected String confDirPath; @@ -193,25 +190,10 @@ public abstract class CommEngine extends HttpServlet deadBroadcastViewingMinutes = Long.parseLong(periodStr); CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); - String str = config.getProperty(SERVICE_THREADPOOL_SIZE_KEY, - new Integer(SERVICE_THREADPOOL_SIZE_DEFAULT).toString()); - serviceThreadPoolSize = Integer.parseInt(str); - CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize)); - - String string = config.getProperty("postback_max_queue_size", - new Integer(POSTBACK_MAX_QUEUE_SIZE_DEFAULT).toString()); - postbackMaxQueueSize = Integer.parseInt(string); - CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize); - - string = config.getProperty("postback_threadpool_size", - new Integer(POSTBACK_THREADPOOL_SIZE_DEFAULT).toString()); - postbackSenderPoolSize = Integer.parseInt(string); - CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize); - - string = config.getProperty("postback_max_batch_size", - new Integer(POSTBACK_MAX_BATCH_SIZE_DEFAULT).toString()); - postbackMaxBatchSize = Integer.parseInt(string); - CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize); + CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize())); + CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize()); + CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize()); + CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize()); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}}, @@ -219,6 +201,60 @@ public abstract class CommEngine extends HttpServlet initChild(); } + + public int getServiceThreadPoolSize() + { + return getServiceThreadPoolSize(config); + } + + public int getServiceThreadPoolSize(Properties properties) + { + String str = properties. + getProperty(SERVICE_THREADPOOL_SIZE_KEY, String.valueOf(SERVICE_THREADPOOL_SIZE_DEFAULT)); + int size = Integer.valueOf(str); + return size; + } + + public int getPostbackSenderPoolSize() + { + return getPostbackSenderPoolSize(config); + } + + public int getPostbackSenderPoolSize(Properties properties) + { + String str = properties. + getProperty(POSTBACK_THREADPOOL_SIZE_KEY, String.valueOf(POSTBACK_THREADPOOL_SIZE_DEFAULT)); + int size = Integer.valueOf(str); + return size; + } + + public int getPostbackMaxQueueSize() + { + return getPostbackMaxQueueSize(config); + } + + public int getPostbackMaxQueueSize(Properties properties) + { + String str = properties. + getProperty(POSTBACK_MAX_QUEUE_SIZE_KEY, String.valueOf(POSTBACK_MAX_QUEUE_SIZE_DEFAULT)); + int size = Integer.valueOf(str); + return size; + } + + public int getPostbackMaxBatchSize() + { + return getPostbackMaxBatchSize(config); + } + + public int getPostbackMaxBatchSize(Properties properties) + { + String str = properties. + getProperty(POSTBACK_MAX_BATCH_SIZE_KEY, String.valueOf(POSTBACK_MAX_BATCH_SIZE_DEFAULT)); + int size = Integer.valueOf(str); + return size; + } + + protected void purgeStaleBroadcasts() { @@ -521,21 +557,4 @@ public abstract class CommEngine extends HttpServlet broadcasts.put(broadcastId, broadcast); } - @Deprecated - public int getServiceThreadPoolSize() - { - return serviceThreadPoolSize; - } - - public int getPostbackMaxQueueSize() { - return postbackMaxQueueSize; - } - - public int getPostbackSenderPoolSize() { - return postbackSenderPoolSize; - } - - public int getPostbackMaxBatchSize() { - return postbackMaxBatchSize; - } }