From 1ed5321d14ece697c3576e7293847a564118a4e3 Mon Sep 17 00:00:00 2001 From: ymlam Date: Sat, 22 Sep 2018 20:34:55 +0000 Subject: [PATCH] Change to require the service threads to wait for space in the postQueue in order to post reports, instead of dropping reports. Better diagnostic logging to help identify threads' life and death events. --- src/altk/comm/engine/postback/PostBack.java | 49 ++++++++++++++++----- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/src/altk/comm/engine/postback/PostBack.java b/src/altk/comm/engine/postback/PostBack.java index 8b3be62..71f9cd7 100644 --- a/src/altk/comm/engine/postback/PostBack.java +++ b/src/altk/comm/engine/postback/PostBack.java @@ -57,6 +57,8 @@ public class PostBack private final String myName; private int maxBatchSize; + private int threadsWaitingToPost; + private static Logger myLogger = Logger.getLogger(PostBack.class); public enum PostBackStatus @@ -110,7 +112,7 @@ public class PostBack if (reportList.size() > 0) { myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); - postQueue.notify(); + postQueue.notifyAll(); break; // break out to do the work. } @@ -118,6 +120,7 @@ public class PostBack // queue again. try { + myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); postQueue.wait(QUEUE_WAIT * 1000); } catch (InterruptedException e) @@ -214,7 +217,9 @@ public class PostBack { CommonLogger.activity.info("redirected to " + url); } + long beginPost = System.currentTimeMillis(); int statusCode = client.executeMethod(post); + long postingTime = System.currentTimeMillis() - beginPost; // msec if (statusCode == 302) { @@ -238,6 +243,7 @@ public class PostBack } responseBody = post.getResponseBodyAsString().trim(); post.releaseConnection(); + myLogger.debug("Postback time (msec): " + postingTime); CommonLogger.activity.info("Received response: " + (responseBody.length() == 0? "[empty]" : responseBody)); if (responseBody.trim().length() == 0) return PostBackStatus.SUCCESS; break; @@ -357,6 +363,7 @@ public class PostBack this.maxQueueSize = maxQueueSize; this.maxBatchSize = maxBatchSize; postQueue = new LinkedList(); + threadsWaitingToPost = 0; senderPool = new ArrayList(); for (int i = 0; i < senderPoolSize; i++) @@ -374,22 +381,40 @@ public class PostBack */ public boolean queueReport(String report) { + // Log for recovery in case of problem in posting report. + CommonLogger.activity.info("Attempting to queue report"); + synchronized(postQueue) { - if (postQueue.size() < maxQueueSize) - { - CommonLogger.activity.info("queing report: " + report); - postQueue.add(report); - myLogger.debug("postQueue size: " + postQueue.size()); - postQueue.notify(); - return true; - } - else + for (;;) { - CommonLogger.alarm.warn("Queue full - did not queue report: " + report + " -- postQueue size: " + postQueue.size()); - return false; + if (postQueue.size() < maxQueueSize) + { + myLogger.debug("Queing report" + report); + postQueue.add(report); + myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); + postQueue.notifyAll(); + return true; + } + else + { + myLogger.debug("Waiting for space - postQueue size: " + postQueue.size()); + try + { + threadsWaitingToPost++; + myLogger.debug("Threads waiting to post: " + threadsWaitingToPost); + postQueue.wait(QUEUE_WAIT * 1000); + } + catch (InterruptedException e) + { + break; + } + threadsWaitingToPost--; + } } } + myLogger.info("Interrupted while waiting for space to queue report"); + return false; } /**