Browse Source

Fix bug in not properly destroying service and postback threads.

Also fix bug of not doing CANCEL properly.
tags/CommEngine-0.0.3
ymlam 6 years ago
parent
commit
cc1a5c174e
2 changed files with 76 additions and 53 deletions
  1. +50
    -51
      src/main/java/altk/comm/engine/Broadcast.java
  2. +26
    -2
      src/main/java/altk/comm/engine/Postback.java

+ 50
- 51
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -79,6 +79,7 @@ public abstract class Broadcast
{ {
ACCEPTED, ACCEPTED,
RUNNING, RUNNING,
ALLDONE,
PAUSING, PAUSING,
PAUSED, PAUSED,
CANCELING, CANCELING,
@@ -181,34 +182,34 @@ public abstract class Broadcast
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.ABORTED, // Service provider irrecoverable error
BroadcastState.EXPIRED, BroadcastState.EXPIRED,
BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues.
)); ));
// Transitions from CANCELING // Transitions from CANCELING
toStates.put(BroadcastState.CANCELING, Arrays.asList( toStates.put(BroadcastState.CANCELING, Arrays.asList(
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.PURGED, // User action
BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
BroadcastState.PURGED // User action
)); ));
// Transitions from HALTING
// Transitions from PAUSING
toStates.put(BroadcastState.PAUSING, Arrays.asList( toStates.put(BroadcastState.PAUSING, Arrays.asList(
BroadcastState.RUNNING, // User action BroadcastState.RUNNING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.PAUSED, BroadcastState.PAUSED,
BroadcastState.PURGED, // User action
BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
BroadcastState.PURGED // User action
)); ));


// Transitions from HALTED
// Transitions from PAUSED
toStates.put(BroadcastState.PAUSED, Arrays.asList( toStates.put(BroadcastState.PAUSED, Arrays.asList(
BroadcastState.RUNNING, // User action BroadcastState.RUNNING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
BroadcastState.PURGED, // User action
BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
BroadcastState.PURGED // User action
));
// Transitions from ALLDONE
toStates.put(BroadcastState.ALLDONE, Arrays.asList(
BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue
)); ));

} }
public static class StateChangeResult public static class StateChangeResult
@@ -244,6 +245,7 @@ public abstract class Broadcast
if (threadsShouldStop()) if (threadsShouldStop())
{ {
closeServiceProvider(serviceProviderPeer); closeServiceProvider(serviceProviderPeer);
myLogger.info("Thread terminating");
return; return;
} }
@@ -286,7 +288,7 @@ public abstract class Broadcast
return; return;
} }
myLogger.debug("Found some jobs");
myLogger.debug("Woke up from wait");
// Check if expired // Check if expired
if (System.currentTimeMillis() >= expireTime) if (System.currentTimeMillis() >= expireTime)
{ {
@@ -328,31 +330,36 @@ public abstract class Broadcast
batch.add(job); batch.add(job);
} }
} }
if (batch != null && batch.size() > 0)
if (batch == null || batch.size()== 0)
{ {
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
job.startTime = now;
}
// Service the jobs
try
{
processJobs(batch, serviceProviderPeer, prerequisites);
}
catch (EngineException e)
{
terminate(BroadcastState.ABORTED, e.getMessage());
}
catch (Throwable t)
{
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
// Exit thread
myLogger.info("Thread terminating");
return;
}
// Process jobs.
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
job.startTime = now;
} }
// Service the jobs
try
{
processJobs(batch, serviceProviderPeer, prerequisites);
}
catch (EngineException e)
{
terminate(BroadcastState.ABORTED, e.getMessage());
}
catch (Throwable t)
{
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
if (sleepBetweenJobs > 0) if (sleepBetweenJobs > 0)
{ {
try try
@@ -366,7 +373,6 @@ public abstract class Broadcast
} }
} }
} }

} }




@@ -588,10 +594,10 @@ public abstract class Broadcast
this.haltReason = haltReason; this.haltReason = haltReason;
this.stateErrorText = stateErrorText; this.stateErrorText = stateErrorText;
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state));
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
if (postback != null) if (postback != null)
{ {
synchronized(postback.postQueue)
//synchronized(postback.postQueue)
{ {
postback.queueReport(mkStatusReport(newState)); postback.queueReport(mkStatusReport(newState));
state = newState; state = newState;
@@ -1022,13 +1028,12 @@ public abstract class Broadcast
private boolean threadsShouldStop() private boolean threadsShouldStop()
{ {
BroadcastState state = getState();
return state == BroadcastState.CANCELING || state.isFinal;
return state == BroadcastState.CANCELING ||
state == BroadcastState.CANCELED || state.isFinal;
} }


private boolean threadsShouldPause() private boolean threadsShouldPause()
{ {
BroadcastState state = getState();
return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING;
} }
@@ -1093,17 +1098,7 @@ public abstract class Broadcast
{ {
completedJobCount++; completedJobCount++;
logJobCount("Completed a job"); logJobCount("Completed a job");
//if (getRemainingJobCount() == 0)
if (completedJobCount == jobsTotal)
{
terminate(BroadcastState.COMPLETED);
}
else if (getActiveJobCount() == 0)
{
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
}
if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE);
} }
else if (rescheduleTimeMS == 0) else if (rescheduleTimeMS == 0)
{ {
@@ -1182,4 +1177,8 @@ public abstract class Broadcast
public String getBroadcastType() { public String getBroadcastType() {
return broadcastType; return broadcastType;
} }
public boolean allDone() {
return (completedJobCount == jobsTotal);
}
} }

+ 26
- 2
src/main/java/altk/comm/engine/Postback.java View File

@@ -43,6 +43,7 @@ import org.apache.log4j.Logger;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Node; import org.w3c.dom.Node;


import altk.comm.engine.Broadcast.BroadcastState;
import altk.comm.engine.CommonLogger; import altk.comm.engine.CommonLogger;




@@ -145,11 +146,34 @@ public class Postback
} }
// No reports // No reports
//if (jobReportsQueued == jobsTotal)
BroadcastState finalState = null;
String reason = null;
switch (broadcast.getState())
{
case ALLDONE:
finalState = BroadcastState.COMPLETED;
reason = "All posted";
break;
case CANCELING:
finalState = BroadcastState.CANCELED;
reason = "User canceled";
break;
case PAUSING:
finalState = BroadcastState.PAUSED;
reason = "User paused";
break;
default:
}
if (finalState != null)
{
broadcast.terminate(finalState, reason);
myLogger.info("All posted, thread terminating");
return;
}
if (broadcast.getState().isFinal) if (broadcast.getState().isFinal)
{ {
// No more. Notify all waiting postback threads and exit thread // No more. Notify all waiting postback threads and exit thread
myLogger.info("All done, thread terminating");
myLogger.info("All posted, thread terminating");
postQueue.notifyAll(); postQueue.notifyAll();
return; return;
} }


Loading…
Cancel
Save