From cc1a5c174ef88b06088b247acdcf5f0758c2295c Mon Sep 17 00:00:00 2001 From: ymlam Date: Tue, 17 Sep 2019 03:39:29 -0400 Subject: [PATCH] Fix bug in not properly destroying service and postback threads. Also fix bug of not doing CANCEL properly. --- src/main/java/altk/comm/engine/Broadcast.java | 101 +++++++++--------- src/main/java/altk/comm/engine/Postback.java | 28 ++++- 2 files changed, 76 insertions(+), 53 deletions(-) diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index ebf3a0b..ef4bd65 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -79,6 +79,7 @@ public abstract class Broadcast { ACCEPTED, RUNNING, + ALLDONE, PAUSING, PAUSED, CANCELING, @@ -181,34 +182,34 @@ public abstract class Broadcast BroadcastState.PURGED, // User action BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.EXPIRED, - BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. + BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues. )); // Transitions from CANCELING toStates.put(BroadcastState.CANCELING, Arrays.asList( BroadcastState.CANCELED, // User action - BroadcastState.PURGED, // User action - BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. + BroadcastState.PURGED // User action )); - // Transitions from HALTING + // Transitions from PAUSING toStates.put(BroadcastState.PAUSING, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSED, - BroadcastState.PURGED, // User action - BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. + BroadcastState.PURGED // User action )); - // Transitions from HALTED + // Transitions from PAUSED toStates.put(BroadcastState.PAUSED, Arrays.asList( BroadcastState.RUNNING, // User action BroadcastState.CANCELED, // User action BroadcastState.CANCELING, // User action - BroadcastState.PURGED, // User action - BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. + BroadcastState.PURGED // User action + )); + // Transitions from ALLDONE + toStates.put(BroadcastState.ALLDONE, Arrays.asList( + BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue )); - } public static class StateChangeResult @@ -244,6 +245,7 @@ public abstract class Broadcast if (threadsShouldStop()) { closeServiceProvider(serviceProviderPeer); + myLogger.info("Thread terminating"); return; } @@ -286,7 +288,7 @@ public abstract class Broadcast return; } - myLogger.debug("Found some jobs"); + myLogger.debug("Woke up from wait"); // Check if expired if (System.currentTimeMillis() >= expireTime) { @@ -328,31 +330,36 @@ public abstract class Broadcast batch.add(job); } } - if (batch != null && batch.size() > 0) + if (batch == null || batch.size()== 0) { - // Mark start time - long now = System.currentTimeMillis(); - for (Job job : batch) - { - job.startTime = now; - } - - // Service the jobs - try - { - processJobs(batch, serviceProviderPeer, prerequisites); - } - catch (EngineException e) - { - terminate(BroadcastState.ABORTED, e.getMessage()); - } - catch (Throwable t) - { - // This is unexpected. Log stack trace - myLogger.error("Caught unexpected Throwable", t); - terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); - } + // Exit thread + myLogger.info("Thread terminating"); + return; + } + // Process jobs. + // Mark start time + long now = System.currentTimeMillis(); + for (Job job : batch) + { + job.startTime = now; } + + // Service the jobs + try + { + processJobs(batch, serviceProviderPeer, prerequisites); + } + catch (EngineException e) + { + terminate(BroadcastState.ABORTED, e.getMessage()); + } + catch (Throwable t) + { + // This is unexpected. Log stack trace + myLogger.error("Caught unexpected Throwable", t); + terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); + } + if (sleepBetweenJobs > 0) { try @@ -366,7 +373,6 @@ public abstract class Broadcast } } } - } @@ -588,10 +594,10 @@ public abstract class Broadcast this.haltReason = haltReason; this.stateErrorText = stateErrorText; - CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); + CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState)); if (postback != null) { - synchronized(postback.postQueue) + //synchronized(postback.postQueue) { postback.queueReport(mkStatusReport(newState)); state = newState; @@ -1022,13 +1028,12 @@ public abstract class Broadcast private boolean threadsShouldStop() { - BroadcastState state = getState(); - return state == BroadcastState.CANCELING || state.isFinal; + return state == BroadcastState.CANCELING || + state == BroadcastState.CANCELED || state.isFinal; } private boolean threadsShouldPause() { - BroadcastState state = getState(); return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; } @@ -1093,17 +1098,7 @@ public abstract class Broadcast { completedJobCount++; logJobCount("Completed a job"); - - //if (getRemainingJobCount() == 0) - if (completedJobCount == jobsTotal) - { - terminate(BroadcastState.COMPLETED); - } - else if (getActiveJobCount() == 0) - { - if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); - else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); - } + if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE); } else if (rescheduleTimeMS == 0) { @@ -1182,4 +1177,8 @@ public abstract class Broadcast public String getBroadcastType() { return broadcastType; } + + public boolean allDone() { + return (completedJobCount == jobsTotal); + } } diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java index 7784c44..c8ec5e1 100644 --- a/src/main/java/altk/comm/engine/Postback.java +++ b/src/main/java/altk/comm/engine/Postback.java @@ -43,6 +43,7 @@ import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Node; +import altk.comm.engine.Broadcast.BroadcastState; import altk.comm.engine.CommonLogger; @@ -145,11 +146,34 @@ public class Postback } // No reports - //if (jobReportsQueued == jobsTotal) + BroadcastState finalState = null; + String reason = null; + switch (broadcast.getState()) + { + case ALLDONE: + finalState = BroadcastState.COMPLETED; + reason = "All posted"; + break; + case CANCELING: + finalState = BroadcastState.CANCELED; + reason = "User canceled"; + break; + case PAUSING: + finalState = BroadcastState.PAUSED; + reason = "User paused"; + break; + default: + } + if (finalState != null) + { + broadcast.terminate(finalState, reason); + myLogger.info("All posted, thread terminating"); + return; + } if (broadcast.getState().isFinal) { // No more. Notify all waiting postback threads and exit thread - myLogger.info("All done, thread terminating"); + myLogger.info("All posted, thread terminating"); postQueue.notifyAll(); return; }