|
- package altk.comm.engine.postback;
-
- import java.io.ByteArrayInputStream;
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.security.KeyManagementException;
- import java.security.KeyStoreException;
- import java.security.NoSuchAlgorithmException;
- import java.security.cert.X509Certificate;
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
-
- import javax.net.ssl.SSLContext;
- import javax.xml.parsers.DocumentBuilder;
- import javax.xml.parsers.DocumentBuilderFactory;
- import javax.xml.xpath.XPath;
- import javax.xml.xpath.XPathConstants;
- import javax.xml.xpath.XPathExpressionException;
- import javax.xml.xpath.XPathFactory;
-
- import org.apache.http.StatusLine;
- import org.apache.http.client.HttpRequestRetryHandler;
- import org.apache.http.client.methods.CloseableHttpResponse;
- import org.apache.http.client.methods.HttpPost;
- import org.apache.http.conn.ssl.NoopHostnameVerifier;
- import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
- import org.apache.http.conn.ssl.TrustStrategy;
- import org.apache.http.entity.StringEntity;
- import org.apache.http.impl.client.CloseableHttpClient;
- import org.apache.http.impl.client.HttpClientBuilder;
- import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
- import org.apache.http.protocol.BasicHttpContext;
- import org.apache.http.protocol.HttpContext;
- import org.apache.http.ssl.SSLContextBuilder;
- import org.apache.http.util.EntityUtils;
- import org.apache.log4j.Logger;
- import org.w3c.dom.Document;
- import org.w3c.dom.Node;
-
- import altk.comm.engine.CommonLogger;
-
-
- /**
- * Queues JobReports to be posted back to attribute postBackURL.
- * Multiple internal class Sender members consume this postQueue, sending items
- * in postQueue to postBackURL.
- *
- * In the future, if postBackURL has problem, or if
- * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing
- * everything to backingFile.
- *
- * @author Kwong
- *
- */
- public class PostBack
- {
- private static final String XML_VERSION_1_0_ENCODING_UTF_8 = "<?xml version=\"1.0\" encoding=\"utf-8\"?>";
-
- private static final int QUEUE_WAIT = 300; // seconds
- private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds
-
- private static final int RETRIES_DEFAULT = 3;
-
- private final String postBackURL;
- private final String xmlTopElement;
- private Queue<String> postQueue;
- private final int maxQueueSize;
- private List<Sender> senderPool;
- private final String myName;
- private int maxBatchSize;
-
- private PoolingHttpClientConnectionManager cm;
- private int threadsWaitingToPost;
- private TrustStrategy tustAllCerts;
- SSLContext sslContext;
- SSLConnectionSocketFactory connectionFactory;
- CloseableHttpClient httpclient;
-
- private int maxRetries;
-
-
-
- private static Logger myLogger = Logger.getLogger(PostBack.class);
-
/**
 * Result of one attempt to post a batch of reports.
 */
public enum PostBackStatus
{
    /** The batch was posted and the response carried no error attribute. */
    SUCCESS,

    /** The response body could not be read. */
    SERVER_IO_ERROR,

    /** The post failed, or the response was unusable, in a way that is not retried. */
    IRRECOVERABLE_ERROR,

    /** The server answered with an HTTP status other than 200. */
    HTTP_STATUS_ERROR;
}
-
- class Sender extends Thread
- {
- private boolean threadShouldStop;
-
-
- private Sender(String name)
- {
- setName(name);
- start();
- }
-
- public void run()
- {
- threadShouldStop = false;
-
- myLogger.info(getName() + " started");
-
- String report;
-
- for (;;) // Each iteration sends a batch
- {
- if (threadShouldStop)
- {
- myLogger.info(getName() + " terminating");
- System.out.println(getName() + " terminating");
- return;
- }
- myLogger.debug("Looking for reports");
- List<String> reportList = null;
- synchronized(postQueue)
- {
- // Each iteration examines the queue for a batch to send
- for (;;)
- {
- reportList = new ArrayList<String>();
- for (int i = 0; i < maxBatchSize ; i++)
- {
- report = postQueue.poll();
- if (report == null) break;
- reportList.add(report);
- }
- if (reportList.size() > 0)
- {
- myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size()));
- postQueue.notifyAll();
- break; // break out to do the work.
- }
-
- // Nothing to do, so wait a while, and look at the
- // queue again.
- try
- {
- myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
- postQueue.wait(QUEUE_WAIT * 1000);
- }
- catch (InterruptedException e)
- {
- CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e);
- break;
- }
- CommonLogger.health.info("Surfacing from wait");
- System.out.println(getName() + " surfacing from wait");
- continue;
- }
- } // synchronized()
- if (reportList != null && reportList.size() > 0)
- {
- switch (post(reportList))
- {
- case IRRECOVERABLE_ERROR:
- case SUCCESS:
- break;
-
- case SERVER_IO_ERROR:
- /* Should not requeue report for this may lead to dead lock on this queu.
- // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log.
- // Re-queue these reports
- for (String rpt : reportList)
- {
- queueReport(rpt);
- }
- */
- // Sleep for a while before retrying this PostBack server.
- CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds");
- try
- {
- Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000);
- }
- catch (InterruptedException e)
- {
- CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e);
- }
- default:
- }
- }
- }
- }
-
- /**
- *
- * @param reportList
- * @return SUCCESS,
- * SERVER_IO_ERROR, when postback receiver has problem
- * IRRECOVERABLE_ERROR
- */
- private PostBackStatus post(List<String> reportList)
- {
- StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8);
- xml.append("<"); xml.append(xmlTopElement); xml.append(">");
- for (String report : reportList)
- {
- xml.append(report + "\r\n");
- }
- xml.append("</"); xml.append(xmlTopElement); xml.append(">");
-
- HttpPost httpPost = new HttpPost(postBackURL);
- StringEntity requestEntity;
- CloseableHttpResponse response = null;
- byte[] xmlBytes = null;
- try
- {
- requestEntity = (new StringEntity(xml.toString()));
- httpPost.setEntity(requestEntity);
- myLogger.debug("Posting to " + postBackURL + ": " + xml);
- response = httpclient.execute(httpPost, new BasicHttpContext());
- StatusLine statusLine = response.getStatusLine();
- int statusCode = statusLine.getStatusCode();
- if (statusCode != 200)
- {
- CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester");
- return PostBackStatus.HTTP_STATUS_ERROR;
- }
- }
- catch (UnsupportedEncodingException e)
- {
- CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- catch (IOException e)
- {
- CommonLogger.alarm.error("While posting back to broadcast requester: " + e);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
-
-
- String xmlStr;
- try
- {
- xmlBytes = EntityUtils.toByteArray(response.getEntity());
- xmlStr = new String(xmlBytes);
- myLogger.debug("Received resposne: " + xmlStr);
- }
- catch (IOException e)
- {
- CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e);
- return PostBackStatus.SERVER_IO_ERROR;
- }
-
- Document xmlDoc = null;
- try
- {
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- DocumentBuilder builder = factory.newDocumentBuilder();
- xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes));
- }
- catch (Exception e)
- {
- CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement))
- {
- CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- XPath xpathEngine = XPathFactory.newInstance().newXPath();
- String xpath = null;
- try
- {
- xpath = "@error";
- Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE);
- if (errorNode != null)
- {
- String errorCode = errorNode.getNodeValue();
- xpath = "error_text";
- String errorText = (String)xpathEngine.evaluate(xpath,
- xmlDoc.getDocumentElement(), XPathConstants.STRING);
- CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to "
- + postBackURL + " -- error code=\"" + errorCode + "\", error text = \""
- + errorText + "\"");
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- }
- catch (XPathExpressionException e)
- {
- CommonLogger.alarm.warn("Bad xpath: " + xpath);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- catch (Exception e)
- {
- CommonLogger.alarm.warn("While decoding post back response from server: " + e);
- return PostBackStatus.IRRECOVERABLE_ERROR;
- }
- myLogger.debug("returned from posting");
- return PostBackStatus.SUCCESS;
- }
-
- public void terminate()
- {
- if (threadShouldStop) return;
-
- threadShouldStop = true;
-
- //Wait for at most 100 ms for thread to stop
- interrupt();
- }
-
- }
-
/**
 * Constructs a pool of threads doing posting from a common job queue,
 * to the supplied postBackURL. The top element of the XML that gets
 * posted back has the given name.
 *
 * The HTTP client is built to allow as many concurrent connections as
 * there are sender threads.
 *
 * NOTE(security): the SSL context trusts every certificate and host name
 * verification is disabled, so HTTPS endpoints are not authenticated.
 *
 * @param postBackURL       URL to post to; must be non-empty
 * @param xmlTopElementName name of the wrapping XML element; must be non-empty
 * @param maxQueueSize      queue length at which queueReport() blocks; must be positive
 * @param senderPoolSize    number of sender threads to start; must be positive
 * @param maxBatchSize      largest number of reports per post; must be positive
 * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is
 * 			not supplied, or if any of the sizes is not positive.
 * @throws KeyStoreException if the SSL context cannot be built
 * @throws NoSuchAlgorithmException if the SSL context cannot be built
 * @throws KeyManagementException if the SSL context cannot be built
 */
public PostBack(String postBackURL, String xmlTopElementName,
        int maxQueueSize, int senderPoolSize, int maxBatchSize)
        throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException
{
    if (postBackURL == null || postBackURL.length() == 0)
    {
        throw new IllegalArgumentException("PostBack class given null postBackURL");
    }
    myName = "Postback-" + postBackURL;
    if (xmlTopElementName == null || xmlTopElementName.length() == 0)
    {
        throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement");
    }
    // A non-positive maxQueueSize would make queueReport() block forever, and
    // a non-positive maxBatchSize would stop Senders from ever dequeuing.
    if (maxQueueSize <= 0 || senderPoolSize <= 0 || maxBatchSize <= 0)
    {
        throw new IllegalArgumentException(myName
                + ": maxQueueSize, senderPoolSize and maxBatchSize must all be positive");
    }
    this.postBackURL = postBackURL;
    this.xmlTopElement = xmlTopElementName;
    this.maxQueueSize = maxQueueSize;
    this.maxBatchSize = maxBatchSize;
    postQueue = new LinkedList<String>();
    threadsWaitingToPost = 0;
    maxRetries = RETRIES_DEFAULT;

    tustAllCerts = new TrustStrategy()
    {
        public boolean isTrusted(X509Certificate[] chain, String authType)
        {
            return true;
        }
    };
    sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build();
    connectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());

    // The pool limits are set on the builder rather than on a separate
    // connection manager: supplying a manager via setConnectionManager()
    // would make the builder ignore setSSLSocketFactory(). Without these
    // limits the client allows only its default number of connections per
    // route, fewer than a larger sender pool needs.
    // No custom retry handler is installed; the client's default applies.
    httpclient = HttpClientBuilder.create()
            .setMaxConnTotal(senderPoolSize)
            .setMaxConnPerRoute(senderPoolSize)
            .setSSLSocketFactory(connectionFactory)
            .build();

    // Every field a Sender reads is assigned above, before any thread starts.
    senderPool = new ArrayList<Sender>();
    for (int i = 0; i < senderPoolSize; i++)
    {
        Sender sender = new Sender(myName + '-' + i);
        senderPool.add(sender);
    }
}
-
- /**
- * Queues report to postQueue only if the queue size has not reached the
- * maxQueueSize.
- * @param report
- * @return true if report is added to queue, false otherwise (queue full)
- */
- public boolean queueReport(String report)
- {
- // Log for recovery in case of problem in posting report.
- CommonLogger.activity.info("Attempting to queue report");
-
- synchronized(postQueue)
- {
- for (;;)
- {
- if (postQueue.size() < maxQueueSize)
- {
- myLogger.debug("Queing report" + report);
- postQueue.add(report);
- myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
- postQueue.notifyAll();
- return true;
- }
- else
- {
- myLogger.debug("Waiting for space - postQueue size: " + postQueue.size());
- try
- {
- threadsWaitingToPost++;
- myLogger.debug("Threads waiting to post: " + threadsWaitingToPost);
- postQueue.wait(QUEUE_WAIT * 1000);
- }
- catch (InterruptedException e)
- {
- break;
- }
- threadsWaitingToPost--;
- }
- }
- }
- myLogger.info("Interrupted while waiting for space to queue report");
- return false;
- }
-
-
- public void terminate()
- {
- try
- {
- httpclient.close();
- }
- catch (IOException e)
- {
- myLogger.error("Caught when closing HttpClient: " + e.getMessage());
- }
-
- // Terminate postback threads
- for (Sender sender : senderPool)
- {
- sender.terminate();
- }
-
- }
-
- }
|