Переглянути джерело

Use completedJobCount and jobsTotal to help keep track of job counts in
various states.  Delay closing HttpClient until all postback threads have
exited.
tags/CommEngine-0.0.3
ymlam 6 роки тому
джерело
коміт
78992579de
2 змінених файлів з 517 додано та 80 видалено
  1. +48
    -80
      src/main/java/altk/comm/engine/Broadcast.java
  2. +469
    -0
      src/main/java/altk/comm/engine/Postback.java

+ 48
- 80
src/main/java/altk/comm/engine/Broadcast.java Переглянути файл

@@ -45,19 +45,6 @@ public abstract class Broadcast
String stateErrorText;
public final long receiveTime;
public long changeStateTime;
/**
* Count of jobs that are completed (excluding those that are
* being rescheduled).
*/
private int completedJobCount = 0;
/**
* Dynamically keeps count of the total number jobs scheduled
* in readyQueue. Initially it is set to be the size of the
* recipientList. Then as jobs are processed, and when one is
* to be repeated by re-adding it to the readyQueue, then this
* number is incremented by 1.
*/
private int effectiveJobCount = 0;
protected String activityRecordIdParamName;
private String jobReportRootNodeName;

@@ -82,10 +69,11 @@ public abstract class Broadcast
protected List<Service> serviceThreadPool;
private Object resumeFlag; // Semaphore for dispatcher threads to resume.
protected List<Recipient> recipientList;
private int jobReportsQueued;
private int completedJobCount;
private ScheduledExecutorService scheduler;
private int serviceThreadPoolSize;
private int jobsTotal;

public static enum BroadcastState
{
@@ -394,7 +382,8 @@ public abstract class Broadcast
this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName;
this.jobReportRootNodeName = jobReportRootNodeName;
jobReportsQueued = 0;
postback = null;
completedJobCount = 0;
sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT;
readyQueue = new LinkedBlockingQueue<Job>();
serviceThreadPool = new ArrayList<Service>();
@@ -460,7 +449,10 @@ public abstract class Broadcast
return;
}
initSync(commEngine.getResources());
init();
for (Recipient recipient : recipientList)
{
readyQueue.add(mkJob(recipient));
}
if (getState() == BroadcastState.COMPLETED) return;
}
catch (BroadcastException e)
@@ -509,8 +501,7 @@ public abstract class Broadcast
{
initAsync();
int jobsTotal = recipientList.size();
effectiveJobCount = jobsTotal;
jobsTotal = recipientList.size();
postback = new Postback(this,
commEngine.getPostbackMaxQueueSize(),
commEngine.getPostbackSenderPoolSize(),
@@ -529,7 +520,6 @@ public abstract class Broadcast
serviceThreadPool.add(serviceThread);
serviceThreadNames.add(threadName);
}
//initServiceThreadContexts(serviceThreadNames);

doBroadcast();
}
@@ -589,18 +579,10 @@ public abstract class Broadcast
{
boolean isLegal;
BroadcastState prev = null;
synchronized (this)
{
if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
List<BroadcastState> to = toStates.get(state);
isLegal = (to == null? false : to.contains(newState));
prev = state;
if (isLegal)
{
state = newState;
changeStateTime = System.currentTimeMillis();
}
}
if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null);
List<BroadcastState> to = toStates.get(state);
isLegal = (to == null? false : to.contains(newState));
prev = state;
if (isLegal)
{
this.haltReason = haltReason;
@@ -609,9 +591,17 @@ public abstract class Broadcast
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state));
if (postback != null)
{
postback.queueReport(mkStatusReport());
synchronized(postback.postQueue)
{
postback.queueReport(mkStatusReport(newState));
state = newState;
}
}

else
{
state = newState;
}
changeStateTime = System.currentTimeMillis();
return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
}
else
@@ -796,10 +786,21 @@ public abstract class Broadcast
}
/**
* Creates status report.
* Defaults to current state
* @return
*/
protected String mkStatusReport()
{
return mkStatusReport(state);
}
/**
* Creates status report. Sometimes, we need to create the report before
* actually changing BroadcastState.
* @param state - BroadcastState for this report, which is not necessarily the same as the class attribute state.
* @return status report in XML.
*/
protected String mkStatusReport()
protected String mkStatusReport(BroadcastState state)
{
StringBuffer statusBf = new StringBuffer();
String topLevelTag = broadcastType;
@@ -853,13 +854,12 @@ public abstract class Broadcast

/**
*
* @return number of active jobs, including those being
* rescheduled by a timer.
* Computed from effectiveJobCount, completedJobCount and readyQueue.size()
* @return number of active jobs
* computed from jobsTotal, completedJobCount and readyQueue.size()
*/
protected int getActiveJobCount()
{
return effectiveJobCount - completedJobCount - readyQueue.size();
return jobsTotal - completedJobCount - readyQueue.size();
}
@@ -875,29 +875,6 @@ public abstract class Broadcast
protected abstract void decode(HttpServletRequest request, boolean notInService)
throws EngineException;

/**
* Remembers postBack, and
* Creates thread pool of size dictated by broadcast, which determines the size based
* on the chosen service provider.
*
* Overriding implementation must invoke this method at the end, and process information
* contained in the broadcast, in preparation for the invocation of the process
* method.
*
* If there is no error, the overriding implementation must return this base method.
*
* @param commEngine
*
* @throws BroadcastException
*/
protected final void init()
{
for (Recipient recipient : recipientList)
{
readyQueue.add(mkJob(recipient));
}
}
protected abstract void initSync(EngineResources resources) throws BroadcastException;
protected Job mkJob(Recipient recipient)
@@ -1020,6 +997,7 @@ public abstract class Broadcast
*/
protected void close()
{
postback.shutdownWhenDone();
postback = null;
}
@@ -1094,7 +1072,6 @@ public abstract class Broadcast
*/
protected void postJobStatus(Job job, long rescheduleTimeMS)
{
//postJobStatus(job);
logJobCount("Entering postJobStatus");
myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS);
if (postback != null)
@@ -1103,11 +1080,9 @@ public abstract class Broadcast
report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
report.init(job);
postback.queueReport(report.toString());
jobReportsQueued++;
}
//if (job.jobStatus.isTerminal())
if (rescheduleTimeMS < 0
if (rescheduleTimeMS <= 0
// No more rescheduling on cancel, expire, or pause
|| state == BroadcastState.CANCELING
|| state == BroadcastState.CANCELED
@@ -1116,11 +1091,11 @@ public abstract class Broadcast
|| state == BroadcastState.PAUSING
)
{
incrementCompletedCount();
completedJobCount++;
logJobCount("Completed a job");
//if (getRemainingJobCount() == 0)
if (jobReportsQueued == recipientList.size())
if (completedJobCount == jobsTotal)
{
terminate(BroadcastState.COMPLETED);
}
@@ -1133,7 +1108,6 @@ public abstract class Broadcast
else if (rescheduleTimeMS == 0)
{
addJob(job);
effectiveJobCount++;
logJobCount("Added a job to queue");
}
else if (rescheduleTimeMS > 0)
@@ -1142,27 +1116,21 @@ public abstract class Broadcast
}
}
synchronized private void incrementCompletedCount()
{
completedJobCount++;
}
/**
* Logs completedJobCount, readyQueue.size(),
* active job count, and total which recipientList.size()
* active job count, and total.
* Job statistics are collected by length of readyQueue, completedJobCount,
* and jobsTotal.
*/
private void logJobCount(String title)
{
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d, effectiveJobCount %d",
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d",
title,
completedJobCount,
getActiveJobCount(),
readyQueue.size(),
recipientList.size(),
getRemainingJobCount(),
effectiveJobCount
jobsTotal,
getRemainingJobCount()
));
}

@@ -1173,7 +1141,7 @@ public abstract class Broadcast
*/
private int getRemainingJobCount()
{
return effectiveJobCount - completedJobCount;
return jobsTotal - completedJobCount;
}

public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)


+ 469
- 0
src/main/java/altk/comm/engine/Postback.java Переглянути файл

@@ -0,0 +1,469 @@
package altk.comm.engine;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import javax.net.ssl.SSLContext;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.http.StatusLine;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import altk.comm.engine.CommonLogger;


/**
* Queues JobReports to be posted back to attribute postBackURL.
* Multiple internal class Sender members consume this postQueue, sending items
* in postQueue to postBackURL.
*
* In the future, if postBackURL has problem, or if
* length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing
* everything to backingFile.
*
* @author Kwong
*
*/
public class Postback
{
private static final String XML_VERSION_1_0_ENCODING_UTF_8 = "<?xml version=\"1.0\" encoding=\"utf-8\"?>";
private static final int QUEUE_WAIT = 300; // seconds
private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds

private static final int RETRIES_DEFAULT = 3;
private final String postBackURL;
private final String xmlTopElement;
final Queue<String> postQueue;
private final int maxQueueSize;
private List<Sender> senderPool;
private final String myName;
private int maxBatchSize;

private PoolingHttpClientConnectionManager cm;
private int threadsWaitingToPost;
private TrustStrategy tustAllCerts;
SSLContext sslContext;
// Easy ssl certificate verification.
SSLConnectionSocketFactory easyConnectionFactory;
CloseableHttpClient httpclient;

private int maxRetries;
private Broadcast broadcast;

private static Logger myLogger = Logger.getLogger(Postback.class);

public enum PostbackStatus
{
SUCCESS,
SERVER_IO_ERROR,
IRRECOVERABLE_ERROR,
HTTP_STATUS_ERROR
}
class Sender extends Thread
{
private boolean threadShouldStop;

private Sender(String name)
{
setName(name);
start();
}
public void run()
{
threadShouldStop = false;
myLogger.info(getName() + " started");
String report;
for (;;) // Each iteration sends a batch
{
if (threadShouldStop)
{
myLogger.info(getName() + " terminating");
System.out.println(getName() + " terminating");
return;
}
myLogger.debug("Looking for reports");
List<String> reportList = null;
synchronized(postQueue)
{
// Each iteration examines the queue for a batch to send
for (;;)
{
reportList = new ArrayList<String>();
for (int i = 0; i < maxBatchSize ; i++)
{
report = postQueue.poll();
if (report == null) break;
reportList.add(report);
}
if (reportList.size() > 0)
{
myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size()));
postQueue.notifyAll();
break; // break out to do the work.
}
// No reports
//if (jobReportsQueued == jobsTotal)
if (broadcast.getState().isFinal)
{
// No more. Notify all waiting postback threads and exit thread
myLogger.info("All done, thread terminating");
postQueue.notifyAll();
return;
}
// Nothing to do, so wait a while, and look at the
// queue again.

try
{
myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
postQueue.wait(QUEUE_WAIT * 1000);
}
catch (InterruptedException e)
{
CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e);
break;
}
CommonLogger.health.info("Surfacing from wait");
System.out.println(getName() + " surfacing from wait");
continue;
}
} // synchronized()
if (reportList != null && reportList.size() > 0)
{
switch (post(reportList))
{
case IRRECOVERABLE_ERROR:
case SUCCESS:
break;
case SERVER_IO_ERROR:
/* Should not requeue report for this may lead to dead lock on this queu.
// TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log.
// Re-queue these reports
for (String rpt : reportList)
{
queueReport(rpt);
}
*/
// Sleep for a while before retrying this PostBack server.
CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds");
try
{
Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000);
}
catch (InterruptedException e)
{
CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e);
}
default:
}
}
}
}
/**
*
* @param reportList
* @return SUCCESS,
* SERVER_IO_ERROR, when postback receiver has problem
* IRRECOVERABLE_ERROR
*/
private PostbackStatus post(List<String> reportList)
{
StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8);
xml.append("<"); xml.append(xmlTopElement); xml.append(">");
for (String report : reportList)
{
xml.append(report + "\r\n");
}
xml.append("</"); xml.append(xmlTopElement); xml.append(">");
HttpPost httpPost = new HttpPost(postBackURL);
StringEntity requestEntity;
CloseableHttpResponse response = null;
byte[] xmlBytes = null;
try
{
requestEntity = (new StringEntity(xml.toString()));
httpPost.setEntity(requestEntity);
myLogger.debug("Posting to " + postBackURL + ": " + xml);
response = httpclient.execute(httpPost, new BasicHttpContext());
StatusLine statusLine = response.getStatusLine();
int statusCode = statusLine.getStatusCode();
if (statusCode != 200)
{
CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester");
return PostbackStatus.HTTP_STATUS_ERROR;
}
}
catch (UnsupportedEncodingException e)
{
CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e);
return PostbackStatus.IRRECOVERABLE_ERROR;
}
catch (Exception e)
{
CommonLogger.alarm.error("While posting back to broadcast requester: " + e);
return PostbackStatus.IRRECOVERABLE_ERROR;
}

String xmlStr;
try
{
xmlBytes = EntityUtils.toByteArray(response.getEntity());
xmlStr = new String(xmlBytes);
myLogger.debug("Received resposne: " + xmlStr);
}
catch (IOException e)
{
CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e);
return PostbackStatus.SERVER_IO_ERROR;
}

Document xmlDoc = null;
try
{
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes));
}
catch (Exception e)
{
CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr);
return PostbackStatus.IRRECOVERABLE_ERROR;
}
if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement))
{
CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr);
return PostbackStatus.IRRECOVERABLE_ERROR;
}
XPath xpathEngine = XPathFactory.newInstance().newXPath();
String xpath = null;
try
{
xpath = "@error";
Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE);
if (errorNode != null)
{
String errorCode = errorNode.getNodeValue();
xpath = "error_text";
String errorText = (String)xpathEngine.evaluate(xpath,
xmlDoc.getDocumentElement(), XPathConstants.STRING);
CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to "
+ postBackURL + " -- error code=\"" + errorCode + "\", error text = \""
+ errorText + "\"");
return PostbackStatus.IRRECOVERABLE_ERROR;
}
}
catch (XPathExpressionException e)
{
CommonLogger.alarm.warn("Bad xpath: " + xpath);
return PostbackStatus.IRRECOVERABLE_ERROR;
}
catch (Exception e)
{
CommonLogger.alarm.warn("While decoding post back response from server: " + e);
return PostbackStatus.IRRECOVERABLE_ERROR;
}
myLogger.debug("returned from posting");
return PostbackStatus.SUCCESS;
}

public void terminate()
{
if (threadShouldStop) return;
threadShouldStop = true;
//Wait for at most 100 ms for thread to stop
interrupt();
}
}
/**
* Constructs a pool of threads doing posting from a common job queue,
* to the supplied postBackURL. The top element of the XML that gets
* posted back has the give name.
*
* Requires these System properties:
* postback_max_queue_size
* postback_threadpool_size
*
* @param broadcast
*
* @throws IllegalArgumentException if either postBackURL or xmlTopElementName is
* not supplied nor valid.
* @throws KeyStoreException
* @throws NoSuchAlgorithmException
* @throws KeyManagementException
*/
public Postback(Broadcast broadcast,
int maxQueueSize, int senderPoolSize, int maxBatchSize)
throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException
{
this.broadcast = broadcast;
this.maxQueueSize = maxQueueSize;
this.maxBatchSize = maxBatchSize;
postBackURL = broadcast.getPostbackURL();
xmlTopElement = broadcast.getBroadcastType() + "_status";
myName = broadcast.getBroadcastId() + "-postback-thread";
postQueue = new LinkedList<String>();
threadsWaitingToPost = 0;

// Build connection pool manager
tustAllCerts = new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) { return true; } };
sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build();
easyConnectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", easyConnectionFactory)
.build();
// Connection manager cm handles ssl certificate verification via the socket registry
cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
cm.setMaxTotal(senderPoolSize);
cm.setDefaultMaxPerRoute(senderPoolSize);

//
// Retry handler
maxRetries = RETRIES_DEFAULT;
HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler()
{
public boolean retryRequest(
IOException exception,
int executionCount,
HttpContext context)
{
if (executionCount >= maxRetries) return false;
return true;
}
};

httpclient = HttpClientBuilder.create()
.setConnectionManager(cm)
.setRetryHandler(retryHandler)
.build();

senderPool = new ArrayList<Sender>();
for (int i = 0; i < senderPoolSize; i++)
{
Sender sender = new Sender(myName + '.' + i);
senderPool.add(sender);
}
}
/**
* Queues report to postQueue only if the queue size has not reached the
* maxQueueSize.
* @param report
* @return true if report is added to queue, false otherwise (queue full)
*/
public boolean queueReport(String report)
{
// Log for recovery in case of problem in posting report.
CommonLogger.activity.info("Attempting to queue report");
synchronized(postQueue)
{
for (;;)
{
if (postQueue.size() < maxQueueSize)
{
myLogger.debug("Queing report" + report);
postQueue.add(report);
myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
postQueue.notifyAll();
return true;
}
else
{
myLogger.debug("Waiting for space - postQueue size: " + postQueue.size());
try
{
threadsWaitingToPost++;
myLogger.debug("Threads waiting to post: " + threadsWaitingToPost);
postQueue.wait(QUEUE_WAIT * 1000);
}
catch (InterruptedException e)
{
break;
}
threadsWaitingToPost--;
}
}
}
myLogger.info("Interrupted while waiting for space to queue report");
return false;
}

public void shutdownWhenDone()
{

try
{
// Wait for all postback threads to terminate
for (Sender sender : senderPool)
{
sender.join();
}
// Close postback connections.
httpclient.close();
}
catch (Exception e)
{
myLogger.error("Caught when closing HttpClient: " + e.getMessage());
}

broadcast = null;
}
}

Завантаження…
Відмінити
Зберегти