diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 1d73949..703fd0e 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -323,10 +323,11 @@ public abstract class Broadcast // Get a batch of jobs, if available myLogger.debug("Looking for jobs"); List batch = new ArrayList(); + int batchSize = getJobBatchSize(); synchronized(readyQueue) { // We we are to get a batch of more than one, let us fill in the rest. - for (int i = 0; i < getJobBatchSize(); i++) + for (int i = 0; i < batchSize; i++) { Job job = readyQueue.poll(); if (job == null) break; @@ -349,7 +350,7 @@ public abstract class Broadcast continue; } } - updateServiceActivityCount(batch.size()); + updateServiceActivityCount(batchSize); } // Process jobs. @@ -374,15 +375,16 @@ public abstract class Broadcast catch (EngineException e) { // Aborting + myLogger.error("Caught unexpected Exception", e); setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText); - updateServiceActivityCount(-batch.size()); + updateServiceActivityCount(-batchSize); } catch (Throwable t) { // This is unexpected. Log stack trace myLogger.error("Caught unexpected Throwable", t); terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); - updateServiceActivityCount(-batch.size()); + updateServiceActivityCount(-batchSize); } if (sleepBetweenJobs > 0) { @@ -398,6 +400,10 @@ public abstract class Broadcast } // Exit thread myLogger.info("Thread terminating"); + synchronized(readyQueue) + { + readyQueue.notify(); + } System.out.println(getName() + " terminating"); closeServiceProvider(serviceProviderPeer); } diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java index af6772a..56c81aa 100644 --- a/src/main/java/altk/comm/engine/Postback.java +++ b/src/main/java/altk/comm/engine/Postback.java @@ -165,6 +165,10 @@ public class Postback case STOP: myLogger.info(getName() + " terminating"); System.out.println(getName() + " terminating"); + synchronized(postQueue) + { + postQueue.notify(); + } return; case WAIT: synchronized (postQueue)