소스 검색

Add BroadcastState.ABORTING to fix bug of losing postbacks on abort.

tags/CommEngine-1.0.6
ymlam 4 년 전
부모
커밋
8fd8813afa
2개의 변경된 파일86개의 추가작업 그리고 34개의 파일을 삭제
  1. +85
    -31
      src/main/java/altk/comm/engine/Broadcast.java
  2. +1
    -3
      src/main/java/altk/comm/engine/Postback.java

+ 85
- 31
src/main/java/altk/comm/engine/Broadcast.java 파일 보기

@@ -79,6 +79,7 @@ public abstract class Broadcast
private int scheduledJobs;
private Integer transactions;
/** Instantaneous number of jobs being serviced by service provider */
private Integer serviceActivityCount;
/** Running count of successful jobs */
@@ -87,17 +88,27 @@ public abstract class Broadcast
public static enum BroadcastState
{
/** Broadcast request accepted for execution */
ACCEPTED,
/** Servicing jobs */
RUNNING,
/** User action, causing system to quiesce, i.e. quiet down */
PAUSING,
/** System is paused and quiet. Ready to resume */
PAUSED,
/** User action */
CANCELING,
/** Ireoverable internal or service provider error */
ABORTING,
/** All servicing done, reporting may still be ongoing */
COMPLETED,
CANCELED(true), // Final state
PURGED(true), // Final state
ABORTED(true), // final state
EXPIRED(true), // final state
COMPLETED,
ALLDONE(true); // Final state
/** All servicing and reporting done */
ALLDONE(true) // Final state
;
final public boolean isFinal;
@@ -178,9 +189,10 @@ public abstract class Broadcast
BroadcastState.PAUSING, // User action
BroadcastState.PAUSED, // User action
BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // TTS error
BroadcastState.ABORTING,
BroadcastState.ABORTED,
BroadcastState.EXPIRED,
BroadcastState.COMPLETED // When recipient list is empty
BroadcastState.ALLDONE // When recipient list is empty
));
// Transitions from RUNNING
@@ -191,19 +203,29 @@ public abstract class Broadcast
BroadcastState.PAUSING, // User action
BroadcastState.PAUSED, // User action
BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // Service provider irrecoverable error
BroadcastState.ABORTING,
BroadcastState.ABORTED,
BroadcastState.EXPIRED
));
// Transitions from CANCELING
toStates.put(BroadcastState.CANCELING, Arrays.asList(
BroadcastState.ABORTING,
BroadcastState.ABORTED,
BroadcastState.CANCELED, // User action
BroadcastState.PURGED // User action
));
// Transitions from ABORTING
toStates.put(BroadcastState.ABORTING, Arrays.asList(
BroadcastState.ABORTED,
BroadcastState.PURGED // User action
));
// Transitions from PAUSING
toStates.put(BroadcastState.PAUSING, Arrays.asList(
BroadcastState.RUNNING, // User action
BroadcastState.ABORTING,
BroadcastState.CANCELED, // User action
BroadcastState.PAUSED,
BroadcastState.PURGED // User action
@@ -214,6 +236,8 @@ public abstract class Broadcast
BroadcastState.RUNNING, // User action
BroadcastState.CANCELED, // User action
BroadcastState.CANCELING, // User action
BroadcastState.ABORTING,
BroadcastState.ABORTED,
BroadcastState.PURGED // User action
));
toStates.put(BroadcastState.COMPLETED, Arrays.asList(
@@ -340,11 +364,11 @@ public abstract class Broadcast
updateServiceActivityCount(1);
int transactions = processJobs(batch, serviceProviderPeer, prerequisites);
incrementTransactions(transactions);
updateServiceActivityCount(-1);
}
catch (EngineException e)
{
terminate(BroadcastState.ABORTED, e.getMessage());
// Aborting
setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
}
catch (Throwable t)
{
@@ -352,7 +376,10 @@ public abstract class Broadcast
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
finally
{
updateServiceActivityCount(-1);
}
if (sleepBetweenJobs > 0)
{
try
@@ -458,7 +485,7 @@ public abstract class Broadcast
if (recipientList.size() == 0)
{
CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients");
setState(BroadcastState.COMPLETED, "No recipients", null);
setState(BroadcastState.ALLDONE, "No recipients", null);
return;
}
initSync(commEngine.getResources());
@@ -466,13 +493,13 @@ public abstract class Broadcast
{
readyQueue.add(mkJob(recipient));
}
if (getState() == BroadcastState.COMPLETED) return;
if (getState() == BroadcastState.ALLDONE) return;
}
catch (BroadcastException e)
{
myException = e;
setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText);
CommonLogger.alarm.error("Broadcast aborting: " + e.getMessage());
myLogger.error("Broadcast aborted", e);
return;
}
@@ -598,6 +625,7 @@ public abstract class Broadcast
*/
public StateChangeResult setState(BroadcastState newState)
{
if (newState == BroadcastState.ABORTED) return setState(BroadcastState.ABORTED, haltReason, stateErrorText);
return setState(newState, null, null);
}
@@ -635,7 +663,11 @@ public abstract class Broadcast
}
else
{
myLogger.warn(String.format("Broadcast %s: Transition from %s to %s forbidden", broadcastId, prev, newState));
// log illegal state transition with call trace
Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState));
myLogger.error(e.getMessage());
myLogger.debug("This exception is not thrown -- only for debugging information", e);
return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null);
}
}
@@ -834,12 +866,12 @@ public abstract class Broadcast
statusBf.append("<current_time>" + System.currentTimeMillis() + "</current_time>");
if (serviceStartTime > 0) statusBf.append("<service_start_time>" + serviceStartTime
+ "</service_start_time>");
if (serviceEndTime > 0) statusBf.append("<service_end_time>" + serviceEndTime
if (state.isFinal && serviceEndTime > 0) statusBf.append("<service_end_time>" + serviceEndTime
+ "</service_end_time>");
statusBf.append("<state>" + state + "</state><state_change_time>" + changeStateTime
+ "</state_change_time>");
if (state == BroadcastState.PAUSED
|| state == BroadcastState.ABORTED)
|| state == BroadcastState.ABORTED || state == BroadcastState.ABORTING)
{
if (haltReason != null)
{
@@ -891,9 +923,8 @@ public abstract class Broadcast

/**
*
* @return number of active jobs
* computed from jobsTotal, jobReportsQueued and readyQueue.size()
*/
* @return instantaneous number of jobs being serviced by service provider
*/
protected int getActiveJobCount()
{
return serviceActivityCount;
@@ -1030,12 +1061,11 @@ public abstract class Broadcast
}
}
waitForEndOfService();
setState(BroadcastState.COMPLETED);
}
destroyResources();
postback.wrapup();
postback = null;
myLogger.info("Broadcast " + getId() + " terminated");
CommonLogger.activity.info("Broadcast " + getId() + " terminated");
}
/**
@@ -1093,8 +1123,8 @@ public abstract class Broadcast
return true;
}
if (state == BroadcastState.CANCELING ||
state == BroadcastState.EXPIRED ||
state == BroadcastState.CANCELED || state.isFinal)
state == BroadcastState.ABORTING ||
state.isFinal)
{
return true;
}
@@ -1152,8 +1182,12 @@ public abstract class Broadcast
serviceActivityCount += increment;
if (increment < 0 && serviceActivityCount <= 0)
{
if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
if (state == BroadcastState.RUNNING)
{
if (getRemainingJobCount() == 0) {
setState(BroadcastState.COMPLETED);
}
}
}
}
}
@@ -1190,18 +1224,20 @@ public abstract class Broadcast
postback.queueReport(report.toString());
}
// No more rescheduling on cancel, abort, expired, or pause
if (state == BroadcastState.CANCELING
|| state == BroadcastState.CANCELED
|| state == BroadcastState.EXPIRED
|| state == BroadcastState.PAUSED
|| state == BroadcastState.PAUSING
|| state == BroadcastState.ABORTED
|| state == BroadcastState.ABORTING
|| state == BroadcastState.ALLDONE
)
{
// No more rescheduling on cancel, expire, or pause
//completedJobCount++;
//logJobCount("Completed a job");
return;
}
else if (rescheduleTimeMS == 0)
if (rescheduleTimeMS == 0)
{
addJob(job);
//logJobCount("Added a job to queue");
@@ -1266,7 +1302,9 @@ public abstract class Broadcast
*/
private int getRemainingJobCount()
{
return readyQueue.size() + scheduledJobs;
synchronized(readyQueue) {
return readyQueue.size() + scheduledJobs;
}
}

public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
@@ -1304,9 +1342,25 @@ public abstract class Broadcast
}

public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
//logJobCount("getPostbackThreadActionOnEmpty");
if (state.isFinal) return PostbackThreadActionOnEmpty.STOP;
if (setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS) return PostbackThreadActionOnEmpty.CONTINUE;
if (getActiveJobCount() == 0) {
if (state == BroadcastState.PAUSING) {
setState(BroadcastState.PAUSED);
return PostbackThreadActionOnEmpty.WAIT;
}
if (state == BroadcastState.CANCELING) {
setState(BroadcastState.CANCELED);
return PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.ABORTING) {
setState(BroadcastState.ABORTED);
return PostbackThreadActionOnEmpty.STOP;
}
else if (state == BroadcastState.COMPLETED) {
setState(BroadcastState.ALLDONE);
return PostbackThreadActionOnEmpty.STOP;
}
}
return PostbackThreadActionOnEmpty.WAIT;
}



+ 1
- 3
src/main/java/altk/comm/engine/Postback.java 파일 보기

@@ -126,7 +126,7 @@ public class Postback
if (report == null) break;
reportList.add(report);
}
// If space in que is generated, wake up all service queues waiting for space
// If space in queue is generated, wake up all service queues waiting for space
if (reportList.size() > 0 && threadsWaitingToPost > 0) postQueue.notifyAll();
}
if (reportList.size() > 0)
@@ -175,7 +175,6 @@ public class Postback
}
}
break;
case CONTINUE:
default:
}
}
@@ -340,7 +339,6 @@ public class Postback
cm.setMaxTotal(senderPoolSize);
cm.setDefaultMaxPerRoute(senderPoolSize);

//
// Retry handler
maxRetries = RETRIES_DEFAULT;
HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler()


불러오는 중...
취소
저장