Преглед изворни кода

Change HALT to PAUSE

Added syncrhonized qualifier to 2 methods to guard against simultaneous update by 2 threads.
Completed CANCEL processing.
tags/Production_2014_08_01
ymlam пре 11 година
родитељ
комит
83b9ecefad
1 измењених фајлова са 98 додато и 53 уклоњено
  1. +98
    -53
      src/altk/comm/engine/Broadcast.java

+ 98
- 53
src/altk/comm/engine/Broadcast.java Прегледај датотеку

@@ -1,7 +1,6 @@
package altk.comm.engine; package altk.comm.engine;


import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@@ -18,7 +17,6 @@ import javax.servlet.http.HttpServletRequest;


import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.NDC; import org.apache.log4j.NDC;
import org.omg.CORBA_2_3.portable.OutputStream;


import altk.comm.engine.exception.BroadcastException; import altk.comm.engine.exception.BroadcastException;
import altk.comm.engine.exception.EngineException; import altk.comm.engine.exception.EngineException;
@@ -38,7 +36,7 @@ public abstract class Broadcast
public final String broadcastType; public final String broadcastType;
private String broadcastId; private String broadcastId;
private BroadcastState state = BroadcastState.INSTALLING;
private BroadcastState state = BroadcastState.ACCEPTED;


String haltReason; String haltReason;
String stateErrorText; String stateErrorText;
@@ -60,9 +58,6 @@ public abstract class Broadcast


protected static Logger myLogger = Logger.getLogger(Broadcast.class); protected static Logger myLogger = Logger.getLogger(Broadcast.class);


/**
* This queue is designed for only one server
*/
private Queue<Job> readyQueue; private Queue<Job> readyQueue;
private List<Thread> serviceThreadPool; private List<Thread> serviceThreadPool;
private Object resumeFlag; // Semaphore for dispatcher threads to resume. private Object resumeFlag; // Semaphore for dispatcher threads to resume.
@@ -74,15 +69,28 @@ public abstract class Broadcast


public static enum BroadcastState public static enum BroadcastState
{ {
INSTALLING,
ACCEPTED,
RUNNING, RUNNING,
HALTING,
HALTED,
PAUSING,
PAUSED,
CANCELING, CANCELING,
CANCELED, // Final state
PURGED, // Final state
ABORTED, // final state
COMPLETED // Final state
CANCELED(true), // Final state
PURGED(true), // Final state
ABORTED(true), // final state
EXPIRED(true), // final state
COMPLETED(true); // Final state
final public boolean isFinal;
private BroadcastState()
{
isFinal = false;
}
private BroadcastState(boolean isFinal)
{
this.isFinal = isFinal;
}
} }
public enum StateChangeStatus public enum StateChangeStatus
@@ -144,12 +152,12 @@ public abstract class Broadcast
toStates = new HashMap<BroadcastState, List<BroadcastState>>(); toStates = new HashMap<BroadcastState, List<BroadcastState>>();


// Transitions from INSTALLING // Transitions from INSTALLING
toStates.put(BroadcastState.INSTALLING, Arrays.asList(
toStates.put(BroadcastState.ACCEPTED, Arrays.asList(
BroadcastState.RUNNING, // Normal transition BroadcastState.RUNNING, // Normal transition
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.HALTING, // User action
BroadcastState.HALTED, // User action
BroadcastState.PAUSING, // User action
BroadcastState.PAUSED, // User action
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // TTS error BroadcastState.ABORTED, // TTS error
BroadcastState.COMPLETED // When recipient list is empty BroadcastState.COMPLETED // When recipient list is empty
@@ -159,8 +167,8 @@ public abstract class Broadcast
toStates.put(BroadcastState.RUNNING, Arrays.asList( toStates.put(BroadcastState.RUNNING, Arrays.asList(
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.HALTING, // User action
BroadcastState.HALTED, // User action
BroadcastState.PAUSING, // User action
BroadcastState.PAUSED, // User action
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.ABORTED, // Service provider irrecoverable error
BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues. BroadcastState.COMPLETED // Natural transition, if all ongoing calls complete and no more calls in Dispatcher queues.
@@ -174,16 +182,16 @@ public abstract class Broadcast
)); ));
// Transitions from HALTING // Transitions from HALTING
toStates.put(BroadcastState.HALTING, Arrays.asList(
toStates.put(BroadcastState.PAUSING, Arrays.asList(
BroadcastState.RUNNING, // User action BroadcastState.RUNNING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.HALTED,
BroadcastState.PAUSED,
BroadcastState.PURGED, // User action BroadcastState.PURGED, // User action
BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues. BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more calls in Dispatcher queues.
)); ));


// Transitions from HALTED // Transitions from HALTED
toStates.put(BroadcastState.HALTED, Arrays.asList(
toStates.put(BroadcastState.PAUSED, Arrays.asList(
BroadcastState.RUNNING, // User action BroadcastState.RUNNING, // User action
BroadcastState.CANCELED, // User action BroadcastState.CANCELED, // User action
BroadcastState.CANCELING, // User action BroadcastState.CANCELING, // User action
@@ -220,8 +228,7 @@ public abstract class Broadcast
public void run() public void run()
{ {
myLogger.info("Thread " + getName() + " starting...");
NDC.push(getName());
myLogger.info("Thread starting...");
for (;;) for (;;)
{ {
if (threadsShouldStop()) if (threadsShouldStop())
@@ -269,6 +276,16 @@ public abstract class Broadcast
return; return;
} }
// Check if expired
if (System.currentTimeMillis() >= expireTime)
{
setState(BroadcastState.EXPIRED);
continue;
}
/**
* Includes allocation from capacity. Only returns when the required allocation
* is obtained. Example, RTP port allocation, limit due to total number of allowable calls.
*/
prerequisites = secureServicePrerequisites(); prerequisites = secureServicePrerequisites();
if (threadsShouldStop() || threadsShouldPause()) if (threadsShouldStop() || threadsShouldPause())
@@ -277,6 +294,15 @@ public abstract class Broadcast
continue; continue;
} }
// Check again if expired
if (System.currentTimeMillis() >= expireTime)
{
returnPrerequisites(prerequisites);
setState(BroadcastState.EXPIRED);
continue;
}

// Now that we can go ahead with this job, let us remove this from queue // Now that we can go ahead with this job, let us remove this from queue
readyQueue.poll(); readyQueue.poll();
@@ -304,7 +330,6 @@ public abstract class Broadcast
try try
{ {
processJobs(batch, serviceProviderPeer, prerequisites); processJobs(batch, serviceProviderPeer, prerequisites);
completedJobCount++;
} }
catch (EngineException e) catch (EngineException e)
{ {
@@ -374,11 +399,11 @@ public abstract class Broadcast
/** /**
* Makes a state transition to the given newState if the transition from * Makes a state transition to the given newState if the transition from
* the current state is legal.
* the current state is legal. Also posts back a state change notification.
* @param newState * @param newState
* @return StateChangeResult * @return StateChangeResult
*/ */
public StateChangeResult setState(BroadcastState newState,
public synchronized StateChangeResult setState(BroadcastState newState,
String haltReason, String stateErrorText) String haltReason, String stateErrorText)
{ {
boolean isLegal; boolean isLegal;
@@ -529,13 +554,7 @@ public abstract class Broadcast
return responseXML.toString(); return responseXML.toString();
} }


private boolean stateIsFinal(BroadcastState state)
{
return state == BroadcastState.ABORTED || state == BroadcastState.CANCELED
|| state == BroadcastState.COMPLETED || state == BroadcastState.PURGED;
}
/**
/**
* If finalState is final, then this state is set, and dispatcher threads are stopped. * If finalState is final, then this state is set, and dispatcher threads are stopped.
* Overriding implementation may release all other resources, like timers. * Overriding implementation may release all other resources, like timers.
* @param finalState * @param finalState
@@ -552,8 +571,20 @@ public abstract class Broadcast
*/ */
public void terminate(BroadcastState finalState, String reason) public void terminate(BroadcastState finalState, String reason)
{ {
if (!stateIsFinal(finalState)) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final");
setState(finalState, reason, null);
if (!finalState.isFinal) throw new IllegalArgumentException("Argument finalState " + finalState + " in Broadcast.terminate method is not final");
StateChangeResult result = setState(finalState, reason, null);
switch (result.stateChangeStatus)
{
case SUCCESS:
break;
case NO_CHANGE:
return;
case FORBIDDEN:
myLogger.error("Not allow to terminate broadcast in " + result.currentState + " state");
return;
default: // Should not happen
return;
}
// Wake up all dispatcher threads waiting on readyQueue so they will all stop // Wake up all dispatcher threads waiting on readyQueue so they will all stop
synchronized(readyQueue) synchronized(readyQueue)
@@ -597,7 +628,7 @@ public abstract class Broadcast
} }
statusBf.append(">\r\n<state>" + state + "</state><state_change_time>" + changeStateTime statusBf.append(">\r\n<state>" + state + "</state><state_change_time>" + changeStateTime
+ "</state_change_time>\r\n"); + "</state_change_time>\r\n");
if (state == BroadcastState.HALTED
if (state == BroadcastState.PAUSED
|| state == BroadcastState.ABORTED) || state == BroadcastState.ABORTED)
{ {
if (haltReason != null) if (haltReason != null)
@@ -706,11 +737,25 @@ public abstract class Broadcast
/** /**
* Sets the stateMachine to CANCEL * Sets the stateMachine to CANCEL
*/ */
protected void cancel()
protected void cancel(PrintWriter out)
{ {
if (this.getActiveJobCount() == 0) setState(BroadcastState.CANCELED);
// Sets state to CANCELING, which is monitored by its Broadcast.Service threads.
else setState(BroadcastState.CANCELING);
BroadcastState targetState = getActiveJobCount() == 0?
BroadcastState.CANCELED : BroadcastState.CANCELING;
StateChangeResult result = setState(targetState);
String responseContent = null;
switch (result.stateChangeStatus)
{
case SUCCESS:
responseContent = "OK";
break;
case NO_CHANGE:
responseContent = "Not canceled: Already cancelled";
break;
case FORBIDDEN:
responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state";
}
out.write(responseContent);

synchronized(resumeFlag) synchronized(resumeFlag)
{ {
resumeFlag.notifyAll(); resumeFlag.notifyAll();
@@ -720,7 +765,7 @@ public abstract class Broadcast
protected void pause() protected void pause()
{ {
// Sets state to HALTED, which is monitored by Broadcast.Service threads. // Sets state to HALTED, which is monitored by Broadcast.Service threads.
setState(BroadcastState.HALTING);
setState(BroadcastState.PAUSING);
} }
protected void resume() protected void resume()
@@ -775,13 +820,13 @@ public abstract class Broadcast
private boolean threadsShouldStop() private boolean threadsShouldStop()
{ {
BroadcastState state = getState(); BroadcastState state = getState();
return state == BroadcastState.CANCELING || stateIsFinal(state);
return state == BroadcastState.CANCELING || state.isFinal;
} }


private boolean threadsShouldPause() private boolean threadsShouldPause()
{ {
BroadcastState state = getState(); BroadcastState state = getState();
return state == BroadcastState.HALTED || state == BroadcastState.HALTING;
return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING;
} }
/** /**
@@ -804,12 +849,13 @@ public abstract class Broadcast


/** /**
* Sets jobStatus in job, and post job report. * Sets jobStatus in job, and post job report.
* If no rescheduling, then decrement number of remainingJobs,
* If jobStatus is final, and no rescheduling,
* then decrement number of remainingJobs,and increment completedJobCount.
* @param job * @param job
* @param jobStatus * @param jobStatus
* @param errorText * @param errorText
*/ */
public void postJobStatus(Job job)
public synchronized void postJobStatus(Job job)
{ {
if (postBack != null) if (postBack != null)
{ {
@@ -822,6 +868,8 @@ public abstract class Broadcast
if (job.jobStatus.isTerminal()) if (job.jobStatus.isTerminal())
{ {
remainingJobs--; remainingJobs--;
completedJobCount++;

if (remainingJobs == 0) if (remainingJobs == 0)
{ {
terminate(BroadcastState.COMPLETED); terminate(BroadcastState.COMPLETED);
@@ -829,13 +877,14 @@ public abstract class Broadcast
else if (getActiveJobCount() == 0) else if (getActiveJobCount() == 0)
{ {
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
else if (state == BroadcastState.HALTING) setState(BroadcastState.HALTED);
else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
} }
} }
} }
/** /**
* Sets jobStatus in job, and post job report. * Sets jobStatus in job, and post job report.
* Optionally reschedules job.
* If no rescheduling, then decrement number of remainingJobs, * If no rescheduling, then decrement number of remainingJobs,
* @param job * @param job
* @param jobStatus * @param jobStatus
@@ -845,10 +894,6 @@ public abstract class Broadcast
protected void postJobStatus(Job job, long rescheduleTimeMS) protected void postJobStatus(Job job, long rescheduleTimeMS)
{ {
postJobStatus(job); postJobStatus(job);
if (rescheduleTimeMS < 0)
{
completedJobCount++;
}
if (rescheduleTimeMS == 0) if (rescheduleTimeMS == 0)
{ {
addJob(job); addJob(job);
@@ -875,8 +920,8 @@ public abstract class Broadcast
switch (state) switch (state)
{ {
case RUNNING: case RUNNING:
case HALTING:
case HALTED:
case PAUSING:
case PAUSED:
return readyQueue.size(); return readyQueue.size();
default: default:
return 0; return 0;


Loading…
Откажи
Сачувај