Просмотр исходного кода

Add stateSemaphore to ensure modifications to state value is synchronized.

tags/CommEngine-1.0.6
ymlam 4 лет назад
Родитель
Сommit
253c78ab71
1 измененных файлов: 122 добавлений и 108 удалений
  1. +122
    -108
      src/main/java/altk/comm/engine/Broadcast.java

+ 122
- 108
src/main/java/altk/comm/engine/Broadcast.java Просмотреть файл

@@ -41,6 +41,7 @@ public abstract class Broadcast
private String broadcastId;
private BroadcastState state = BroadcastState.ACCEPTED;
private Object stateSemaphore = new Object();

String haltReason;
String stateErrorText;
@@ -335,62 +336,62 @@ public abstract class Broadcast
{
myLogger.debug("Waiting for jobs");
readyQueue.wait();
// go back to look for jobs
continue;
}
catch (Exception e)
{
// Do nothing
// go back to look for jobs
continue;
}
}
updateServiceActivityCount(batch.size());
}

if (batch.size() > 0)
// Process jobs.
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
// Process jobs.
// Mark start time
long now = System.currentTimeMillis();
for (Job job : batch)
{
job.startTime = now;
}
// Service the jobs
// But first get dependent resource
// which includes allocation from capacity. Only returns when the required allocation
// is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
ServicePrerequisites prerequisites = secureServicePrerequisites();
try
{
updateServiceActivityCount(1);
int transactions = processJobs(batch, serviceProviderPeer, prerequisites);
incrementTransactions(transactions);
}
catch (EngineException e)
{
// Aborting
setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
}
catch (Throwable t)
{
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
finally
{
updateServiceActivityCount(-1);
}
if (sleepBetweenJobs > 0)
{
try
{
Thread.sleep(sleepBetweenJobs);
}
catch (InterruptedException e1)
{
// Do nothing?
}
}
job.startTime = now;
}
// Service the jobs
// But first get dependent resource
// which includes allocation from capacity. Only returns when the required allocation
// is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
ServicePrerequisites prerequisites = secureServicePrerequisites();

try
{
int transactions = processJobs(batch, serviceProviderPeer, prerequisites);
incrementTransactions(transactions);
}
catch (EngineException e)
{
// Aborting
setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
}
catch (Throwable t)
{
// This is unexpected. Log stack trace
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
finally
{
updateServiceActivityCount(-batch.size());
}
if (sleepBetweenJobs > 0)
{
try
{
Thread.sleep(sleepBetweenJobs);
}
catch (InterruptedException e1)
{
// Do nothing?
}
}
}
}
@@ -429,10 +430,6 @@ public abstract class Broadcast
transactions += delta;
}
}
public int pendingJobs()
{
return readyQueue.size() + scheduledJobs;
}
/**
* Experimental formulation where it takes over directing
@@ -493,7 +490,7 @@ public abstract class Broadcast
{
readyQueue.add(mkJob(recipient));
}
if (getState() == BroadcastState.ALLDONE) return;
if (state == BroadcastState.ALLDONE) return;
}
catch (BroadcastException e)
{
@@ -641,34 +638,37 @@ public abstract class Broadcast
boolean isLegal;
BroadcastState prev = null;
if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
List<BroadcastState> to = toStates.get(state);
isLegal = (to == null? false : to.contains(newState));
prev = state;
if (isLegal)
synchronized(stateSemaphore)
{
this.haltReason = haltReason;
this.stateErrorText = stateErrorText;
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
state = newState;
changeStateTime = System.currentTimeMillis();
if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime;
if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime;
if (postback != null)
{
if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport());
else postback.queueReportFirst(mkStatusReport());
}
return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
}
else
{
// log illegal state transition with call trace
Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState));
myLogger.error(e.getMessage());
myLogger.debug("This exception is not thrown -- only for debugging information", e);
return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
List<BroadcastState> to = toStates.get(state);
isLegal = (to == null? false : to.contains(newState));
prev = state;
if (isLegal)
{
this.haltReason = haltReason;
this.stateErrorText = stateErrorText;
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
state = newState;
changeStateTime = System.currentTimeMillis();
if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime;
if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime;
if (postback != null)
{
if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport());
else postback.queueReportFirst(mkStatusReport());
}
return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
}
else
{
// log illegal state transition with call trace
Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState));
myLogger.error(e.getMessage());
myLogger.debug("This exception is not thrown -- only for debugging information", e);
return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
}
}
}

@@ -720,7 +720,7 @@ public abstract class Broadcast
responseXML.append("\"");
}
responseXML.append(" accepted='");
responseXML.append(e != null || getState() == BroadcastState.COMPLETED ? "FALSE" : "TRUE");
responseXML.append(e != null || state == BroadcastState.COMPLETED ? "FALSE" : "TRUE");
responseXML.append("'");
if (e == null)
{
@@ -927,10 +927,8 @@ public abstract class Broadcast
*/
protected int getActiveJobCount()
{
return serviceActivityCount;
return serviceActivityCount;
}

/**
* Parses broadcastId and return if notInService is true.
@@ -1128,7 +1126,7 @@ public abstract class Broadcast
{
return true;
}
if (pendingJobs() == 0)
if (getRemainingJobCount() == 0)
{
wakeUpServiceThreads();
return true;
@@ -1309,8 +1307,16 @@ public abstract class Broadcast

public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
{
scheduledJobs++;
Runnable r = new Runnable() { public void run() { scheduledJobs--; addJob(job);}};
synchronized(readyQueue) {
scheduledJobs++;
}
Runnable r = new Runnable() { public void run() {
synchronized(readyQueue) {
scheduledJobs--;
}
addJob(job);
}
};
return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);
}
@@ -1326,7 +1332,7 @@ public abstract class Broadcast
case RUNNING:
case PAUSING:
case PAUSED:
return pendingJobs();
return getRemainingJobCount();
default:
return 0;
}
@@ -1334,7 +1340,7 @@ public abstract class Broadcast

public int getCompletedJobCount()
{
return jobsTotal - pendingJobs() - serviceActivityCount;
return jobsTotal - getRemainingJobCount() - serviceActivityCount;
}

public String getBroadcastType() {
@@ -1342,26 +1348,34 @@ public abstract class Broadcast
}

public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state);
if (state.isFinal) return PostbackThreadActionOnEmpty.STOP;
if (getActiveJobCount() == 0) {
if (state == BroadcastState.PAUSING) {
setState(BroadcastState.PAUSED);
return PostbackThreadActionOnEmpty.WAIT;
}
if (state == BroadcastState.CANCELING) {
setState(BroadcastState.CANCELED);
return PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.ABORTING) {
setState(BroadcastState.ABORTED);
return PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.COMPLETED) {
setState(BroadcastState.ALLDONE);
return PostbackThreadActionOnEmpty.STOP;
}
}
return PostbackThreadActionOnEmpty.WAIT;
int activeJobCount = getActiveJobCount();
myLogger.debug("getPostbackThreadActionOnEmpty(): activeJobCount = " + activeJobCount);
if (activeJobCount > 0) {
return PostbackThreadActionOnEmpty.WAIT;
}
if (state == BroadcastState.PAUSING) {
return setState(BroadcastState.PAUSED).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.WAIT;
}
if (state == BroadcastState.CANCELING) {
return setState(BroadcastState.CANCELED).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.ABORTING) {
return setState(BroadcastState.ABORTED).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.COMPLETED) {
return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
else {
return PostbackThreadActionOnEmpty.WAIT;
}
}

}

Загрузка…
Отмена
Сохранить