package altk.comm.engine.postback;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import javax.net.ssl.SSLContext;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import altk.comm.engine.CommonLogger;

/**
 * Queues job reports to be posted back to {@code postBackURL}.
 *
 * <p>A pool of {@link Sender} threads consumes the shared {@code postQueue}:
 * each sender removes up to {@code maxBatchSize} reports at a time, wraps them
 * in a single XML document and POSTs that document to {@code postBackURL}.
 *
 * <p>All access to {@code postQueue} and {@code threadsWaitingToPost} happens
 * while holding the monitor of {@code postQueue}; producers block in
 * {@link #queueReport(String)} while the queue is full.
 *
 * <p>Possible future work (not implemented here): spill reports to a backing
 * file when the post back server has problems or the queue grows too long.
 *
 * @author Kwong
 */
public class PostBack {
    /** XML declaration that starts every posted document. */
    private static final String XML_VERSION_1_0_ENCODING_UTF_8 = "<?xml version=\"1.0\" encoding=\"utf-8\"?>";

    /** Longest single wait on the queue monitor, in seconds. */
    private static final int QUEUE_WAIT = 300;

    /** Pause after a {@link PostBackStatus#SERVER_IO_ERROR}, in seconds. */
    private static final int POSTBACK_SERVER_WAIT_TIME = 10;

    private static final Logger myLogger = Logger.getLogger(PostBack.class);

    private final String postBackURL;
    private final String xmlTopElement;

    /** Pending reports; also the monitor producers and senders wait on. */
    private final Queue<String> postQueue;
    private final int maxQueueSize;
    private final List<Sender> senderPool;
    private final String myName;
    private final int maxBatchSize;

    /** Number of producers currently blocked waiting for queue space. Guarded by {@code postQueue}. */
    private int threadsWaitingToPost;

    /** Outcome of posting one batch. */
    public enum PostBackStatus {
        /** The batch was posted and the response carried no error attribute. */
        SUCCESS,
        /** The response body could not be read. */
        SERVER_IO_ERROR,
        /** The request failed or the response was unusable; the batch is dropped. */
        IRRECOVERABLE_ERROR,
        /** The server answered with an HTTP status other than 200. */
        HTTP_STATUS_ERROR
    }

    /**
     * Worker thread that repeatedly takes a batch of reports from the
     * enclosing instance's queue and posts it. The thread starts itself from
     * its constructor.
     */
    class Sender extends Thread {
        /**
         * Set by {@link #terminate()}. Volatile, and never reset by
         * {@link #run()}, so that a terminate request issued before the thread
         * is first scheduled is not lost.
         */
        private volatile boolean threadShouldStop;

        private Sender(String name) {
            setName(name);
            start();
        }

        /** Sends one batch per iteration until {@link #terminate()} is called. */
        @Override
        public void run() {
            myLogger.info(getName() + " started");
            for (;;) {
                if (threadShouldStop) {
                    myLogger.info(getName() + " terminating");
                    System.out.println(getName() + " terminating");
                    return;
                }

                myLogger.debug("Looking for reports");
                List<String> reportList = nextBatch();
                if (reportList.isEmpty()) {
                    // Interrupted while waiting; loop round to re-check the stop flag.
                    continue;
                }

                PostBackStatus status;
                try {
                    status = post(reportList);
                } catch (RuntimeException e) {
                    // An unchecked failure must not silently kill this pool thread:
                    // with no senders left, producers would block forever.
                    CommonLogger.alarm.error("Unexpected failure while posting back to " + postBackURL + ": " + e);
                    continue;
                }

                if (status == PostBackStatus.SERVER_IO_ERROR) {
                    // The batch is deliberately NOT re-queued: re-queueing from a
                    // sender could deadlock on a full queue. Posting can be
                    // recovered from the activity log.
                    // TODO: limit retries, using rate limiting.
                    CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds");
                    try {
                        Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000);
                    } catch (InterruptedException e) {
                        // Not re-interrupted on purpose: this thread owns the
                        // interrupt and the loop re-checks the stop flag next.
                        CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e);
                    }
                }
                // Every other status needs no follow-up action.
            }
        }

        /**
         * Blocks until at least one report is available, then removes and
         * returns up to {@code maxBatchSize} of them.
         *
         * @return the extracted reports; empty only if this thread was
         *         interrupted while waiting
         */
        private List<String> nextBatch() {
            synchronized (postQueue) {
                for (;;) {
                    List<String> batch = new ArrayList<String>();
                    String report;
                    while (batch.size() < maxBatchSize && (report = postQueue.poll()) != null) {
                        batch.add(report);
                    }
                    if (!batch.isEmpty()) {
                        myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d",
                                batch.size(), postQueue.size()));
                        // Space was freed: wake producers blocked in queueReport().
                        postQueue.notifyAll();
                        return batch;
                    }

                    // Nothing to do, so wait a while and look at the queue again.
                    try {
                        myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
                        postQueue.wait(QUEUE_WAIT * 1000);
                    } catch (InterruptedException e) {
                        CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e);
                        return batch;
                    }
                    CommonLogger.health.info("Surfacing from wait");
                    System.out.println(getName() + " surfacing from wait");
                }
            }
        }

        /**
         * Posts one batch of reports as a single XML document and validates
         * the server's XML response.
         *
         * @param reportList reports to send; each is appended verbatim inside
         *                   the top element
         * @return {@code SUCCESS};
         *         {@code HTTP_STATUS_ERROR} when the HTTP status is not 200;
         *         {@code SERVER_IO_ERROR} when the response body cannot be read;
         *         {@code IRRECOVERABLE_ERROR} otherwise
         */
        private PostBackStatus post(List<String> reportList) {
            String xml = buildRequestXml(reportList);
            byte[] xmlBytes = null;

            // Client and response are both closed on every path.
            try (CloseableHttpClient httpclient = newHttpClient()) {
                HttpPost httpPost = new HttpPost(postBackURL);
                // Encode explicitly as UTF-8 to agree with the XML declaration;
                // StringEntity(String) alone would fall back to ISO-8859-1.
                httpPost.setEntity(new StringEntity(xml, StandardCharsets.UTF_8));
                myLogger.debug("Posting to " + postBackURL + ": " + xml);

                try (CloseableHttpResponse response = httpclient.execute(httpPost, new BasicHttpContext())) {
                    StatusLine statusLine = response.getStatusLine();
                    int statusCode = statusLine.getStatusCode();
                    if (statusCode != 200) {
                        CommonLogger.alarm.error("Got error status code " + statusCode
                                + " while posting status to broadcast requester");
                        return PostBackStatus.HTTP_STATUS_ERROR;
                    }
                    if (response.getEntity() != null) {
                        try {
                            xmlBytes = EntityUtils.toByteArray(response.getEntity());
                        } catch (IOException e) {
                            CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e);
                            return PostBackStatus.SERVER_IO_ERROR;
                        }
                    }
                }
            } catch (IOException e) {
                // NOTE(review): a failure to reach the server is reported as
                // irrecoverable, so no back-off happens - confirm this is intended.
                CommonLogger.alarm.error("While posting back to broadcast requester: " + e);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
                CommonLogger.alarm.error("Totally unexpected. While constructing HttpClient for posting back to broadcast requester: " + e);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            }

            if (xmlBytes == null) {
                CommonLogger.alarm.warn("Empty response body from " + postBackURL);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            }
            return checkResponse(xmlBytes);
        }

        /** Wraps the reports in {@code <xmlTopElement>...</xmlTopElement>}, one report per CRLF-terminated line. */
        private String buildRequestXml(List<String> reportList) {
            StringBuilder xml = new StringBuilder(XML_VERSION_1_0_ENCODING_UTF_8);
            xml.append('<').append(xmlTopElement).append('>');
            for (String report : reportList) {
                xml.append(report).append("\r\n");
            }
            xml.append("</").append(xmlTopElement).append('>');
            return xml.toString();
        }

        /**
         * Builds the HTTP client used for a single post.
         *
         * <p>SECURITY NOTE(review): the client trusts every server certificate
         * and skips host name verification, so the TLS connection is not
         * authenticated. Behavior kept as found; confirm this is acceptable
         * for the deployment.
         */
        private CloseableHttpClient newHttpClient()
                throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
            TrustStrategy trustAllCerts = new TrustStrategy() {
                @Override
                public boolean isTrusted(X509Certificate[] chain, String authType) {
                    return true;
                }
            };
            SSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(trustAllCerts).build();
            SSLConnectionSocketFactory connectionFactory =
                    new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());
            return HttpClientBuilder.create().setSSLSocketFactory(connectionFactory).build();
        }

        /**
         * Parses the server's response and checks that it is an error-free
         * {@code xmlTopElement} document.
         *
         * @param xmlBytes raw response body, never {@code null}
         * @return {@code SUCCESS} when the root element name starts with
         *         {@code xmlTopElement} and has no {@code error} attribute,
         *         otherwise {@code IRRECOVERABLE_ERROR}
         */
        private PostBackStatus checkResponse(byte[] xmlBytes) {
            String xmlStr = new String(xmlBytes, StandardCharsets.UTF_8);
            myLogger.debug("Received resposne: " + xmlStr);

            Document xmlDoc;
            try {
                DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
                // The response is untrusted input: refuse DTDs to rule out XXE.
                factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
                DocumentBuilder builder = factory.newDocumentBuilder();
                xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes));
            } catch (Exception e) {
                CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            }

            if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) {
                CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement
                        + "> response: " + xmlStr);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            }

            XPath xpathEngine = XPathFactory.newInstance().newXPath();
            String xpath = null;
            try {
                xpath = "@error";
                Node errorNode = (Node) xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE);
                if (errorNode != null) {
                    String errorCode = errorNode.getNodeValue();
                    xpath = "error_text";
                    String errorText = (String) xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(),
                            XPathConstants.STRING);
                    CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " + postBackURL
                            + " -- error code=\"" + errorCode + "\", error text = \"" + errorText + "\"");
                    return PostBackStatus.IRRECOVERABLE_ERROR;
                }
            } catch (XPathExpressionException e) {
                CommonLogger.alarm.warn("Bad xpath: " + xpath);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            } catch (Exception e) {
                CommonLogger.alarm.warn("While decoding post back response from server: " + e);
                return PostBackStatus.IRRECOVERABLE_ERROR;
            }

            myLogger.debug("returned from posting");
            return PostBackStatus.SUCCESS;
        }

        /**
         * Asks this sender to stop: sets the stop flag and interrupts the
         * thread so that a pending wait or sleep ends early. Does not wait for
         * the thread to finish. Calling it again has no further effect.
         */
        public void terminate() {
            if (threadShouldStop) {
                return;
            }
            threadShouldStop = true;
            interrupt();
        }
    }

    /**
     * Constructs a pool of threads doing posting from a common job queue to
     * the supplied postBackURL. The top element of the XML that gets posted
     * back has the given name. The sender threads are started immediately.
     *
     * @param postBackURL       URL that reports are posted to
     * @param xmlTopElementName name of the top element of each posted document
     * @param maxQueueSize      queue length at which {@link #queueReport(String)} starts blocking
     * @param senderPoolSize    number of sender threads to start
     * @param maxBatchSize      maximum number of reports sent in one post
     * @throws IllegalArgumentException if postBackURL or xmlTopElementName is
     *                                  null or empty
     */
    public PostBack(String postBackURL, String xmlTopElementName, int maxQueueSize, int senderPoolSize,
            int maxBatchSize) throws IllegalArgumentException {
        if (postBackURL == null || postBackURL.length() == 0) {
            throw new IllegalArgumentException("PostBack class given null postBackURL");
        }
        myName = "Postback-" + postBackURL;
        if (xmlTopElementName == null || xmlTopElementName.length() == 0) {
            throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement");
        }

        this.postBackURL = postBackURL;
        this.xmlTopElement = xmlTopElementName;
        this.maxQueueSize = maxQueueSize;
        this.maxBatchSize = maxBatchSize;
        postQueue = new LinkedList<String>();
        threadsWaitingToPost = 0;

        // Senders are created last: they start running immediately and read
        // the fields assigned above.
        senderPool = new ArrayList<Sender>();
        for (int i = 0; i < senderPoolSize; i++) {
            senderPool.add(new Sender(myName + '-' + i));
        }
    }

    /**
     * Adds a report to the queue, blocking while the queue already holds
     * {@code maxQueueSize} reports.
     *
     * @param report the report to queue; {@code null} is rejected, because a
     *               null element would look like an empty queue to the senders
     * @return true if the report was added; false if {@code report} is null or
     *         the calling thread was interrupted while waiting for space (its
     *         interrupt status is then set again)
     */
    public boolean queueReport(String report) {
        // Log for recovery in case of problem in posting report.
        CommonLogger.activity.info("Attempting to queue report");
        if (report == null) {
            myLogger.warn("Ignoring null report");
            return false;
        }

        synchronized (postQueue) {
            while (postQueue.size() >= maxQueueSize) {
                myLogger.debug("Waiting for space - postQueue size: " + postQueue.size());
                threadsWaitingToPost++;
                try {
                    myLogger.debug("Threads waiting to post: " + threadsWaitingToPost);
                    postQueue.wait(QUEUE_WAIT * 1000);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller.
                    Thread.currentThread().interrupt();
                    myLogger.info("Interrupted while waiting for space to queue report");
                    return false;
                } finally {
                    // Keep the counter balanced on the interrupted path too.
                    threadsWaitingToPost--;
                }
            }

            myLogger.debug("Queing report" + report);
            postQueue.add(report);
            myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
            // Wake senders waiting for work.
            postQueue.notifyAll();
            return true;
        }
    }

    /** Asks every sender thread in the pool to stop; does not wait for them to finish. */
    public void terminate() {
        for (Sender sender : senderPool) {
            sender.terminate();
        }
    }
}