package altk.comm.engine; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import javax.net.ssl.SSLContext; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.xpath.XPath; import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; import org.apache.http.StatusLine; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.protocol.BasicHttpContext; import org.apache.http.protocol.HttpContext; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Node; import altk.comm.engine.Broadcast.PostbackThreadActionOnEmpty; /** * Queues JobReports to be posted back to attribute postBackURL. * Multiple internal class Sender members consume this postQueue, sending items * in postQueue to postBackURL. * * In the future, if postBackURL has problem, or if * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing * everything to backingFile. * * @author Kwong * */ public class Postback { private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; private static final int QUEUE_WAIT = 300; // seconds private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds private static final int RETRIES_DEFAULT = 3; private final String postBackURL; private final String xmlTopElement; final LinkedList postQueue; private final int maxQueueSize; private List senderPool; private final String myName; private int maxBatchSize; private PoolingHttpClientConnectionManager cm; private int threadsWaitingToPost; private TrustStrategy tustAllCerts; SSLContext sslContext; // Easy ssl certificate verification. SSLConnectionSocketFactory easyConnectionFactory; CloseableHttpClient httpclient; private int maxRetries; private Broadcast broadcast; protected int postedTransactions; private static Logger myLogger = Logger.getLogger(Postback.class); public enum PostbackStatus { SUCCESS, SERVER_IO_ERROR, IRRECOVERABLE_ERROR, HTTP_STATUS_ERROR } class Sender extends Thread { private Sender(String name) { setName(name); start(); } public void run() { myLogger.info(getName() + " started"); String report; for (;;) // Each iteration sends a batch { myLogger.debug("Looking for reports"); List reportList = new ArrayList(); synchronized(postQueue) { for (int i = 0; i < maxBatchSize ; i++) { report = postQueue.poll(); if (report == null) break; reportList.add(report); } // If space in que is generated, wake up all service queues waiting for space if (reportList.size() > 0 && threadsWaitingToPost > 0) postQueue.notifyAll(); } if (reportList.size() > 0) { switch (post(reportList)) { case IRRECOVERABLE_ERROR: case SUCCESS: break; case SERVER_IO_ERROR: // Sleep for a while before retrying this PostBack server. CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); try { Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); } catch (InterruptedException e) { CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e); } default: } incrementPostedTransactions(reportList.size()); } else { // empty post queue PostbackThreadActionOnEmpty postbackThreadActionOnEmpty = broadcast.getPostbackThreadActionOnEmpty(); myLogger.debug("Action on empty queue: " + postbackThreadActionOnEmpty); switch (postbackThreadActionOnEmpty) { case STOP: myLogger.info(getName() + " terminating"); System.out.println(getName() + " terminating"); return; case WAIT: synchronized (postQueue) { try { postQueue.wait(); } catch (InterruptedException e) { // Do nothing } } break; case CONTINUE: default: } } } } /** * * @param reportList * @return SUCCESS, * SERVER_IO_ERROR, when postback receiver has problem * IRRECOVERABLE_ERROR */ private PostbackStatus post(List reportList) { StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); xml.append("<" + xmlTopElement + " reports=\"" + reportList.size() + "\">\r\n"); for (String report : reportList) { xml.append(report + "\r\n"); } xml.append(""); HttpPost httpPost = new HttpPost(postBackURL); StringEntity requestEntity; CloseableHttpResponse response = null; byte[] xmlBytes = null; try { requestEntity = (new StringEntity(xml.toString())); httpPost.setEntity(requestEntity); myLogger.debug("Posting to " + postBackURL + ": " + xml); response = httpclient.execute(httpPost, new BasicHttpContext()); incrementPostedTransactions(reportList.size()); StatusLine statusLine = response.getStatusLine(); int statusCode = statusLine.getStatusCode(); if (statusCode != 200) { CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester"); return PostbackStatus.HTTP_STATUS_ERROR; } } catch (UnsupportedEncodingException e) { CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e); return PostbackStatus.IRRECOVERABLE_ERROR; } catch (Exception e) { CommonLogger.alarm.error("While posting back to broadcast requester: " + e); return PostbackStatus.IRRECOVERABLE_ERROR; } String xmlStr; try { xmlBytes = EntityUtils.toByteArray(response.getEntity()); xmlStr = new String(xmlBytes); myLogger.debug("Received resposne: " + xmlStr); } catch (IOException e) { CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e); return PostbackStatus.SERVER_IO_ERROR; } Document xmlDoc = null; try { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder = factory.newDocumentBuilder(); xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes)); } catch (Exception e) { CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr); return PostbackStatus.IRRECOVERABLE_ERROR; } if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) { CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr); return PostbackStatus.IRRECOVERABLE_ERROR; } XPath xpathEngine = XPathFactory.newInstance().newXPath(); String xpath = null; try { xpath = "@error"; Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); if (errorNode != null) { String errorCode = errorNode.getNodeValue(); xpath = "error_text"; String errorText = (String)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.STRING); CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" + errorText + "\""); return PostbackStatus.IRRECOVERABLE_ERROR; } } catch (XPathExpressionException e) { CommonLogger.alarm.warn("Bad xpath: " + xpath); return PostbackStatus.IRRECOVERABLE_ERROR; } catch (Exception e) { CommonLogger.alarm.warn("While decoding post back response from server: " + e); return PostbackStatus.IRRECOVERABLE_ERROR; } return PostbackStatus.SUCCESS; } } /** * Constructs a pool of threads doing posting from a common job queue, * to the supplied postBackURL. The top element of the XML that gets * posted back has the give name. * * Requires these System properties: * postback_max_queue_size * postback_threadpool_size * * @param broadcast * * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is * not supplied nor valid. * @throws KeyStoreException * @throws NoSuchAlgorithmException * @throws KeyManagementException */ public Postback(Broadcast broadcast, int maxQueueSize, int senderPoolSize, int maxBatchSize) throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException { this.broadcast = broadcast; this.maxQueueSize = maxQueueSize; this.maxBatchSize = maxBatchSize; postBackURL = broadcast.getPostbackURL(); xmlTopElement = broadcast.getBroadcastType() + "_status"; myName = broadcast.getBroadcastId() + "-postback-thread"; postQueue = new LinkedList(); threadsWaitingToPost = 0; // Build connection pool manager tustAllCerts = new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) { return true; } }; sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build(); easyConnectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier()); Registry socketFactoryRegistry = RegistryBuilder.create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", easyConnectionFactory) .build(); // Connection manager cm handles ssl certificate verification via the socket registry cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry); cm.setMaxTotal(senderPoolSize); cm.setDefaultMaxPerRoute(senderPoolSize); // // Retry handler maxRetries = RETRIES_DEFAULT; HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler() { public boolean retryRequest( IOException exception, int executionCount, HttpContext context) { if (executionCount >= maxRetries) return false; return true; } }; httpclient = HttpClientBuilder.create() .setConnectionManager(cm) .setRetryHandler(retryHandler) .build(); senderPool = new ArrayList(); for (int i = 0; i < senderPoolSize; i++) { Sender sender = new Sender(myName + '.' + i); senderPool.add(sender); } } public void incrementPostedTransactions(int size) { postedTransactions += size; } /** * Puts report at the head of post queue, disregarding size limit. * This is suitable for posting broadcast status ahead of * a possibly long line of transaction status. * @param report * @return */ public void queueReportFirst(String report) { synchronized(postQueue) { myLogger.debug("Queueing report " + report); postQueue.offerFirst(report); postQueue.notify(); } } /** * Queues report to postQueue only if the queue size has not reached the * maxQueueSize. * @param report * @return true if report is added to queue, false otherwise (queue full) */ public boolean queueReport(String report) { synchronized(postQueue) { for (;;) { if (postQueue.size() < maxQueueSize) { myLogger.debug("Queueing report " + report); postQueue.add(report); postQueue.notify(); return true; } else { try { threadsWaitingToPost++; myLogger.debug(String.format("Waiting for space in postQueue - size %d, total waiting %d", postQueue.size(), threadsWaitingToPost)); postQueue.wait(QUEUE_WAIT * 1000); threadsWaitingToPost--; } catch (InterruptedException e) { break; } } } } myLogger.info("Interrupted while waiting for space in postQueue"); return false; } public void wrapup() { myLogger.debug("Wrapping up"); try { synchronized (postQueue) { postQueue.notifyAll(); } // Wait for all postback threads to terminate for (Sender sender : senderPool) { sender.join(); } // Close postback connections. httpclient.close(); } catch (Exception e) { myLogger.error("Caught when closing HttpClient: " + e.getMessage()); } broadcast = null; } }