浏览代码

Change to require the service threads to wait for space in the postQueue in order to post reports, instead of dropping reports. Better diagnostic logging to help identify threads' life and death events.

tags/Production_2018_09_22
ymlam 7 年前
父节点
当前提交
1ed5321d14
共有 1 个文件被更改,包括 37 次插入12 次删除
  1. +37
    -12
      src/altk/comm/engine/postback/PostBack.java

+ 37
- 12
src/altk/comm/engine/postback/PostBack.java 查看文件

@@ -57,6 +57,8 @@ public class PostBack
private final String myName;
private int maxBatchSize;

private int threadsWaitingToPost;

private static Logger myLogger = Logger.getLogger(PostBack.class);

public enum PostBackStatus
@@ -110,7 +112,7 @@ public class PostBack
if (reportList.size() > 0)
{
myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size()));
postQueue.notify();
postQueue.notifyAll();
break; // break out to do the work.
}
@@ -118,6 +120,7 @@ public class PostBack
// queue again.
try
{
myLogger.debug("Going to wait " + QUEUE_WAIT * 1000);
postQueue.wait(QUEUE_WAIT * 1000);
}
catch (InterruptedException e)
@@ -214,7 +217,9 @@ public class PostBack
{
CommonLogger.activity.info("redirected to " + url);
}
long beginPost = System.currentTimeMillis();
int statusCode = client.executeMethod(post);
long postingTime = System.currentTimeMillis() - beginPost; // msec
if (statusCode == 302)
{
@@ -238,6 +243,7 @@ public class PostBack
}
responseBody = post.getResponseBodyAsString().trim();
post.releaseConnection();
myLogger.debug("Postback time (msec): " + postingTime);
CommonLogger.activity.info("Received response: " + (responseBody.length() == 0? "[empty]" : responseBody));
if (responseBody.trim().length() == 0) return PostBackStatus.SUCCESS;
break;
@@ -357,6 +363,7 @@ public class PostBack
this.maxQueueSize = maxQueueSize;
this.maxBatchSize = maxBatchSize;
postQueue = new LinkedList<String>();
threadsWaitingToPost = 0;
senderPool = new ArrayList<Sender>();
for (int i = 0; i < senderPoolSize; i++)
@@ -374,22 +381,40 @@ public class PostBack
*/
public boolean queueReport(String report)
{
// Log for recovery in case of problem in posting report.
CommonLogger.activity.info("Attempting to queue report");
synchronized(postQueue)
{
if (postQueue.size() < maxQueueSize)
{
CommonLogger.activity.info("queing report: " + report);
postQueue.add(report);
myLogger.debug("postQueue size: " + postQueue.size());
postQueue.notify();
return true;
}
else
for (;;)
{
CommonLogger.alarm.warn("Queue full - did not queue report: " + report + " -- postQueue size: " + postQueue.size());
return false;
if (postQueue.size() < maxQueueSize)
{
myLogger.debug("Queing report" + report);
postQueue.add(report);
myLogger.debug("Added 1 report - postQueue size: " + postQueue.size());
postQueue.notifyAll();
return true;
}
else
{
myLogger.debug("Waiting for space - postQueue size: " + postQueue.size());
try
{
threadsWaitingToPost++;
myLogger.debug("Threads waiting to post: " + threadsWaitingToPost);
postQueue.wait(QUEUE_WAIT * 1000);
}
catch (InterruptedException e)
{
break;
}
threadsWaitingToPost--;
}
}
}
myLogger.info("Interrupted while waiting for space to queue report");
return false;
}
/**


正在加载...
取消
保存