package altk.comm.engine.postback; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.xpath.XPath; import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; import org.apache.commons.httpclient.ConnectTimeoutException; import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpURL; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.StringRequestEntity; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Node; import altk.comm.engine.CommonLogger; /** * Queues JobReports to be posted back to attribute postBackURL. * Multiple internal class Sender members consume this postQueue, sending items * in postQueue to postBackURL. * * In the future, if postBackURL has problem, or if * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing * everything to backingFile. * * @author Kwong * */ public class PostBack { private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; private static final int QUEUE_WAIT = 300; // seconds private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds private final String postBackURL; private final String xmlTopElement; private Queue postQueue; private final int maxQueueSize; private List senderPool; private final String myName; private int maxBatchSize; private int threadsWaitingToPost; private static Logger myLogger = Logger.getLogger(PostBack.class); public enum PostBackStatus { SUCCESS, SERVER_IO_ERROR, IRRECOVERABLE_ERROR, HTTP_STATUS_ERROR } class Sender extends Thread { private boolean threadShouldStop; private Sender(String name) { setName(name); start(); } public void run() { threadShouldStop = false; myLogger.info(getName() + " started"); String report; for (;;) // Each iteration sends a batch { if (threadShouldStop) { myLogger.info(getName() + " terminating"); System.out.println(getName() + " terminating"); return; } myLogger.debug("Looking for reports"); List reportList = null; synchronized(postQueue) { // Each iteration examines the queue for a batch to send for (;;) { reportList = new ArrayList(); for (int i = 0; i < maxBatchSize ; i++) { report = postQueue.poll(); if (report == null) break; reportList.add(report); } if (reportList.size() > 0) { myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); postQueue.notifyAll(); break; // break out to do the work. } // Nothing to do, so wait a while, and look at the // queue again. try { myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); postQueue.wait(QUEUE_WAIT * 1000); } catch (InterruptedException e) { CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e); break; } CommonLogger.health.info("Surfacing from wait"); System.out.println(getName() + " surfacing from wait"); continue; } } // synchronized() if (reportList != null && reportList.size() > 0) { switch (post(reportList)) { case IRRECOVERABLE_ERROR: case SUCCESS: break; case SERVER_IO_ERROR: /* Should not requeue report for this may lead to dead lock on this queu. // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. // Re-queue these reports for (String rpt : reportList) { queueReport(rpt); } */ // Sleep for a while before retrying this PostBack server. CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); try { Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); } catch (InterruptedException e) { CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e); } default: } } } } /** * * @param reportList * @return SUCCESS, * SERVER_IO_ERROR, when postback receiver has problem * IRRECOVERABLE_ERROR */ private PostBackStatus post(List reportList) { StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); xml.append("<"); xml.append(xmlTopElement); xml.append(">"); for (String report : reportList) { xml.append(report + "\r\n"); } xml.append(""); PostMethod post = new PostMethod(); String responseBody = null; StringRequestEntity requestEntity = null; try { requestEntity = new StringRequestEntity(xml.toString(), "application/xml", "utf-8"); } catch (UnsupportedEncodingException e) { CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e); return PostBackStatus.IRRECOVERABLE_ERROR; } post.setRequestEntity(requestEntity); post.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false)); /* CommonLogger.activity.debug("Before calling setFolloweRedirects()"); post.setFollowRedirects(false); CommonLogger.activity.debug("After calling setFolloweRedirects()"); */ HttpClient client = new HttpClient(); String url = postBackURL; for (int redirectCount = 0; redirectCount < 3; redirectCount++) { try { client.getHttpConnectionManager().getParams().setConnectionTimeout(5 * 1000); post.setURI(new HttpURL(url)); if (redirectCount == 0) { CommonLogger.activity.info("posting " + xml.toString() + " to " + url); } else { CommonLogger.activity.info("redirected to " + url); } long beginPost = System.currentTimeMillis(); int statusCode = client.executeMethod(post); long postingTime = System.currentTimeMillis() - beginPost; // msec if (statusCode == 302) { Header locationHeader = post.getResponseHeader("Location"); if (locationHeader != null) { url = locationHeader.getValue(); post.releaseConnection(); continue; } else { CommonLogger.alarm.warn("When posting to \"" + url + "\": " + " received status 302 but without location header"); return PostBackStatus.IRRECOVERABLE_ERROR; } } else if (statusCode != 200) { CommonLogger.alarm.warn("Received problem status code " + statusCode + " from posting to \"" + url + "\": " + xml); return PostBackStatus.HTTP_STATUS_ERROR; } responseBody = post.getResponseBodyAsString().trim(); post.releaseConnection(); myLogger.debug("Postback time (msec): " + postingTime); CommonLogger.activity.info("Received response: " + (responseBody.length() == 0? "[empty]" : responseBody)); if (responseBody.trim().length() == 0) return PostBackStatus.SUCCESS; break; } catch (ConnectTimeoutException e) { CommonLogger.alarm.warn("IO problem while posting to \"" + url + "\": " + xml + " -- " + e.getMessage()); return PostBackStatus.SERVER_IO_ERROR; } catch (IOException e) { CommonLogger.alarm.warn("IO problem while posting to \"" + url + "\": " + xml + " -- " + e.getMessage()); return PostBackStatus.SERVER_IO_ERROR; } catch (IllegalArgumentException e) { CommonLogger.alarm.warn("When posting to \"" + url + "\": " + e.getMessage()); return PostBackStatus.IRRECOVERABLE_ERROR; } } if (responseBody == null) { CommonLogger.alarm.warn("When posting to \"" + url + "\": " + " Exhausted allowable redirects"); return PostBackStatus.IRRECOVERABLE_ERROR; } // parse into xml doc Document xmlDoc = null; try { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder = factory.newDocumentBuilder(); xmlDoc = builder.parse(new ByteArrayInputStream(responseBody.getBytes())); } catch (Exception e) { CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + responseBody); return PostBackStatus.IRRECOVERABLE_ERROR; } if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) { CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + responseBody); return PostBackStatus.IRRECOVERABLE_ERROR; } XPath xpathEngine = XPathFactory.newInstance().newXPath(); String xpath = null; try { xpath = "@error"; Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); if (errorNode != null) { String errorCode = errorNode.getNodeValue(); xpath = "error_text"; String errorText = (String)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.STRING); CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" + errorText + "\""); return PostBackStatus.IRRECOVERABLE_ERROR; } } catch (XPathExpressionException e) { CommonLogger.alarm.warn("Bad xpath: " + xpath); return PostBackStatus.IRRECOVERABLE_ERROR; } catch (Exception e) { CommonLogger.alarm.warn("While decoding post back response from server: " + e); return PostBackStatus.IRRECOVERABLE_ERROR; } myLogger.debug("returned from posting"); return PostBackStatus.SUCCESS; } public void terminate() { if (threadShouldStop) return; threadShouldStop = true; //Wait for at most 100 ms for thread to stop interrupt(); } } /** * Constructs a pool of threads doing posting from a common job queue, * to the supplied postBackURL. The top element of the XML that gets * posted back has the give name. * * Requires these System properties: * postback_max_queue_size * postback_threadpool_size * * @param postBackURL * @param xmlTopElementName * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is * not supplied nor valid. */ public PostBack(String postBackURL, String xmlTopElementName, int maxQueueSize, int senderPoolSize, int maxBatchSize) throws IllegalArgumentException { if (postBackURL == null || postBackURL.length() == 0) { throw new IllegalArgumentException("PostBack class given null postBackURL"); } myName = "Postback-" + postBackURL; if (xmlTopElementName == null || xmlTopElementName.length() == 0) { throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement"); } this.postBackURL = postBackURL; this.xmlTopElement = xmlTopElementName; this.maxQueueSize = maxQueueSize; this.maxBatchSize = maxBatchSize; postQueue = new LinkedList(); threadsWaitingToPost = 0; senderPool = new ArrayList(); for (int i = 0; i < senderPoolSize; i++) { Sender sender = new Sender(myName + '-' + i); senderPool.add(sender); } } /** * Queues report to postQueue only if the queue size has not reached the * maxQueueSize. * @param report * @return true if report is added to queue, false otherwise (queue full) */ public boolean queueReport(String report) { // Log for recovery in case of problem in posting report. CommonLogger.activity.info("Attempting to queue report"); synchronized(postQueue) { for (;;) { if (postQueue.size() < maxQueueSize) { myLogger.debug("Queing report" + report); postQueue.add(report); myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); postQueue.notifyAll(); return true; } else { myLogger.debug("Waiting for space - postQueue size: " + postQueue.size()); try { threadsWaitingToPost++; myLogger.debug("Threads waiting to post: " + threadsWaitingToPost); postQueue.wait(QUEUE_WAIT * 1000); } catch (InterruptedException e) { break; } threadsWaitingToPost--; } } } myLogger.info("Interrupted while waiting for space to queue report"); return false; } /** * Queues reports to postQueue only if the queue size has not reached the * maxQueueSize. * @param reports to be added back to postQueue * @return true if all jobs have been added to queue, false otherwise (queue full) */ /* @Deprecated public boolean queueReports(List reports) { myLogger.debug(myName + ": postQueue size: " + postQueue.size()); synchronized(postQueue) { Iterator iter = reports.iterator(); int count = 0; // Number of reports added back to postQueue while (iter.hasNext()) { String report = iter.next(); if (postQueue.size() < maxQueueSize) { postQueue.add(report); count++; } } if (count > 0) postQueue.notify(); boolean returnValue = (count == reports.size()); if (!returnValue) { CommonLogger.alarm.warn(myName + ".queueReport method returning false, having queued " + count + " out of " + reports.size()); } return returnValue; } } */ public void terminate() { for (Sender sender : senderPool) { sender.terminate(); } } }