package altk.comm.engine.postback; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.xpath.XPath; import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; import org.apache.commons.httpclient.ConnectTimeoutException; import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpURL; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.StringRequestEntity; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Node; import altk.comm.engine.CommonLogger; /** * Queues JobReports to be posted back to attribute postBackURL. * Multiple internal class Sender members consume this postQueue, sending items * in postQueue to postBackURL. * * In the future, if postBackURL has problem, or if * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing * everything to backingFile. * * @author Kwong * */ public class PostBack { private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; private static final int QUEUE_WAIT = 300; // seconds private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds private final String postBackURL; private final String xmlTopElement; private Queue postQueue; private final int maxQueueSize; private List senderPool; private final String myName; private int maxBatchSize; private static Logger myLogger = Logger.getLogger(PostBack.class); public enum PostBackStatus { SUCCESS, SERVER_IO_ERROR, IRRECOVERABLE_ERROR, HTTP_STATUS_ERROR } class Sender extends Thread { private boolean threadShouldStop; private Sender(String name) { setName(name); start(); } public void run() { threadShouldStop = false; myLogger.info(getName() + " started"); String report; for (;;) // Each iteration sends a batch { if (threadShouldStop) { myLogger.info(getName() + " terminating"); System.out.println(getName() + " terminating"); return; } List reportList = null; synchronized(postQueue) { // Each iteration examines the queue for a batch to send for (;;) { reportList = new ArrayList(); for (int i = 0; i < maxBatchSize ; i++) { report = postQueue.poll(); if (report == null) break; reportList.add(report); } if (reportList.size() > 0) { postQueue.notify(); break; // break out to do the work. } // Nothing to do, so wait a while, and look at the // queue again. try { postQueue.wait(QUEUE_WAIT * 1000); } catch (InterruptedException e) { CommonLogger.alarm.info(getName() + ": Postback queue interrupted while waiting: " + e); break; } CommonLogger.health.info(getName() + " surfacing from wait"); System.out.println(getName() + " surfacing from wait"); continue; } } // synchronized() if (reportList != null && reportList.size() > 0) { switch (post(reportList)) { case IRRECOVERABLE_ERROR: case SUCCESS: break; case SERVER_IO_ERROR: // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. // Re-queue these reports for (String rpt : reportList) { queueReport(rpt); } // Sleep for a while before retrying this PostBack server. CommonLogger.alarm.warn(getName() + ": Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); try { Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); } catch (InterruptedException e) { CommonLogger.alarm.warn(getName() + ": Caught while PostBack thread sleeps: " + e); } default: } } } } /** * * @param reportList * @return SUCCESS, * SERVER_IO_ERROR, when postback receiver has problem * IRRECOVERABLE_ERROR */ private PostBackStatus post(List reportList) { StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); xml.append("<"); xml.append(xmlTopElement); xml.append(">"); for (String report : reportList) { xml.append(report); } xml.append(""); PostMethod post = new PostMethod(); String responseBody = null; StringRequestEntity requestEntity = null; try { requestEntity = new StringRequestEntity(xml.toString(), "application/xml", "utf-8"); } catch (UnsupportedEncodingException e) { CommonLogger.alarm.warn(getName() + ": While adding this application/xml content to PostBack: " + xml + " -- " + e); return PostBackStatus.IRRECOVERABLE_ERROR; } post.setRequestEntity(requestEntity); post.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false)); /* CommonLogger.activity.debug("Before calling setFolloweRedirects()"); post.setFollowRedirects(false); CommonLogger.activity.debug("After calling setFolloweRedirects()"); */ HttpClient client = new HttpClient(); String url = postBackURL; for (int redirectCount = 0; redirectCount < 3; redirectCount++) { try { client.getHttpConnectionManager().getParams().setConnectionTimeout(5 * 1000); post.setURI(new HttpURL(url)); if (redirectCount == 0) { CommonLogger.activity.info(getName() + ": posting " + xml.toString() + " to " + url); } else { CommonLogger.activity.info(getName() + ": redirected to " + url); } int statusCode = client.executeMethod(post); if (statusCode == 302) { Header locationHeader = post.getResponseHeader("Location"); if (locationHeader != null) { url = locationHeader.getValue(); post.releaseConnection(); continue; } else { CommonLogger.alarm.warn(getName() + ": When posting to \"" + url + "\": " + " received status 302 but without location header"); return PostBackStatus.IRRECOVERABLE_ERROR; } } else if (statusCode != 200) { CommonLogger.alarm.warn(getName() + ": Received problem status code " + statusCode + " from posting to \"" + url + "\": " + xml); return PostBackStatus.HTTP_STATUS_ERROR; } responseBody = post.getResponseBodyAsString().trim(); post.releaseConnection(); CommonLogger.activity.info(getName() + ": Received response: " + (responseBody.length() == 0? "[empty]" : responseBody)); if (responseBody.trim().length() == 0) return PostBackStatus.SUCCESS; break; } catch (ConnectTimeoutException e) { CommonLogger.alarm.warn(getName() + ": IO problem while posting to \"" + url + "\": " + xml + " -- " + e.getMessage()); return PostBackStatus.SERVER_IO_ERROR; } catch (IOException e) { CommonLogger.alarm.warn(getName() + ": IO problem while posting to \"" + url + "\": " + xml + " -- " + e.getMessage()); return PostBackStatus.SERVER_IO_ERROR; } catch (IllegalArgumentException e) { CommonLogger.alarm.warn(getName() + ": When posting to \"" + url + "\": " + e.getMessage()); return PostBackStatus.IRRECOVERABLE_ERROR; } } if (responseBody == null) { CommonLogger.alarm.warn(getName() + ": When posting to \"" + url + "\": " + " Exhausted allowable redirects"); return PostBackStatus.IRRECOVERABLE_ERROR; } // parse into xml doc Document xmlDoc = null; try { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder = factory.newDocumentBuilder(); xmlDoc = builder.parse(new ByteArrayInputStream(responseBody.getBytes())); } catch (Exception e) { CommonLogger.alarm.warn(getName() + ": xml parse problem on received response from " + postBackURL + ": " + responseBody); return PostBackStatus.IRRECOVERABLE_ERROR; } if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) { CommonLogger.alarm.warn(getName() + ": xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + responseBody); return PostBackStatus.IRRECOVERABLE_ERROR; } XPath xpathEngine = XPathFactory.newInstance().newXPath(); String xpath = null; try { xpath = "@error"; Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); if (errorNode != null) { String errorCode = errorNode.getNodeValue(); xpath = "error_text"; String errorText = (String)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.STRING); CommonLogger.alarm.warn(getName() + ": Error response to <" + xmlTopElement + "> post back to " + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" + errorText + "\""); return PostBackStatus.IRRECOVERABLE_ERROR; } } catch (XPathExpressionException e) { CommonLogger.alarm.warn("Bad xpath: " + xpath); return PostBackStatus.IRRECOVERABLE_ERROR; } catch (Exception e) { CommonLogger.alarm.warn(getName() + ": While decoding post back response from server: " + e); return PostBackStatus.IRRECOVERABLE_ERROR; } return PostBackStatus.SUCCESS; } public void terminate() { if (threadShouldStop) return; threadShouldStop = true; //Wait for at most 100 ms for thread to stop interrupt(); } } /** * Constructs a pool of threads doing posting from a common job queue, * to the supplied postBackURL. The top element of the XML that gets * posted back has the give name. * * Requires these System properties: * postback_max_queue_size * postback_threadpool_size * * @param postBackURL * @param xmlTopElementName * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is * not supplied nor valid. */ public PostBack(String postBackURL, String xmlTopElementName, int maxQueueSize, int senderPoolSize, int maxBatchSize) throws IllegalArgumentException { if (postBackURL == null || postBackURL.length() == 0) { throw new IllegalArgumentException("PostBack class given null postBackURL"); } myName = "Postback-" + postBackURL; if (xmlTopElementName == null || xmlTopElementName.length() == 0) { throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement"); } this.postBackURL = postBackURL; this.xmlTopElement = xmlTopElementName; this.maxQueueSize = maxQueueSize; this.maxBatchSize = maxBatchSize; postQueue = new LinkedList(); senderPool = new ArrayList(); for (int i = 0; i < senderPoolSize; i++) { Sender sender = new Sender(myName + '-' + i); senderPool.add(sender); } } /** * Queues report to postQueue only if the queue size has not reached the * maxQueueSize. * @param report * @return true if report is added to queue, false otherwise (queue full) */ public boolean queueReport(String report) { // Log for recovery in case of problem in posting report. CommonLogger.activity.info(myName + " queing report: " + report); myLogger.debug(myName + ": postQueue size: " + postQueue.size()); synchronized(postQueue) { for (;;) { if (postQueue.size() < maxQueueSize) { postQueue.add(report); postQueue.notify(); return true; } else { myLogger.debug("Waiting for space in postQueue to queue report"); try { postQueue.wait(); } catch (InterruptedException e) { break; } } } } myLogger.error("Interrupted while waiting for space to queue report"); return false; } /** * Queues reports to postQueue only if the queue size has not reached the * maxQueueSize. * @param reports to be added back to postQueue * @return true if all jobs have been added to queue, false otherwise (queue full) */ /* @Deprecated public boolean queueReports(List reports) { myLogger.debug(myName + ": postQueue size: " + postQueue.size()); synchronized(postQueue) { Iterator iter = reports.iterator(); int count = 0; // Number of reports added back to postQueue while (iter.hasNext()) { String report = iter.next(); if (postQueue.size() < maxQueueSize) { postQueue.add(report); count++; } } if (count > 0) postQueue.notify(); boolean returnValue = (count == reports.size()); if (!returnValue) { CommonLogger.alarm.warn(myName + ".queueReport method returning false, having queued " + count + " out of " + reports.size()); } return returnValue; } } */ public void terminate() { for (Sender sender : senderPool) { sender.terminate(); } } }