diff --git a/src/altk/comm/engine/Broadcast.java b/src/altk/comm/engine/Broadcast.java index 5c2dfdf..a7d05c4 100644 --- a/src/altk/comm/engine/Broadcast.java +++ b/src/altk/comm/engine/Broadcast.java @@ -39,6 +39,7 @@ public abstract class Broadcast String haltReason; String stateErrorText; + public final long receiveTime; public long changeStateTime; private int completedJobCount = 0; protected String activityRecordIdParamName; @@ -66,6 +67,7 @@ public abstract class Broadcast private int remainingJobs; private ScheduledExecutorService scheduler; + private int serviceThreadPoolSize; public static enum BroadcastState { @@ -298,6 +300,7 @@ public abstract class Broadcast try { processJobs(batch, serviceProvider, prerequisites); + completedJobCount++; } catch (EngineException e) { @@ -324,6 +327,7 @@ public abstract class Broadcast recipientList = new ArrayList(); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); resumeFlag = new Object(); + receiveTime = System.currentTimeMillis(); } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); @@ -568,16 +572,19 @@ public abstract class Broadcast { StringBuffer statusBf = new StringBuffer(); String topLevelTag = broadcastType; - statusBf.append("<" + topLevelTag + " broadcast_id='" + getBroadcastId() + String broadcastId = getBroadcastId(); + if (broadcastId == null) broadcastId = ""; + statusBf.append("<" + topLevelTag + " broadcast_id='" + broadcastId + + "' receive_time='" + receiveTime + "' recipient_count='" + recipientList.size() + "'"); if (launchRecordId != null) { statusBf.append(" launch_record_id='" + launchRecordId + "'"); } - BroadcastState broadcastState = getState(); - statusBf.append(">\r\n" + broadcastState + "\r\n"); - if (broadcastState == BroadcastState.HALTED - || broadcastState == BroadcastState.ABORTED) + statusBf.append(">\r\n" + state + "" + changeStateTime + + "\r\n"); + if (state == BroadcastState.HALTED + || state == BroadcastState.ABORTED) { if (haltReason != null) { @@ -677,8 +684,6 @@ public abstract class Broadcast // Do nothing in base class. } - public abstract int getServiceThreadPoolSize(); - public String getId() { return broadcastId; @@ -735,9 +740,8 @@ public abstract class Broadcast public void startProcessing() throws BroadcastException { - // Create dispatcher thread pool - int threadPoolSize = getServiceThreadPoolSize(); - for (int i = 0; i < threadPoolSize; i++) + // Create service thread pool to dispatch jobs + for (int i = 0; i < serviceThreadPoolSize; i++) { String threadName = broadcastId + "_service_thread_" + i; Service serviceThread = new Service(threadName); @@ -828,7 +832,7 @@ public abstract class Broadcast postJobStatus(job); if (rescheduleTimeMS < 0) { - completedJobCount ++; + completedJobCount++; } if (rescheduleTimeMS == 0) { @@ -869,4 +873,9 @@ public abstract class Broadcast { return completedJobCount; } + + protected void setServiceThreadPoolsize(int serviceThreadPoolSize) + { + this.serviceThreadPoolSize = serviceThreadPoolSize; + } }