diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index d3646f4..ebf3a0b 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -45,19 +45,6 @@ public abstract class Broadcast String stateErrorText; public final long receiveTime; public long changeStateTime; - /** - * Count of jobs that are completed (excluding those that are - * being rescheduled). - */ - private int completedJobCount = 0; - /** - * Dynamically keeps count of the total number jobs scheduled - * in readyQueue. Initially it is set to be the size of the - * recipientList. Then as jobs are processed, and when one is - * to be repeated by re-adding it to the readyQueue, then this - * number is incremented by 1. - */ - private int effectiveJobCount = 0; protected String activityRecordIdParamName; private String jobReportRootNodeName; @@ -82,10 +69,11 @@ public abstract class Broadcast protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; - private int jobReportsQueued; + private int completedJobCount; private ScheduledExecutorService scheduler; private int serviceThreadPoolSize; + private int jobsTotal; public static enum BroadcastState { @@ -394,7 +382,8 @@ public abstract class Broadcast this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; this.jobReportRootNodeName = jobReportRootNodeName; - jobReportsQueued = 0; + postback = null; + completedJobCount = 0; sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; readyQueue = new LinkedBlockingQueue(); serviceThreadPool = new ArrayList(); @@ -460,7 +449,10 @@ public abstract class Broadcast return; } initSync(commEngine.getResources()); - init(); + for (Recipient recipient : recipientList) + { + readyQueue.add(mkJob(recipient)); + } if (getState() == BroadcastState.COMPLETED) return; } catch (BroadcastException e) @@ -509,8 +501,7 @@ public abstract class Broadcast { initAsync(); - int jobsTotal = recipientList.size(); - effectiveJobCount = jobsTotal; + jobsTotal = recipientList.size(); postback = new Postback(this, commEngine.getPostbackMaxQueueSize(), commEngine.getPostbackSenderPoolSize(), @@ -529,7 +520,6 @@ public abstract class Broadcast serviceThreadPool.add(serviceThread); serviceThreadNames.add(threadName); } - //initServiceThreadContexts(serviceThreadNames); doBroadcast(); } @@ -589,18 +579,10 @@ public abstract class Broadcast { boolean isLegal; BroadcastState prev = null; - synchronized (this) - { - if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); - List to = toStates.get(state); - isLegal = (to == null? false : to.contains(newState)); - prev = state; - if (isLegal) - { - state = newState; - changeStateTime = System.currentTimeMillis(); - } - } + if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); + List to = toStates.get(state); + isLegal = (to == null? false : to.contains(newState)); + prev = state; if (isLegal) { this.haltReason = haltReason; @@ -609,9 +591,17 @@ public abstract class Broadcast CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); if (postback != null) { - postback.queueReport(mkStatusReport()); + synchronized(postback.postQueue) + { + postback.queueReport(mkStatusReport(newState)); + state = newState; + } } - + else + { + state = newState; + } + changeStateTime = System.currentTimeMillis(); return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); } else @@ -796,10 +786,21 @@ public abstract class Broadcast } /** - * Creates status report. + * Defaults to current state + * @return + */ + protected String mkStatusReport() + { + return mkStatusReport(state); + } + + /** + * Creates status report. Sometimes, we need to create the report before + * actually changing BroadcastState. + * @param state - BroadcastState for this report, which is not necessarily the same as the class attribute state. * @return status report in XML. */ - protected String mkStatusReport() + protected String mkStatusReport(BroadcastState state) { StringBuffer statusBf = new StringBuffer(); String topLevelTag = broadcastType; @@ -853,13 +854,12 @@ public abstract class Broadcast /** * - * @return number of active jobs, including those being - * rescheduled by a timer. - * Computed from effectiveJobCount, completedJobCount and readyQueue.size() + * @return number of active jobs + * computed from jobsTotal, jobReportsQueued and readyQueue.size() */ protected int getActiveJobCount() { - return effectiveJobCount - completedJobCount - readyQueue.size(); + return jobsTotal - completedJobCount - readyQueue.size(); } @@ -875,29 +875,6 @@ public abstract class Broadcast protected abstract void decode(HttpServletRequest request, boolean notInService) throws EngineException; - /** - * Remembers postBack, and - * Creates thread pool of size dictated by broadcast, which determines the size based - * on the chosen service provider. - * - * Overriding implementation must invoke this method at the end, and process information - * contained in the broadcast, in preparation for the invocation of the process - * method. - * - * If there is no error, the overriding implementation must return this base method. - * - * @param commEngine - * - * @throws BroadcastException - */ - protected final void init() - { - for (Recipient recipient : recipientList) - { - readyQueue.add(mkJob(recipient)); - } - } - protected abstract void initSync(EngineResources resources) throws BroadcastException; protected Job mkJob(Recipient recipient) @@ -1020,6 +997,7 @@ public abstract class Broadcast */ protected void close() { + postback.shutdownWhenDone(); postback = null; } @@ -1094,7 +1072,6 @@ public abstract class Broadcast */ protected void postJobStatus(Job job, long rescheduleTimeMS) { - //postJobStatus(job); logJobCount("Entering postJobStatus"); myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); if (postback != null) @@ -1103,11 +1080,9 @@ public abstract class Broadcast report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); postback.queueReport(report.toString()); - jobReportsQueued++; } - //if (job.jobStatus.isTerminal()) - if (rescheduleTimeMS < 0 + if (rescheduleTimeMS <= 0 // No more rescheduling on cancel, expire, or pause || state == BroadcastState.CANCELING || state == BroadcastState.CANCELED @@ -1116,11 +1091,11 @@ public abstract class Broadcast || state == BroadcastState.PAUSING ) { - incrementCompletedCount(); + completedJobCount++; logJobCount("Completed a job"); //if (getRemainingJobCount() == 0) - if (jobReportsQueued == recipientList.size()) + if (completedJobCount == jobsTotal) { terminate(BroadcastState.COMPLETED); } @@ -1133,7 +1108,6 @@ public abstract class Broadcast else if (rescheduleTimeMS == 0) { addJob(job); - effectiveJobCount++; logJobCount("Added a job to queue"); } else if (rescheduleTimeMS > 0) @@ -1142,27 +1116,21 @@ public abstract class Broadcast } } - synchronized private void incrementCompletedCount() - { - completedJobCount++; - } - /** * Logs effectiveJobCount, completedJobCount, readyQueue.size(), - * active job count, and total which recipientList.size() + * active job count, and total. * Job statistics are collected by length of readyQueue, completedJobCount, * and effectiveJobCount. */ private void logJobCount(String title) { - myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d, effectiveJobCount %d", + myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d", title, completedJobCount, getActiveJobCount(), readyQueue.size(), - recipientList.size(), - getRemainingJobCount(), - effectiveJobCount + jobsTotal, + getRemainingJobCount() )); } @@ -1173,7 +1141,7 @@ public abstract class Broadcast */ private int getRemainingJobCount() { - return effectiveJobCount - completedJobCount; + return jobsTotal - completedJobCount; } public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java new file mode 100644 index 0000000..7784c44 --- /dev/null +++ b/src/main/java/altk/comm/engine/Postback.java @@ -0,0 +1,469 @@ +package altk.comm.engine; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import javax.net.ssl.SSLContext; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.apache.http.StatusLine; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; +import org.w3c.dom.Document; +import org.w3c.dom.Node; + +import altk.comm.engine.CommonLogger; + + +/** + * Queues JobReports to be posted back to attribute postBackURL. + * Multiple internal class Sender members consume this postQueue, sending items + * in postQueue to postBackURL. + * + * In the future, if postBackURL has problem, or if + * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing + * everything to backingFile. + * + * @author Kwong + * + */ +public class Postback +{ + private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; + + private static final int QUEUE_WAIT = 300; // seconds + private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds + + private static final int RETRIES_DEFAULT = 3; + + private final String postBackURL; + private final String xmlTopElement; + final Queue postQueue; + private final int maxQueueSize; + private List senderPool; + private final String myName; + private int maxBatchSize; + + private PoolingHttpClientConnectionManager cm; + private int threadsWaitingToPost; + private TrustStrategy tustAllCerts; + SSLContext sslContext; + + // Easy ssl certificate verification. + SSLConnectionSocketFactory easyConnectionFactory; + CloseableHttpClient httpclient; + + private int maxRetries; + private Broadcast broadcast; + + private static Logger myLogger = Logger.getLogger(Postback.class); + + public enum PostbackStatus + { + SUCCESS, + SERVER_IO_ERROR, + IRRECOVERABLE_ERROR, + HTTP_STATUS_ERROR + } + + class Sender extends Thread + { + private boolean threadShouldStop; + + private Sender(String name) + { + setName(name); + start(); + } + + public void run() + { + threadShouldStop = false; + + myLogger.info(getName() + " started"); + + String report; + + for (;;) // Each iteration sends a batch + { + if (threadShouldStop) + { + myLogger.info(getName() + " terminating"); + System.out.println(getName() + " terminating"); + return; + } + myLogger.debug("Looking for reports"); + List reportList = null; + synchronized(postQueue) + { + // Each iteration examines the queue for a batch to send + for (;;) + { + reportList = new ArrayList(); + for (int i = 0; i < maxBatchSize ; i++) + { + report = postQueue.poll(); + if (report == null) break; + reportList.add(report); + } + if (reportList.size() > 0) + { + myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); + postQueue.notifyAll(); + break; // break out to do the work. + } + + // No reports + //if (jobReportsQueued == jobsTotal) + if (broadcast.getState().isFinal) + { + // No more. Notify all waiting postback threads and exit thread + myLogger.info("All done, thread terminating"); + postQueue.notifyAll(); + return; + } + // Nothing to do, so wait a while, and look at the + // queue again. + + try + { + myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); + postQueue.wait(QUEUE_WAIT * 1000); + } + catch (InterruptedException e) + { + CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e); + break; + } + CommonLogger.health.info("Surfacing from wait"); + System.out.println(getName() + " surfacing from wait"); + continue; + } + } // synchronized() + if (reportList != null && reportList.size() > 0) + { + switch (post(reportList)) + { + case IRRECOVERABLE_ERROR: + case SUCCESS: + break; + + case SERVER_IO_ERROR: + /* Should not requeue report for this may lead to dead lock on this queu. + // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. + // Re-queue these reports + for (String rpt : reportList) + { + queueReport(rpt); + } + */ + // Sleep for a while before retrying this PostBack server. + CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); + try + { + Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); + } + catch (InterruptedException e) + { + CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e); + } + default: + } + } + } + } + + /** + * + * @param reportList + * @return SUCCESS, + * SERVER_IO_ERROR, when postback receiver has problem + * IRRECOVERABLE_ERROR + */ + private PostbackStatus post(List reportList) + { + StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); + xml.append("<"); xml.append(xmlTopElement); xml.append(">"); + for (String report : reportList) + { + xml.append(report + "\r\n"); + } + xml.append(""); + + HttpPost httpPost = new HttpPost(postBackURL); + StringEntity requestEntity; + CloseableHttpResponse response = null; + byte[] xmlBytes = null; + try + { + requestEntity = (new StringEntity(xml.toString())); + httpPost.setEntity(requestEntity); + myLogger.debug("Posting to " + postBackURL + ": " + xml); + response = httpclient.execute(httpPost, new BasicHttpContext()); + StatusLine statusLine = response.getStatusLine(); + int statusCode = statusLine.getStatusCode(); + if (statusCode != 200) + { + CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester"); + return PostbackStatus.HTTP_STATUS_ERROR; + } + } + catch (UnsupportedEncodingException e) + { + CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + catch (Exception e) + { + CommonLogger.alarm.error("While posting back to broadcast requester: " + e); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + + + String xmlStr; + try + { + xmlBytes = EntityUtils.toByteArray(response.getEntity()); + xmlStr = new String(xmlBytes); + myLogger.debug("Received resposne: " + xmlStr); + } + catch (IOException e) + { + CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e); + return PostbackStatus.SERVER_IO_ERROR; + } + + Document xmlDoc = null; + try + { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes)); + } + catch (Exception e) + { + CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) + { + CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + XPath xpathEngine = XPathFactory.newInstance().newXPath(); + String xpath = null; + try + { + xpath = "@error"; + Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); + if (errorNode != null) + { + String errorCode = errorNode.getNodeValue(); + xpath = "error_text"; + String errorText = (String)xpathEngine.evaluate(xpath, + xmlDoc.getDocumentElement(), XPathConstants.STRING); + CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " + + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" + + errorText + "\""); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + } + catch (XPathExpressionException e) + { + CommonLogger.alarm.warn("Bad xpath: " + xpath); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + catch (Exception e) + { + CommonLogger.alarm.warn("While decoding post back response from server: " + e); + return PostbackStatus.IRRECOVERABLE_ERROR; + } + myLogger.debug("returned from posting"); + return PostbackStatus.SUCCESS; + } + + public void terminate() + { + if (threadShouldStop) return; + + threadShouldStop = true; + + //Wait for at most 100 ms for thread to stop + interrupt(); + } + + } + + /** + * Constructs a pool of threads doing posting from a common job queue, + * to the supplied postBackURL. The top element of the XML that gets + * posted back has the give name. + * + * Requires these System properties: + * postback_max_queue_size + * postback_threadpool_size + * + * @param broadcast + * + * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is + * not supplied nor valid. + * @throws KeyStoreException + * @throws NoSuchAlgorithmException + * @throws KeyManagementException + */ + public Postback(Broadcast broadcast, + int maxQueueSize, int senderPoolSize, int maxBatchSize) + throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException + { + this.broadcast = broadcast; + this.maxQueueSize = maxQueueSize; + this.maxBatchSize = maxBatchSize; + postBackURL = broadcast.getPostbackURL(); + xmlTopElement = broadcast.getBroadcastType() + "_status"; + + myName = broadcast.getBroadcastId() + "-postback-thread"; + postQueue = new LinkedList(); + threadsWaitingToPost = 0; + + // Build connection pool manager + tustAllCerts = new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) { return true; } }; + + sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build(); + easyConnectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier()); + + Registry socketFactoryRegistry = RegistryBuilder.create() + .register("http", PlainConnectionSocketFactory.getSocketFactory()) + .register("https", easyConnectionFactory) + .build(); + // Connection manager cm handles ssl certificate verification via the socket registry + cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + cm.setMaxTotal(senderPoolSize); + cm.setDefaultMaxPerRoute(senderPoolSize); + + // + // Retry handler + maxRetries = RETRIES_DEFAULT; + HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler() + { + public boolean retryRequest( + IOException exception, + int executionCount, + HttpContext context) + { + if (executionCount >= maxRetries) return false; + return true; + } + }; + + httpclient = HttpClientBuilder.create() + .setConnectionManager(cm) + .setRetryHandler(retryHandler) + .build(); + + senderPool = new ArrayList(); + for (int i = 0; i < senderPoolSize; i++) + { + Sender sender = new Sender(myName + '.' + i); + senderPool.add(sender); + } + } + + /** + * Queues report to postQueue only if the queue size has not reached the + * maxQueueSize. + * @param report + * @return true if report is added to queue, false otherwise (queue full) + */ + public boolean queueReport(String report) + { + // Log for recovery in case of problem in posting report. + CommonLogger.activity.info("Attempting to queue report"); + + synchronized(postQueue) + { + for (;;) + { + if (postQueue.size() < maxQueueSize) + { + myLogger.debug("Queing report" + report); + postQueue.add(report); + myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); + postQueue.notifyAll(); + return true; + } + else + { + myLogger.debug("Waiting for space - postQueue size: " + postQueue.size()); + try + { + threadsWaitingToPost++; + myLogger.debug("Threads waiting to post: " + threadsWaitingToPost); + postQueue.wait(QUEUE_WAIT * 1000); + } + catch (InterruptedException e) + { + break; + } + threadsWaitingToPost--; + } + } + } + myLogger.info("Interrupted while waiting for space to queue report"); + return false; + } + + + public void shutdownWhenDone() + { + + try + { + // Wait for all postback threads to terminate + for (Sender sender : senderPool) + { + sender.join(); + } + // Close postback connections. + httpclient.close(); + } + catch (Exception e) + { + myLogger.error("Caught when closing HttpClient: " + e.getMessage()); + } + + + broadcast = null; + } + +}