diff --git a/src/altk/comm/engine/Broadcast.java b/src/altk/comm/engine/Broadcast.java index aca51cc..f930f0c 100644 --- a/src/altk/comm/engine/Broadcast.java +++ b/src/altk/comm/engine/Broadcast.java @@ -1,5 +1,6 @@ package altk.comm.engine; +import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; @@ -14,12 +15,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; -import org.apache.log4j.NDC; +import altk.comm.engine.exception.BroadcastError; import altk.comm.engine.exception.BroadcastException; import altk.comm.engine.exception.EngineException; +import altk.comm.engine.exception.PlatformError; +import altk.comm.engine.exception.PlatformException; import altk.comm.engine.postback.PostBack; /** @@ -76,7 +80,7 @@ public abstract class Broadcast protected static Logger myLogger = Logger.getLogger(Broadcast.class); private Queue readyQueue; - private List serviceThreadPool; + protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; //private int remainingJobs; @@ -390,12 +394,145 @@ public abstract class Broadcast this.jobReportRootNodeName = jobReportRootNodeName; sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; readyQueue = new LinkedBlockingQueue(); - serviceThreadPool = new ArrayList(); + serviceThreadPool = new ArrayList(); recipientList = new ArrayList(); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); resumeFlag = new Object(); receiveTime = System.currentTimeMillis(); } + /** + * Experimental formulation where it takes over directing + * the activity of a Broadcast, as it should, instead of relegating + * it to CommEngine. This is directly invoked by CommEngine.doPost method, + * and is much easier to read and comprehend. + *

It is responsible for + *

    + *
  • Replying with HTTP response, and closing HTTP request and response + *
  • Does broadcast and posts real-time progress, using post back queues + * from CommEngine. + *
+ * Strategy of execution: + *
    + *
  • Decode xml request + *
  • Set up Service threads + *
  • Invite derived class to set up contexts, one for each of the Service threads. + *
  • Do broadcast. + *
+ * This method is to be invoked by the CommEngine.doPost method, which fields a HTTP POST. + *

+ * Adopting the use of this method by CommEngine + * is 100% compatible with derived classes VoiceBroadcast and EmailBroadcast + * existing today, 6/18/2016. + * + * + * @param request + * @param response + * @param commEngine + */ + protected void doPost(HttpServletRequest request, HttpServletResponse response, + CommEngine commEngine) + { + myLogger.debug("Entering Broadcast.doPost method"); + BroadcastException myException = null; + this.serviceThreadPoolSize = commEngine.getServiceThreadPoolSize(); + try + { + boolean notInService = commEngine.notInService(); + decode(request, notInService); + // Now that have decoded the id of this broadcast, + // ask CommEngine to install it with its id. + commEngine.installBroadcast(this); + + if (notInService) + { + throw new PlatformException(PlatformError.RUNTIME_ERROR, + "Not in service"); + } + if (recipientList.size() == 0) + { + // TODO: Got to return HTTP content before returning. + CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients"); + setState(BroadcastState.COMPLETED, "No recipients", null); + return; + } + postBack = (PostBack)commEngine.getPostBack(getPostBackURL(), broadcastType); + initSync(commEngine.getResources()); + init(postBack); + if (getState() == BroadcastState.COMPLETED) return; + } + catch (BroadcastException e) + { + // TODO: Got to return HTTP content before returning. + myException = e; + setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); + CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); + myLogger.error("Broadcast aborted", e); + return; + } + catch (Throwable t) + { + // Caught stray unexpected runtime problem + CommonLogger.alarm.error("Broadcast aborted: " + t); + myLogger.error("Broadcast aborted", t); + myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage()); + setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText); + } + finally + { + // Return regular or error response + String responseXML = getResponseXML(myException); + PrintWriter writer; + try + { + writer = response.getWriter(); + writer.write(responseXML); + writer.close(); + } + catch (IOException e) + { + myLogger.error("Failed to write reponse to requester. Aborts broadcast." ); + if (state != BroadcastState.ABORTED) + { + setState(BroadcastState.ABORTED, "Failed to reply to requester", e.getMessage()); + } + return; + } + + if (myException != null) return; + } + // So far so good, we now go ahead with completing + // initialization. + try + { + initAsync(); + + effectiveJobCount = recipientList.size(); + + // Create service thread pool to dispatch jobs, + // at the same time, setting up a list of service thread names + // for use by derived class to set up contexts in which + // these threads run. + myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize); + List serviceThreadNames = new ArrayList(); + for (int i = 0; i < serviceThreadPoolSize; i++) + { + String threadName = broadcastId + "_service_thread_" + i; + Service serviceThread = new Service(threadName); + serviceThreadPool.add(serviceThread); + serviceThreadNames.add(threadName); + } + //initServiceThreadContexts(serviceThreadNames); + + doBroadcast(); + } + catch (BroadcastException e) + { + setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); + CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); + myLogger.error("Broadcast aborted", e); + } + + } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); @@ -504,6 +641,11 @@ public abstract class Broadcast return launchRecordId; } + /** + * + * @param e + * @return HTTP response for normal case (when exception e is null), or with exception + */ public String getResponseXML(BroadcastException e) { String tagName = broadcastType + "_response"; @@ -833,6 +975,45 @@ public abstract class Broadcast } } + /** + * For use by the new Broadcast.doPost method. + * It changes state to RUNNING and waits for all Service threads + * to terminate after starting them. + * @throws BroadcastException + */ + public void doBroadcast() throws BroadcastException + { + setState(BroadcastState.RUNNING); + + // Start the dispatcher threads + for (Service thread : serviceThreadPool) + { + thread.start(); + } + + for (Service thread : serviceThreadPool) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); + } + } + close(); + myLogger.info("Broadcast " + getId() + " terminated"); + } + + /** + * Derived may release resources here. + */ + protected void close() + { + // Do nothing in base class + } + public void startProcessing() throws BroadcastException { effectiveJobCount = recipientList.size(); @@ -846,15 +1027,36 @@ public abstract class Broadcast serviceThreadPool.add(serviceThread); } + initServiceThreadContexts(); + setState(BroadcastState.RUNNING); // Start the dispatcher threads - for (Thread thread : serviceThreadPool) + for (Service thread : serviceThreadPool) { thread.start(); } } + /** + * Derived class may set up environment before starting Service threads. + * @param serviceThreadNames + */ + protected void initServiceThreadContexts() + { + // Do nothing in base class + } + + /** + * Experimental - needed to go with the also experimental method exec. + * @param serviceThreadNames + */ + protected void initServiceThreadContexts(List serviceThreadNames) + { + // Do nothing in base class + } + + private boolean threadsShouldStop() { BroadcastState state = getState();