diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 917f249..d3646f4 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -24,7 +24,6 @@ import altk.comm.engine.exception.BroadcastException; import altk.comm.engine.exception.EngineException; import altk.comm.engine.exception.PlatformError; import altk.comm.engine.exception.PlatformException; -import altk.comm.engine.postback.PostBack; /** * Broadcast class absorbs what was formerly known as Dispatcher class. @@ -68,8 +67,8 @@ public abstract class Broadcast private String launchRecordId; // protected XPath xpathEngine; - protected String postBackURL; - private PostBack postBack; + protected String postbackURL; + private Postback postback; public long expireTime; /** @@ -83,7 +82,7 @@ public abstract class Broadcast protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; - //private int remainingJobs; + private int jobReportsQueued; private ScheduledExecutorService scheduler; private int serviceThreadPoolSize; @@ -381,6 +380,8 @@ public abstract class Broadcast } } + + /** * * @param broadcastType @@ -392,12 +393,15 @@ public abstract class Broadcast this.broadcastType = broadcastType; this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; this.jobReportRootNodeName = jobReportRootNodeName; + + jobReportsQueued = 0; sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT; readyQueue = new LinkedBlockingQueue(); serviceThreadPool = new ArrayList(); recipientList = new ArrayList(); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); resumeFlag = new Object(); + receiveTime = System.currentTimeMillis(); } /** @@ -455,9 +459,8 @@ public abstract class Broadcast setState(BroadcastState.COMPLETED, "No recipients", null); return; } - postBack = (PostBack)commEngine.getPostBack(getPostBackURL(), broadcastType); initSync(commEngine.getResources()); - init(postBack); + init(); if (getState() == BroadcastState.COMPLETED) return; } catch (BroadcastException e) @@ -506,7 +509,12 @@ public abstract class Broadcast { initAsync(); - effectiveJobCount = recipientList.size(); + int jobsTotal = recipientList.size(); + effectiveJobCount = jobsTotal; + postback = new Postback(this, + commEngine.getPostbackMaxQueueSize(), + commEngine.getPostbackSenderPoolSize(), + commEngine.getPostbackMaxBatchSize()); // Create service thread pool to dispatch jobs, // at the same time, setting up a list of service thread names @@ -516,7 +524,7 @@ public abstract class Broadcast List serviceThreadNames = new ArrayList(); for (int i = 0; i < serviceThreadPoolSize; i++) { - String threadName = broadcastId + "_service_thread_" + i; + String threadName = broadcastId + "-service-thread." + i; Service serviceThread = new Service(threadName); serviceThreadPool.add(serviceThread); serviceThreadNames.add(threadName); @@ -531,7 +539,11 @@ public abstract class Broadcast CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); } - + catch (Exception e) + { + CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); + myLogger.error("Broadcast aborted", e); + } } protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); @@ -595,9 +607,9 @@ public abstract class Broadcast this.stateErrorText = stateErrorText; CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, prev, state)); - if (postBack != null) + if (postback != null) { - postBack.queueReport(mkStatusReport()); + postback.queueReport(mkStatusReport()); } return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); @@ -687,9 +699,9 @@ public abstract class Broadcast return responseXML.toString(); } - public String getPostBackURL() + public String getPostbackURL() { - return postBackURL; + return postbackURL; } protected String mkResponseXML(String errorCode, String errorText) @@ -878,16 +890,12 @@ public abstract class Broadcast * * @throws BroadcastException */ - protected final void init(PostBack postBack) + protected final void init() { - // Remember postBack - this.postBack = postBack; - for (Recipient recipient : recipientList) { readyQueue.add(mkJob(recipient)); } - //remainingJobs = readyQueue.size(); } protected abstract void initSync(EngineResources resources) throws BroadcastException; @@ -1012,7 +1020,7 @@ public abstract class Broadcast */ protected void close() { - // Do nothing in base class + postback = null; } /** @@ -1075,31 +1083,6 @@ public abstract class Broadcast public void postJobStatus(Job job) { postJobStatus(job, -1); - /* - if (postBack != null) - { - JobReport report = mkJobReport(); - report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); - report.init(job); - postBack.queueReport(report.toString()); - } - - if (job.jobStatus.isTerminal()) - { - remainingJobs--; - completedJobCount++; - - if (remainingJobs == 0) - { - terminate(BroadcastState.COMPLETED); - } - else if (getActiveJobCount() == 0) - { - if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); - else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); - } - } - */ } /** @@ -1114,12 +1097,13 @@ public abstract class Broadcast //postJobStatus(job); logJobCount("Entering postJobStatus"); myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); - if (postBack != null) + if (postback != null) { JobReport report = mkJobReport(); report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); - postBack.queueReport(report.toString()); + postback.queueReport(report.toString()); + jobReportsQueued++; } //if (job.jobStatus.isTerminal()) @@ -1135,7 +1119,8 @@ public abstract class Broadcast incrementCompletedCount(); logJobCount("Completed a job"); - if (getRemainingJobCount() == 0) + //if (getRemainingJobCount() == 0) + if (jobReportsQueued == recipientList.size()) { terminate(BroadcastState.COMPLETED); } @@ -1225,4 +1210,8 @@ public abstract class Broadcast { this.serviceThreadPoolSize = serviceThreadPoolSize; } + + public String getBroadcastType() { + return broadcastType; + } } diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index bcc5c29..3fc050c 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -5,9 +5,6 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; @@ -26,11 +23,6 @@ import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import altk.comm.engine.Broadcast.BroadcastState; -import altk.comm.engine.exception.BroadcastError; -import altk.comm.engine.exception.BroadcastException; -import altk.comm.engine.exception.PlatformError; -import altk.comm.engine.exception.PlatformException; -import altk.comm.engine.postback.PostBack; @SuppressWarnings("serial") public abstract class CommEngine extends HttpServlet @@ -55,8 +47,6 @@ public abstract class CommEngine extends HttpServlet protected Properties config; - protected Map postBackMap; - protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice" private long startupTimestamp; @@ -64,8 +54,6 @@ public abstract class CommEngine extends HttpServlet // Sequencing naming of broadcast that fails to yield its broadcastId private int unknownBroadcastIdNdx = 1; - private BroadcastException myException; - /** * Used to communicate media-specific platform resources to broadcasts */ @@ -97,7 +85,6 @@ public abstract class CommEngine extends HttpServlet this.engineName = engineName; broadcasts = new HashMap(); startupTimestamp = System.currentTimeMillis(); - myException = null; } /** @@ -198,8 +185,6 @@ public abstract class CommEngine extends HttpServlet return; } - postBackMap = new HashMap(); - // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes String periodStr = config.getProperty("dead_broadcast_viewing_period", new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString()); @@ -492,12 +477,6 @@ public abstract class CommEngine extends HttpServlet // Shutdown threads that periodically purge stale broadcasts. scheduler.shutdownNow(); - // Kill threads in each PostBack, which is remembered in postBackMap. - for (PostBack postback : postBackMap.values()) - { - postback.terminate(); - } - for (Broadcast broadcast : broadcasts.values()) { broadcast.terminate(BroadcastState.ABORTED, "Platform termination"); @@ -517,23 +496,6 @@ public abstract class CommEngine extends HttpServlet */ abstract protected void destroyChild(); - public PostBack getPostBack(String postBackURL, String broadcastType) throws BroadcastException - { - if (postBackURL == null) return null; - - PostBack postBack = postBackMap.get(postBackURL); - if (postBack != null) return postBack; - - try { - postBack = new PostBack(postBackURL, broadcastType + "_status", - postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize); - } catch (KeyManagementException | IllegalArgumentException | NoSuchAlgorithmException | KeyStoreException e) { - throw new BroadcastException(BroadcastError.PLATFORM_ERROR, e.getMessage(), e); - } - postBackMap.put(postBackURL, postBack); - return postBack; - } - public EngineResources getResources() { return resources; @@ -561,4 +523,16 @@ public abstract class CommEngine extends HttpServlet { return serviceThreadPoolSize; } + + public int getPostbackMaxQueueSize() { + return postbackMaxQueueSize; + } + + public int getPostbackSenderPoolSize() { + return postbackSenderPoolSize; + } + + public int getPostbackMaxBatchSize() { + return postbackMaxBatchSize; + } } diff --git a/src/main/java/altk/comm/engine/XMLDOMBroadcast.java b/src/main/java/altk/comm/engine/XMLDOMBroadcast.java index a5662d9..c844cb7 100644 --- a/src/main/java/altk/comm/engine/XMLDOMBroadcast.java +++ b/src/main/java/altk/comm/engine/XMLDOMBroadcast.java @@ -158,9 +158,9 @@ public abstract class XMLDOMBroadcast extends Broadcast } // Postback - postBackURL = getStringValue("async_status_post_back", broadcastNode); - if (postBackURL != null && (postBackURL=postBackURL.trim()).length() == 0) postBackURL = null; - if (postBackURL == null) + postbackURL = getStringValue("async_status_post_back", broadcastNode); + if (postbackURL != null && (postbackURL=postbackURL.trim()).length() == 0) postbackURL = null; + if (postbackURL == null) { CommonLogger.alarm.warn("Missing asyn_status_post_back in POST data"); } diff --git a/src/main/java/altk/comm/engine/XMLSAXBroadcast.java b/src/main/java/altk/comm/engine/XMLSAXBroadcast.java index 7dd82a0..6e0fc4e 100644 --- a/src/main/java/altk/comm/engine/XMLSAXBroadcast.java +++ b/src/main/java/altk/comm/engine/XMLSAXBroadcast.java @@ -100,7 +100,7 @@ public abstract class XMLSAXBroadcast extends Broadcast } else if (qName.equals("async_status_post_back")) { - postBackURL = getTrimmedText(); + postbackURL = getTrimmedText(); } else if (qName.equals("expire_time")) { diff --git a/src/main/java/altk/comm/engine/postback/PostBack.java b/src/main/java/altk/comm/engine/postback/PostBack.java deleted file mode 100644 index e02bfc8..0000000 --- a/src/main/java/altk/comm/engine/postback/PostBack.java +++ /dev/null @@ -1,452 +0,0 @@ -package altk.comm.engine.postback; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - -import javax.net.ssl.SSLContext; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.xpath.XPath; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathExpressionException; -import javax.xml.xpath.XPathFactory; - -import org.apache.http.StatusLine; -import org.apache.http.client.HttpRequestRetryHandler; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.conn.ssl.TrustStrategy; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.protocol.BasicHttpContext; -import org.apache.http.protocol.HttpContext; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.util.EntityUtils; -import org.apache.log4j.Logger; -import org.w3c.dom.Document; -import org.w3c.dom.Node; - -import altk.comm.engine.CommonLogger; - - -/** - * Queues JobReports to be posted back to attribute postBackURL. - * Multiple internal class Sender members consume this postQueue, sending items - * in postQueue to postBackURL. - * - * In the future, if postBackURL has problem, or if - * length of postQueue is more than a MAX_QUEUE_LENGTH, then it starts writing - * everything to backingFile. - * - * @author Kwong - * - */ -public class PostBack -{ - private static final String XML_VERSION_1_0_ENCODING_UTF_8 = ""; - - private static final int QUEUE_WAIT = 300; // seconds - private static final int POSTBACK_SERVER_WAIT_TIME = 10; // seconds - - private static final int RETRIES_DEFAULT = 3; - - private final String postBackURL; - private final String xmlTopElement; - private Queue postQueue; - private final int maxQueueSize; - private List senderPool; - private final String myName; - private int maxBatchSize; - - private PoolingHttpClientConnectionManager cm; - private int threadsWaitingToPost; - private TrustStrategy tustAllCerts; - SSLContext sslContext; - SSLConnectionSocketFactory connectionFactory; - CloseableHttpClient httpclient; - - private int maxRetries; - - - - private static Logger myLogger = Logger.getLogger(PostBack.class); - - public enum PostBackStatus - { - SUCCESS, - SERVER_IO_ERROR, - IRRECOVERABLE_ERROR, - HTTP_STATUS_ERROR - } - - class Sender extends Thread - { - private boolean threadShouldStop; - - - private Sender(String name) - { - setName(name); - start(); - } - - public void run() - { - threadShouldStop = false; - - myLogger.info(getName() + " started"); - - String report; - - for (;;) // Each iteration sends a batch - { - if (threadShouldStop) - { - myLogger.info(getName() + " terminating"); - System.out.println(getName() + " terminating"); - return; - } - myLogger.debug("Looking for reports"); - List reportList = null; - synchronized(postQueue) - { - // Each iteration examines the queue for a batch to send - for (;;) - { - reportList = new ArrayList(); - for (int i = 0; i < maxBatchSize ; i++) - { - report = postQueue.poll(); - if (report == null) break; - reportList.add(report); - } - if (reportList.size() > 0) - { - myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); - postQueue.notifyAll(); - break; // break out to do the work. - } - - // Nothing to do, so wait a while, and look at the - // queue again. - try - { - myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); - postQueue.wait(QUEUE_WAIT * 1000); - } - catch (InterruptedException e) - { - CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e); - break; - } - CommonLogger.health.info("Surfacing from wait"); - System.out.println(getName() + " surfacing from wait"); - continue; - } - } // synchronized() - if (reportList != null && reportList.size() > 0) - { - switch (post(reportList)) - { - case IRRECOVERABLE_ERROR: - case SUCCESS: - break; - - case SERVER_IO_ERROR: - /* Should not requeue report for this may lead to dead lock on this queu. - // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. - // Re-queue these reports - for (String rpt : reportList) - { - queueReport(rpt); - } - */ - // Sleep for a while before retrying this PostBack server. - CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); - try - { - Thread.sleep(POSTBACK_SERVER_WAIT_TIME * 1000); - } - catch (InterruptedException e) - { - CommonLogger.alarm.warn("Caught while PostBack thread sleeps: " + e); - } - default: - } - } - } - } - - /** - * - * @param reportList - * @return SUCCESS, - * SERVER_IO_ERROR, when postback receiver has problem - * IRRECOVERABLE_ERROR - */ - private PostBackStatus post(List reportList) - { - StringBuffer xml = new StringBuffer(XML_VERSION_1_0_ENCODING_UTF_8); - xml.append("<"); xml.append(xmlTopElement); xml.append(">"); - for (String report : reportList) - { - xml.append(report + "\r\n"); - } - xml.append(""); - - HttpPost httpPost = new HttpPost(postBackURL); - StringEntity requestEntity; - CloseableHttpResponse response = null; - byte[] xmlBytes = null; - try - { - requestEntity = (new StringEntity(xml.toString())); - httpPost.setEntity(requestEntity); - myLogger.debug("Posting to " + postBackURL + ": " + xml); - response = httpclient.execute(httpPost, new BasicHttpContext()); - StatusLine statusLine = response.getStatusLine(); - int statusCode = statusLine.getStatusCode(); - if (statusCode != 200) - { - CommonLogger.alarm.error("Got error status code " + statusCode + " while posting status to broadcast requester"); - return PostBackStatus.HTTP_STATUS_ERROR; - } - } - catch (UnsupportedEncodingException e) - { - CommonLogger.alarm.warn("While adding this application/xml content to PostBack: " + xml + " -- " + e); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - catch (IOException e) - { - CommonLogger.alarm.error("While posting back to broadcast requester: " + e); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - - - String xmlStr; - try - { - xmlBytes = EntityUtils.toByteArray(response.getEntity()); - xmlStr = new String(xmlBytes); - myLogger.debug("Received resposne: " + xmlStr); - } - catch (IOException e) - { - CommonLogger.alarm.error("While getting response from posting to broadcast requester: " + e); - return PostBackStatus.SERVER_IO_ERROR; - } - - Document xmlDoc = null; - try - { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - DocumentBuilder builder = factory.newDocumentBuilder(); - xmlDoc = builder.parse(new ByteArrayInputStream(xmlBytes)); - } - catch (Exception e) - { - CommonLogger.alarm.warn("xml parse problem on received response from " + postBackURL + ": " + xmlStr); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - if (!xmlDoc.getDocumentElement().getNodeName().startsWith(xmlTopElement)) - { - CommonLogger.alarm.warn("xml response from " + postBackURL + " not a <" + xmlTopElement + "> response: " + xmlStr); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - XPath xpathEngine = XPathFactory.newInstance().newXPath(); - String xpath = null; - try - { - xpath = "@error"; - Node errorNode = (Node)xpathEngine.evaluate(xpath, xmlDoc.getDocumentElement(), XPathConstants.NODE); - if (errorNode != null) - { - String errorCode = errorNode.getNodeValue(); - xpath = "error_text"; - String errorText = (String)xpathEngine.evaluate(xpath, - xmlDoc.getDocumentElement(), XPathConstants.STRING); - CommonLogger.alarm.warn("Error response to <" + xmlTopElement + "> post back to " - + postBackURL + " -- error code=\"" + errorCode + "\", error text = \"" - + errorText + "\""); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - } - catch (XPathExpressionException e) - { - CommonLogger.alarm.warn("Bad xpath: " + xpath); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - catch (Exception e) - { - CommonLogger.alarm.warn("While decoding post back response from server: " + e); - return PostBackStatus.IRRECOVERABLE_ERROR; - } - myLogger.debug("returned from posting"); - return PostBackStatus.SUCCESS; - } - - public void terminate() - { - if (threadShouldStop) return; - - threadShouldStop = true; - - //Wait for at most 100 ms for thread to stop - interrupt(); - } - - } - - /** - * Constructs a pool of threads doing posting from a common job queue, - * to the supplied postBackURL. The top element of the XML that gets - * posted back has the give name. - * - * Requires these System properties: - * postback_max_queue_size - * postback_threadpool_size - * - * @param postBackURL - * @param xmlTopElementName - * @throws IllegalArgumentException if either postBackURL or xmlTopElementName is - * not supplied nor valid. - * @throws KeyStoreException - * @throws NoSuchAlgorithmException - * @throws KeyManagementException - */ - public PostBack(String postBackURL, String xmlTopElementName, - int maxQueueSize, int senderPoolSize, int maxBatchSize) - throws IllegalArgumentException, KeyManagementException, NoSuchAlgorithmException, KeyStoreException - { - if (postBackURL == null || postBackURL.length() == 0) - { - throw new IllegalArgumentException("PostBack class given null postBackURL"); - } - myName = "Postback-" + postBackURL; - if (xmlTopElementName == null || xmlTopElementName.length() == 0) - { - throw new IllegalArgumentException(myName + ": PostBack class given null xmlTopElement"); - } - this.postBackURL = postBackURL; - this.xmlTopElement = xmlTopElementName; - this.maxQueueSize = maxQueueSize; - this.maxBatchSize = maxBatchSize; - postQueue = new LinkedList(); - threadsWaitingToPost = 0; - cm = new PoolingHttpClientConnectionManager(); - cm.setMaxTotal(senderPoolSize); - cm.setDefaultMaxPerRoute(senderPoolSize); - - tustAllCerts = new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) { return true; } }; - - sslContext = SSLContextBuilder.create().loadTrustMaterial(tustAllCerts).build(); - connectionFactory = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier()); - - // - // Retry handler - maxRetries = RETRIES_DEFAULT; - HttpRequestRetryHandler retryHandler = new HttpRequestRetryHandler() - { - public boolean retryRequest( - IOException exception, - int executionCount, - HttpContext context) - { - if (executionCount >= maxRetries) return false; - return true; - } - }; - - httpclient = HttpClientBuilder.create() - // .setConnectionManager(cm) - // .setRetryHandler(retryHandler) - .setSSLSocketFactory(connectionFactory) - .build(); - - senderPool = new ArrayList(); - for (int i = 0; i < senderPoolSize; i++) - { - Sender sender = new Sender(myName + '-' + i); - senderPool.add(sender); - } - } - - /** - * Queues report to postQueue only if the queue size has not reached the - * maxQueueSize. - * @param report - * @return true if report is added to queue, false otherwise (queue full) - */ - public boolean queueReport(String report) - { - // Log for recovery in case of problem in posting report. - CommonLogger.activity.info("Attempting to queue report"); - - synchronized(postQueue) - { - for (;;) - { - if (postQueue.size() < maxQueueSize) - { - myLogger.debug("Queing report" + report); - postQueue.add(report); - myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); - postQueue.notifyAll(); - return true; - } - else - { - myLogger.debug("Waiting for space - postQueue size: " + postQueue.size()); - try - { - threadsWaitingToPost++; - myLogger.debug("Threads waiting to post: " + threadsWaitingToPost); - postQueue.wait(QUEUE_WAIT * 1000); - } - catch (InterruptedException e) - { - break; - } - threadsWaitingToPost--; - } - } - } - myLogger.info("Interrupted while waiting for space to queue report"); - return false; - } - - - public void terminate() - { - try - { - httpclient.close(); - } - catch (IOException e) - { - myLogger.error("Caught when closing HttpClient: " + e.getMessage()); - } - - // Terminate postback threads - for (Sender sender : senderPool) - { - sender.terminate(); - } - - } - -}