Tested to work correctly with a 1-recipient SMS broadcast. Need more testing.tags/CommEngine-0.0.3
| @@ -24,7 +24,6 @@ import altk.comm.engine.exception.BroadcastException; | |||||
| import altk.comm.engine.exception.EngineException; | import altk.comm.engine.exception.EngineException; | ||||
| import altk.comm.engine.exception.PlatformError; | import altk.comm.engine.exception.PlatformError; | ||||
| import altk.comm.engine.exception.PlatformException; | import altk.comm.engine.exception.PlatformException; | ||||
| import altk.comm.engine.postback.PostBack; | |||||
| /** | /** | ||||
| * Broadcast class absorbs what was formerly known as Dispatcher class. | * Broadcast class absorbs what was formerly known as Dispatcher class. | ||||
| @@ -68,8 +67,8 @@ public abstract class Broadcast | |||||
| private String launchRecordId; | private String launchRecordId; | ||||
| // protected XPath xpathEngine; | // protected XPath xpathEngine; | ||||
| protected String postBackURL; | |||||
| private PostBack postBack; | |||||
| protected String postbackURL; | |||||
| private Postback postback; | |||||
| public long expireTime; | public long expireTime; | ||||
| /** | /** | ||||
| @@ -83,7 +82,7 @@ public abstract class Broadcast | |||||
| protected List<Service> serviceThreadPool; | protected List<Service> serviceThreadPool; | ||||
| private Object resumeFlag; // Semaphore for dispatcher threads to resume. | private Object resumeFlag; // Semaphore for dispatcher threads to resume. | ||||
| protected List<Recipient> recipientList; | protected List<Recipient> recipientList; | ||||
| //private int remainingJobs; | |||||
| private int jobReportsQueued; | |||||
| private ScheduledExecutorService scheduler; | private ScheduledExecutorService scheduler; | ||||
| private int serviceThreadPoolSize; | private int serviceThreadPoolSize; | ||||
| @@ -381,6 +380,8 @@ public abstract class Broadcast | |||||
| } | } | ||||
| } | } | ||||
| /** | /** | ||||
| * | * | ||||
| * @param broadcastType | * @param broadcastType | ||||
| @@ -392,12 +393,15 @@ public abstract class Broadcast | |||||
| this.broadcastType = broadcastType; | this.broadcastType = broadcastType; | ||||
| this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; | this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; | ||||
| this.jobReportRootNodeName = jobReportRootNodeName; | this.jobReportRootNodeName = jobReportRootNodeName; | ||||
| jobReportsQueued = 0; | |||||
| sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; | sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; | ||||
| readyQueue = new LinkedBlockingQueue<Job>(); | readyQueue = new LinkedBlockingQueue<Job>(); | ||||
| serviceThreadPool = new ArrayList<Service>(); | serviceThreadPool = new ArrayList<Service>(); | ||||
| recipientList = new ArrayList<Recipient>(); | recipientList = new ArrayList<Recipient>(); | ||||
| scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); | scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); | ||||
| resumeFlag = new Object(); | resumeFlag = new Object(); | ||||
| receiveTime = System.currentTimeMillis(); | receiveTime = System.currentTimeMillis(); | ||||
| } | } | ||||
| /** | /** | ||||
| @@ -455,9 +459,8 @@ public abstract class Broadcast | |||||
| setState(BroadcastState.COMPLETED, "No recipients", null); | setState(BroadcastState.COMPLETED, "No recipients", null); | ||||
| return; | return; | ||||
| } | } | ||||
| postBack = (PostBack)commEngine.getPostBack(getPostBackURL(), broadcastType); | |||||
| initSync(commEngine.getResources()); | initSync(commEngine.getResources()); | ||||
| init(postBack); | |||||
| init(); | |||||
| if (getState() == BroadcastState.COMPLETED) return; | if (getState() == BroadcastState.COMPLETED) return; | ||||
| } | } | ||||
| catch (BroadcastException e) | catch (BroadcastException e) | ||||
| @@ -506,7 +509,12 @@ public abstract class Broadcast | |||||
| { | { | ||||
| initAsync(); | initAsync(); | ||||
| effectiveJobCount = recipientList.size(); | |||||
| int jobsTotal = recipientList.size(); | |||||
| effectiveJobCount = jobsTotal; | |||||
| postback = new Postback(this, | |||||
| commEngine.getPostbackMaxQueueSize(), | |||||
| commEngine.getPostbackSenderPoolSize(), | |||||
| commEngine.getPostbackMaxBatchSize()); | |||||
| // Create service thread pool to dispatch jobs, | // Create service thread pool to dispatch jobs, | ||||
| // at the same time, setting up a list of service thread names | // at the same time, setting up a list of service thread names | ||||
| @@ -516,7 +524,7 @@ public abstract class Broadcast | |||||
| List<String> serviceThreadNames = new ArrayList<String>(); | List<String> serviceThreadNames = new ArrayList<String>(); | ||||
| for (int i = 0; i < serviceThreadPoolSize; i++) | for (int i = 0; i < serviceThreadPoolSize; i++) | ||||
| { | { | ||||
| String threadName = broadcastId + "_service_thread_" + i; | |||||
| String threadName = broadcastId + "-service-thread." + i; | |||||
| Service serviceThread = new Service(threadName); | Service serviceThread = new Service(threadName); | ||||
| serviceThreadPool.add(serviceThread); | serviceThreadPool.add(serviceThread); | ||||
| serviceThreadNames.add(threadName); | serviceThreadNames.add(threadName); | ||||
| @@ -531,7 +539,11 @@ public abstract class Broadcast | |||||
| CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); | CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); | ||||
| myLogger.error("Broadcast aborted", e); | myLogger.error("Broadcast aborted", e); | ||||
| } | } | ||||
| catch (Exception e) | |||||
| { | |||||
| CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); | |||||
| myLogger.error("Broadcast aborted", e); | |||||
| } | |||||
| } | } | ||||
| protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); | protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); | ||||
| @@ -595,9 +607,9 @@ public abstract class Broadcast | |||||
| this.stateErrorText = stateErrorText; | this.stateErrorText = stateErrorText; | ||||
| CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); | CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); | ||||
| if (postBack != null) | |||||
| if (postback != null) | |||||
| { | { | ||||
| postBack.queueReport(mkStatusReport()); | |||||
| postback.queueReport(mkStatusReport()); | |||||
| } | } | ||||
| return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); | return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); | ||||
| @@ -687,9 +699,9 @@ public abstract class Broadcast | |||||
| return responseXML.toString(); | return responseXML.toString(); | ||||
| } | } | ||||
| public String getPostBackURL() | |||||
| public String getPostbackURL() | |||||
| { | { | ||||
| return postBackURL; | |||||
| return postbackURL; | |||||
| } | } | ||||
| protected String mkResponseXML(String errorCode, String errorText) | protected String mkResponseXML(String errorCode, String errorText) | ||||
| @@ -878,16 +890,12 @@ public abstract class Broadcast | |||||
| * | * | ||||
| * @throws BroadcastException | * @throws BroadcastException | ||||
| */ | */ | ||||
| protected final void init(PostBack postBack) | |||||
| protected final void init() | |||||
| { | { | ||||
| // Remember postBack | |||||
| this.postBack = postBack; | |||||
| for (Recipient recipient : recipientList) | for (Recipient recipient : recipientList) | ||||
| { | { | ||||
| readyQueue.add(mkJob(recipient)); | readyQueue.add(mkJob(recipient)); | ||||
| } | } | ||||
| //remainingJobs = readyQueue.size(); | |||||
| } | } | ||||
| protected abstract void initSync(EngineResources resources) throws BroadcastException; | protected abstract void initSync(EngineResources resources) throws BroadcastException; | ||||
| @@ -1012,7 +1020,7 @@ public abstract class Broadcast | |||||
| */ | */ | ||||
| protected void close() | protected void close() | ||||
| { | { | ||||
| // Do nothing in base class | |||||
| postback = null; | |||||
| } | } | ||||
| /** | /** | ||||
| @@ -1075,31 +1083,6 @@ public abstract class Broadcast | |||||
| public void postJobStatus(Job job) | public void postJobStatus(Job job) | ||||
| { | { | ||||
| postJobStatus(job, -1); | postJobStatus(job, -1); | ||||
| /* | |||||
| if (postBack != null) | |||||
| { | |||||
| JobReport report = mkJobReport(); | |||||
| report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); | |||||
| report.init(job); | |||||
| postBack.queueReport(report.toString()); | |||||
| } | |||||
| if (job.jobStatus.isTerminal()) | |||||
| { | |||||
| remainingJobs--; | |||||
| completedJobCount++; | |||||
| if (remainingJobs == 0) | |||||
| { | |||||
| terminate(BroadcastState.COMPLETED); | |||||
| } | |||||
| else if (getActiveJobCount() == 0) | |||||
| { | |||||
| if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); | |||||
| else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); | |||||
| } | |||||
| } | |||||
| */ | |||||
| } | } | ||||
| /** | /** | ||||
| @@ -1114,12 +1097,13 @@ public abstract class Broadcast | |||||
| //postJobStatus(job); | //postJobStatus(job); | ||||
| logJobCount("Entering postJobStatus"); | logJobCount("Entering postJobStatus"); | ||||
| myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); | myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); | ||||
| if (postBack != null) | |||||
| if (postback != null) | |||||
| { | { | ||||
| JobReport report = mkJobReport(); | JobReport report = mkJobReport(); | ||||
| report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); | report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); | ||||
| report.init(job); | report.init(job); | ||||
| postBack.queueReport(report.toString()); | |||||
| postback.queueReport(report.toString()); | |||||
| jobReportsQueued++; | |||||
| } | } | ||||
| //if (job.jobStatus.isTerminal()) | //if (job.jobStatus.isTerminal()) | ||||
| @@ -1135,7 +1119,8 @@ public abstract class Broadcast | |||||
| incrementCompletedCount(); | incrementCompletedCount(); | ||||
| logJobCount("Completed a job"); | logJobCount("Completed a job"); | ||||
| if (getRemainingJobCount() == 0) | |||||
| //if (getRemainingJobCount() == 0) | |||||
| if (jobReportsQueued == recipientList.size()) | |||||
| { | { | ||||
| terminate(BroadcastState.COMPLETED); | terminate(BroadcastState.COMPLETED); | ||||
| } | } | ||||
| @@ -1225,4 +1210,8 @@ public abstract class Broadcast | |||||
| { | { | ||||
| this.serviceThreadPoolSize = serviceThreadPoolSize; | this.serviceThreadPoolSize = serviceThreadPoolSize; | ||||
| } | } | ||||
| public String getBroadcastType() { | |||||
| return broadcastType; | |||||
| } | |||||
| } | } | ||||
| @@ -5,9 +5,6 @@ import java.io.FileInputStream; | |||||
| import java.io.FileNotFoundException; | import java.io.FileNotFoundException; | ||||
| import java.io.IOException; | import java.io.IOException; | ||||
| import java.io.PrintWriter; | import java.io.PrintWriter; | ||||
| import java.security.KeyManagementException; | |||||
| import java.security.KeyStoreException; | |||||
| import java.security.NoSuchAlgorithmException; | |||||
| import java.util.Enumeration; | import java.util.Enumeration; | ||||
| import java.util.HashMap; | import java.util.HashMap; | ||||
| import java.util.Map; | import java.util.Map; | ||||
| @@ -26,11 +23,6 @@ import org.apache.log4j.Logger; | |||||
| import org.apache.log4j.PropertyConfigurator; | import org.apache.log4j.PropertyConfigurator; | ||||
| import altk.comm.engine.Broadcast.BroadcastState; | import altk.comm.engine.Broadcast.BroadcastState; | ||||
| import altk.comm.engine.exception.BroadcastError; | |||||
| import altk.comm.engine.exception.BroadcastException; | |||||
| import altk.comm.engine.exception.PlatformError; | |||||
| import altk.comm.engine.exception.PlatformException; | |||||
| import altk.comm.engine.postback.PostBack; | |||||
| @SuppressWarnings("serial") | @SuppressWarnings("serial") | ||||
| public abstract class CommEngine extends HttpServlet | public abstract class CommEngine extends HttpServlet | ||||
| @@ -55,8 +47,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| protected Properties config; | protected Properties config; | ||||
| protected Map<String, PostBack> postBackMap; | |||||
| protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice" | protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice" | ||||
| private long startupTimestamp; | private long startupTimestamp; | ||||
| @@ -64,8 +54,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| // Sequencing naming of broadcast that fails to yield its broadcastId | // Sequencing naming of broadcast that fails to yield its broadcastId | ||||
| private int unknownBroadcastIdNdx = 1; | private int unknownBroadcastIdNdx = 1; | ||||
| private BroadcastException myException; | |||||
| /** | /** | ||||
| * Used to communicate media-specific platform resources to broadcasts | * Used to communicate media-specific platform resources to broadcasts | ||||
| */ | */ | ||||
| @@ -97,7 +85,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| this.engineName = engineName; | this.engineName = engineName; | ||||
| broadcasts = new HashMap<String, Broadcast>(); | broadcasts = new HashMap<String, Broadcast>(); | ||||
| startupTimestamp = System.currentTimeMillis(); | startupTimestamp = System.currentTimeMillis(); | ||||
| myException = null; | |||||
| } | } | ||||
| /** | /** | ||||
| @@ -198,8 +185,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| return; | return; | ||||
| } | } | ||||
| postBackMap = new HashMap<String, PostBack>(); | |||||
| // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes | // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes | ||||
| String periodStr = config.getProperty("dead_broadcast_viewing_period", | String periodStr = config.getProperty("dead_broadcast_viewing_period", | ||||
| new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString()); | new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString()); | ||||
| @@ -492,12 +477,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| // Shutdown threads that periodically purge stale broadcasts. | // Shutdown threads that periodically purge stale broadcasts. | ||||
| scheduler.shutdownNow(); | scheduler.shutdownNow(); | ||||
| // Kill threads in each PostBack, which is remembered in postBackMap. | |||||
| for (PostBack postback : postBackMap.values()) | |||||
| { | |||||
| postback.terminate(); | |||||
| } | |||||
| for (Broadcast broadcast : broadcasts.values()) | for (Broadcast broadcast : broadcasts.values()) | ||||
| { | { | ||||
| broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); | broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); | ||||
| @@ -517,23 +496,6 @@ public abstract class CommEngine extends HttpServlet | |||||
| */ | */ | ||||
| abstract protected void destroyChild(); | abstract protected void destroyChild(); | ||||
| public PostBack getPostBack(String postBackURL, String broadcastType) throws BroadcastException | |||||
| { | |||||
| if (postBackURL == null) return null; | |||||
| PostBack postBack = postBackMap.get(postBackURL); | |||||
| if (postBack != null) return postBack; | |||||
| try { | |||||
| postBack = new PostBack(postBackURL, broadcastType + "_status", | |||||
| postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize); | |||||
| } catch (KeyManagementException | IllegalArgumentException | NoSuchAlgorithmException | KeyStoreException e) { | |||||
| throw new BroadcastException(BroadcastError.PLATFORM_ERROR, e.getMessage(), e); | |||||
| } | |||||
| postBackMap.put(postBackURL, postBack); | |||||
| return postBack; | |||||
| } | |||||
| public EngineResources getResources() | public EngineResources getResources() | ||||
| { | { | ||||
| return resources; | return resources; | ||||
| @@ -561,4 +523,16 @@ public abstract class CommEngine extends HttpServlet | |||||
| { | { | ||||
| return serviceThreadPoolSize; | return serviceThreadPoolSize; | ||||
| } | } | ||||
| public int getPostbackMaxQueueSize() { | |||||
| return postbackMaxQueueSize; | |||||
| } | |||||
| public int getPostbackSenderPoolSize() { | |||||
| return postbackSenderPoolSize; | |||||
| } | |||||
| public int getPostbackMaxBatchSize() { | |||||
| return postbackMaxBatchSize; | |||||
| } | |||||
| } | } | ||||
| @@ -158,9 +158,9 @@ public abstract class XMLDOMBroadcast extends Broadcast | |||||
| } | } | ||||
| // Postback | // Postback | ||||
| postBackURL = getStringValue("async_status_post_back", broadcastNode); | |||||
| if (postBackURL != null && (postBackURL=postBackURL.trim()).length() == 0) postBackURL = null; | |||||
| if (postBackURL == null) | |||||
| postbackURL = getStringValue("async_status_post_back", broadcastNode); | |||||
| if (postbackURL != null && (postbackURL=postbackURL.trim()).length() == 0) postbackURL = null; | |||||
| if (postbackURL == null) | |||||
| { | { | ||||
| CommonLogger.alarm.warn("Missing asyn_status_post_back in POST data"); | CommonLogger.alarm.warn("Missing asyn_status_post_back in POST data"); | ||||
| } | } | ||||
| @@ -100,7 +100,7 @@ public abstract class XMLSAXBroadcast extends Broadcast | |||||
| } | } | ||||
| else if (qName.equals("async_status_post_back")) | else if (qName.equals("async_status_post_back")) | ||||
| { | { | ||||
| postBackURL = getTrimmedText(); | |||||
| postbackURL = getTrimmedText(); | |||||
| } | } | ||||
| else if (qName.equals("expire_time")) | else if (qName.equals("expire_time")) | ||||
| { | { | ||||
| @@ -1,452 +0,0 @@ | |||||
| package altk.comm.engine.postback; | |||||
| import java.io.ByteArrayInputStream; | |||||
| import java.io.IOException; | |||||
| import java.io.UnsupportedEncodingException; | |||||
| import java.security.KeyManagementException; | |||||
| import java.security.KeyStoreException; | |||||
| import java.security.NoSuchAlgorithmException; | |||||
| import java.security.cert.X509Certificate; | |||||
| import java.util.ArrayList; | |||||
| import java.util.LinkedList; | |||||
| import java.util.List; | |||||
| import java.util.Queue; | |||||
| import javax.net.ssl.SSLContext; | |||||
| import javax.xml.parsers.DocumentBuilder; | |||||
| import javax.xml.parsers.DocumentBuilderFactory; | |||||
| import javax.xml.xpath.XPath; | |||||
| import javax.xml.xpath.XPathConstants; | |||||
| import javax.xml.xpath.XPathExpressionException; | |||||
| import javax.xml.xpath.XPathFactory; | |||||
| import org.apache.http.StatusLine; | |||||
| import org.apache.http.client.HttpRequestRetryHandler; | |||||
| import org.apache.http.client.methods.CloseableHttpResponse; | |||||
| import org.apache.http.client.methods.HttpPost; | |||||
| import org.apache.http.conn.ssl.NoopHostnameVerifier; | |||||
| import org.apache.http.conn.ssl.SSLConnectionSocketFactory; | |||||
| import org.apache.http.conn.ssl.TrustStrategy; | |||||
| import org.apache.http.entity.StringEntity; | |||||
| import org.apache.http.impl.client.CloseableHttpClient; | |||||
| import org.apache.http.impl.client.HttpClientBuilder; | |||||
| import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; | |||||
| import org.apache.http.protocol.BasicHttpContext; | |||||
| import org.apache.http.protocol.HttpContext; | |||||
| import org.apache.http.ssl.SSLContextBuilder; | |||||
| import org.apache.http.util.EntityUtils; | |||||
| import org.apache.log4j.Logger; | |||||
| import org.w3c.dom.Document; | |||||
| import org.w3c.dom.Node; | |||||
| import altk.comm.engine.CommonLogger; | |||||
| /** | |||||
| * Queues JobReports to be posted back to attribute postBackURL. | |||||
| * Multiple internal class Sender members consume this postQueue, sending items | |||||
| * in postQueue to postBackURL. | |||||
| * | |||||
| * In the future, if postBackURL has problem, or if | |||||
| * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing | |||||
| * everything to backingFile. | |||||
| * | |||||
| * @author Kwong | |||||
| * | |||||
| */ | |||||
| public class PostBack | |||||
| { | |||||
| private static final String XML_VERSION_1_0_ENCODING_UTF_8 = "<?xml version=\"1.0\" encoding=\"utf-8\"?>"; | |||||
| private static final int QUEUE_WAIT = 300; // seconds | |||||
| private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds | |||||
| private static final int RETRIES_DEFAULT = 3; | |||||
| private final String postBackURL; | |||||
| private final String xmlTopElement; | |||||
| private Queue<String> postQueue; | |||||
| private final int maxQueueSize; | |||||
| private List<Sender> senderPool; | |||||
| private final String myName; | |||||
| private int maxBatchSize; | |||||
| private PoolingHttpClientConnectionManager cm; | |||||
| private int threadsWaitingToPost; | |||||
| private TrustStrategy tustAllCerts; | |||||
| SSLContext sslContext; | |||||
| SSLConnectionSocketFactory connectionFactory; | |||||
| CloseableHttpClient httpclient; | |||||
| private int maxRetries; | |||||
| private static Logger myLogger = Logger.getLogger(PostBack.class); | |||||
| public enum PostBackStatus | |||||
| { | |||||
| SUCCESS, | |||||
| SERVER_IO_ERROR, | |||||
| IRRECOVERABLE_ERROR, | |||||
| HTTP_STATUS_ERROR | |||||
| } | |||||
| class Sender extends Thread | |||||
| { | |||||
| private boolean threadShouldStop; | |||||
| private Sender(String name) | |||||
| { | |||||
| setName(name); | |||||
| start(); | |||||
| } | |||||
| public void run() | |||||
| { | |||||
| threadShouldStop = false; | |||||
| myLogger.info(getName() + " started"); | |||||
| String report; | |||||
| for (;;) // Each iteration sends a batch | |||||
| { | |||||
| if (threadShouldStop) | |||||
| { | |||||
| myLogger.info(getName() + " terminating"); | |||||
| System.out.println(getName() + " terminating"); | |||||
| return; | |||||
| } | |||||
| myLogger.debug("Looking for reports"); | |||||
| List<String> reportList = null; | |||||
| synchronized(postQueue) | |||||
| { | |||||
| // Each iteration examines the queue for a batch to send | |||||
| for (;;) | |||||
| { | |||||
| reportList = new ArrayList<String>(); | |||||
| for (int i = 0; i < maxBatchSize ; i++) | |||||
| { | |||||
| report = postQueue.poll(); | |||||
| if (report == null) break; | |||||
| reportList.add(report); | |||||
| } | |||||
| if (reportList.size() > 0) | |||||
| { | |||||
| myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); | |||||
| postQueue.notifyAll(); | |||||
| break; // break out to do the work. | |||||
| } | |||||
| // Nothing to do, so wait a while, and look at the | |||||
| // queue again. | |||||
| try | |||||
| { | |||||
| myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); | |||||
| postQueue.wait(QUEUE_WAIT * 1000); | |||||
| } | |||||
| catch (InterruptedException e) | |||||
| { | |||||
| CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e); | |||||
| break; | |||||
| } | |||||
| CommonLogger.health.info("Surfacing from wait"); | |||||
| System.out.println(getName() + " surfacing from wait"); | |||||
| continue; | |||||
| } | |||||
| } // synchronized() | |||||
| if (reportList != null && reportList.size() > 0) | |||||
| { | |||||
| switch (post(reportList)) | |||||
| { | |||||
| case IRRECOVERABLE_ERROR: | |||||
| case SUCCESS: | |||||
| break; | |||||
| case SERVER_IO_ERROR: | |||||
| /* Should not requeue report for this may lead to dead lock on this queu. | |||||
| // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. | |||||
| // Re-queue these reports | |||||
| for (String rpt : reportList) | |||||
| { | |||||
| queueReport(rpt); | |||||
| } | |||||
| */ | |||||
| // Sleep for a while before retrying this PostBack server. | |||||
| CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); | |||||
| try | |||||
| { | |||||
| Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); | |||||
| } | |||||
| catch (InterruptedException e) | |||||
| { | |||||
| CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e); | |||||
| } | |||||
| default: | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| /** | |||||
| * | |||||
| * @param reportList | |||||
| * @return SUCCESS, | |||||
| * SERVER_IO_ERROR, when postback receiver has problem | |||||
| * IRRECOVERABLE_ERROR | |||||
| */ | |||||
| private PostBackStatus post(List<String> reportList) | |||||
| { | |||||
| StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); | |||||
| xml.append("<"); xml.append(xmlTopElement); xml.append(">"); | |||||
| for (String report : reportList) | |||||
| { | |||||
| xml.append(report + "\r\n"); | |||||
| } | |||||
| xml.append("</"); xml.append(xmlTopElement); xml.append(">"); | |||||
| HttpPost httpPost = new HttpPost(postBackURL); | |||||
| StringEntity requestEntity; | |||||
| CloseableHttpResponse response = null; | |||||
| byte[] xmlBytes = null; | |||||
| try | |||||
| { | |||||
| requestEntity = (new StringEntity(xml.toString())); | |||||
| httpPost.setEntity(requestEntity); | |||||
| myLogger.debug("Posting to " + postBackURL + ": " + xml); | |||||
| response = httpclient.execute(httpPost, new BasicHttpContext()); | |||||
| StatusLine statusLine = response.getStatusLine(); | |||||
| int statusCode = statusLine.getStatusCode(); | |||||
| if (statusCode != 200) | |||||
| { | |||||
| CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester"); | |||||
| return PostBackStatus.HTTP_STATUS_ERROR; | |||||
| } | |||||
| } | |||||
| catch (UnsupportedEncodingException e) | |||||
| { | |||||
| CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| catch (IOException e) | |||||
| { | |||||
| CommonLogger.alarm.error("While posting back to broadcast requester: " + e); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| String xmlStr; | |||||
| try | |||||
| { | |||||
| xmlBytes = EntityUtils.toByteArray(response.getEntity()); | |||||
| xmlStr = new String(xmlBytes); | |||||
| myLogger.debug("Received resposne: " + xmlStr); | |||||
| } | |||||
| catch (IOException e) | |||||
| { | |||||
| CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e); | |||||
| return PostBackStatus.SERVER_IO_ERROR; | |||||
| } | |||||
| Document xmlDoc = null; | |||||
| try | |||||
| { | |||||
| DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); | |||||
| DocumentBuilder builder = factory.newDocumentBuilder(); | |||||
| xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes)); | |||||
| } | |||||
| catch (Exception e) | |||||
| { | |||||
| CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) | |||||
| { | |||||
| CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| XPath xpathEngine = XPathFactory.newInstance().newXPath(); | |||||
| String xpath = null; | |||||
| try | |||||
| { | |||||
| xpath = "@error"; | |||||
| Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); | |||||
| if (errorNode != null) | |||||
| { | |||||
| String errorCode = errorNode.getNodeValue(); | |||||
| xpath = "error_text"; | |||||
| String errorText = (String)xpathEngine.evaluate(xpath, | |||||
| xmlDoc.getDocumentElement(), XPathConstants.STRING); | |||||
| CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " | |||||
| + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" | |||||
| + errorText + "\""); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| } | |||||
| catch (XPathExpressionException e) | |||||
| { | |||||
| CommonLogger.alarm.warn("Bad xpath: " + xpath); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| catch (Exception e) | |||||
| { | |||||
| CommonLogger.alarm.warn("While decoding post back response from server: " + e); | |||||
| return PostBackStatus.IRRECOVERABLE_ERROR; | |||||
| } | |||||
| myLogger.debug("returned from posting"); | |||||
| return PostBackStatus.SUCCESS; | |||||
| } | |||||
| public void terminate() | |||||
| { | |||||
| if (threadShouldStop) return; | |||||
| threadShouldStop = true; | |||||
| //Wait for at most 100 ms for thread to stop | |||||
| interrupt(); | |||||
| } | |||||
| } | |||||
| /** | |||||
| * Constructs a pool of threads doing posting from a common job queue, | |||||
| * to the supplied postBackURL. The top element of the XML that gets | |||||
| * posted back has the give name. | |||||
| * | |||||
| * Requires these System properties: | |||||
| * postback_max_queue_size | |||||
| * postback_threadpool_size | |||||
| * | |||||
| * @param postBackURL | |||||
| * @param xmlTopElementName | |||||
| * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is | |||||
| * not supplied nor valid. | |||||
| * @throws KeyStoreException | |||||
| * @throws NoSuchAlgorithmException | |||||
| * @throws KeyManagementException | |||||
| */ | |||||
| public PostBack(String postBackURL, String xmlTopElementName, | |||||
| int maxQueueSize, int senderPoolSize, int maxBatchSize) | |||||
| throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException | |||||
| { | |||||
| if (postBackURL == null || postBackURL.length() == 0) | |||||
| { | |||||
| throw new IllegalArgumentException("PostBack class given null postBackURL"); | |||||
| } | |||||
| myName = "Postback-" + postBackURL; | |||||
| if (xmlTopElementName == null || xmlTopElementName.length() == 0) | |||||
| { | |||||
| throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement"); | |||||
| } | |||||
| this.postBackURL = postBackURL; | |||||
| this.xmlTopElement = xmlTopElementName; | |||||
| this.maxQueueSize = maxQueueSize; | |||||
| this.maxBatchSize = maxBatchSize; | |||||
| postQueue = new LinkedList<String>(); | |||||
| threadsWaitingToPost = 0; | |||||
| cm = new PoolingHttpClientConnectionManager(); | |||||
| cm.setMaxTotal(senderPoolSize); | |||||
| cm.setDefaultMaxPerRoute(senderPoolSize); | |||||
| tustAllCerts = new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) { return true; } }; | |||||
| sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build(); | |||||
| connectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier()); | |||||
| // | |||||
| // Retry handler | |||||
| maxRetries = RETRIES_DEFAULT; | |||||
| HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler() | |||||
| { | |||||
| public boolean retryRequest( | |||||
| IOException exception, | |||||
| int executionCount, | |||||
| HttpContext context) | |||||
| { | |||||
| if (executionCount >= maxRetries) return false; | |||||
| return true; | |||||
| } | |||||
| }; | |||||
| httpclient = HttpClientBuilder.create() | |||||
| // .setConnectionManager(cm) | |||||
| // .setRetryHandler(retryHandler) | |||||
| .setSSLSocketFactory(connectionFactory) | |||||
| .build(); | |||||
| senderPool = new ArrayList<Sender>(); | |||||
| for (int i = 0; i < senderPoolSize; i++) | |||||
| { | |||||
| Sender sender = new Sender(myName + '-' + i); | |||||
| senderPool.add(sender); | |||||
| } | |||||
| } | |||||
| /** | |||||
| * Queues report to postQueue only if the queue size has not reached the | |||||
| * maxQueueSize. | |||||
| * @param report | |||||
| * @return true if report is added to queue, false otherwise (queue full) | |||||
| */ | |||||
| public boolean queueReport(String report) | |||||
| { | |||||
| // Log for recovery in case of problem in posting report. | |||||
| CommonLogger.activity.info("Attempting to queue report"); | |||||
| synchronized(postQueue) | |||||
| { | |||||
| for (;;) | |||||
| { | |||||
| if (postQueue.size() < maxQueueSize) | |||||
| { | |||||
| myLogger.debug("Queing report" + report); | |||||
| postQueue.add(report); | |||||
| myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); | |||||
| postQueue.notifyAll(); | |||||
| return true; | |||||
| } | |||||
| else | |||||
| { | |||||
| myLogger.debug("Waiting for space - postQueue size: " + postQueue.size()); | |||||
| try | |||||
| { | |||||
| threadsWaitingToPost++; | |||||
| myLogger.debug("Threads waiting to post: " + threadsWaitingToPost); | |||||
| postQueue.wait(QUEUE_WAIT * 1000); | |||||
| } | |||||
| catch (InterruptedException e) | |||||
| { | |||||
| break; | |||||
| } | |||||
| threadsWaitingToPost--; | |||||
| } | |||||
| } | |||||
| } | |||||
| myLogger.info("Interrupted while waiting for space to queue report"); | |||||
| return false; | |||||
| } | |||||
| public void terminate() | |||||
| { | |||||
| try | |||||
| { | |||||
| httpclient.close(); | |||||
| } | |||||
| catch (IOException e) | |||||
| { | |||||
| myLogger.error("Caught when closing HttpClient: " + e.getMessage()); | |||||
| } | |||||
| // Terminate postback threads | |||||
| for (Sender sender : senderPool) | |||||
| { | |||||
| sender.terminate(); | |||||
| } | |||||
| } | |||||
| } | |||||