Browse Source

Ability to allow Broadcast descendent classes to access properties.

The common portion of the CommEngine properties file, are made
available to Broadcasts.
tags/1.0.4
Yuk-Ming Lam 4 years ago
parent
commit
a5152269a6
2 changed files with 90 additions and 53 deletions
  1. +25
    -7
      src/main/java/altk/comm/engine/Broadcast.java
  2. +65
    -46
      src/main/java/altk/comm/engine/CommEngine.java

+ 25
- 7
src/main/java/altk/comm/engine/Broadcast.java View File

@@ -43,6 +43,7 @@ public abstract class Broadcast


String haltReason; String haltReason;
String stateErrorText; String stateErrorText;
public CommEngine commEngine;
public final long receiveTime; public final long receiveTime;
public long serviceStartTime; public long serviceStartTime;
public long serviceEndTime; public long serviceEndTime;
@@ -78,6 +79,7 @@ public abstract class Broadcast
private Integer transactions; private Integer transactions;
private Integer serviceActivityCount; private Integer serviceActivityCount;

public static enum BroadcastState public static enum BroadcastState
{ {
@@ -435,6 +437,7 @@ public abstract class Broadcast
{ {
myLogger.debug("Entering Broadcast.doPost method"); myLogger.debug("Entering Broadcast.doPost method");
BroadcastException myException = null; BroadcastException myException = null;
this.commEngine = commEngine;
try try
{ {
boolean notInService = commEngine.notInService(); boolean notInService = commEngine.notInService();
@@ -450,7 +453,6 @@ public abstract class Broadcast
} }
if (recipientList.size() == 0) if (recipientList.size() == 0)
{ {
// TODO: Got to return HTTP content before returning.
CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients"); CommonLogger.activity.info("Broadcast " + getBroadcastId() + ": No recipients");
setState(BroadcastState.COMPLETED, "No recipients", null); setState(BroadcastState.COMPLETED, "No recipients", null);
return; return;
@@ -464,7 +466,6 @@ public abstract class Broadcast
} }
catch (BroadcastException e) catch (BroadcastException e)
{ {
// TODO: Got to return HTTP content before returning.
myException = e; myException = e;
setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText); setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
@@ -510,9 +511,9 @@ public abstract class Broadcast
jobsTotal = recipientList.size(); jobsTotal = recipientList.size();
postback = new Postback(this, postback = new Postback(this,
commEngine.getPostbackMaxQueueSize(),
getPostbackMaxQueueSize(),
getPostbackSenderPoolSize(), getPostbackSenderPoolSize(),
commEngine.getPostbackMaxBatchSize());
getPostbackMaxBatchSize());
// Create service thread pool to dispatch jobs, // Create service thread pool to dispatch jobs,
// at the same time, setting up a list of service thread names // at the same time, setting up a list of service thread names
@@ -544,9 +545,25 @@ public abstract class Broadcast
} }
} }
protected abstract int getServiceThreadPoolSize();
protected int getPostbackMaxQueueSize()
{
return commEngine.getPostbackMaxQueueSize();
}
protected int getPostbackMaxBatchSize()
{
return commEngine.getPostbackMaxBatchSize();
}
protected abstract int getPostbackSenderPoolSize();
protected int getServiceThreadPoolSize()
{
return commEngine.getServiceThreadPoolSize();
}
protected int getPostbackSenderPoolSize()
{
return commEngine.getPostbackSenderPoolSize();
}
protected abstract void returnPrerequisites(ServicePrerequisites prerequisites); protected abstract void returnPrerequisites(ServicePrerequisites prerequisites);
@@ -607,7 +624,8 @@ public abstract class Broadcast
if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime; if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime;
if (postback != null) if (postback != null)
{ {
postback.queueReportFirst(mkStatusReport());
if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport());
else postback.queueReportFirst(mkStatusReport());
} }
return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
} }


+ 65
- 46
src/main/java/altk/comm/engine/CommEngine.java View File

@@ -27,7 +27,6 @@ import altk.comm.engine.Broadcast.BroadcastState;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public abstract class CommEngine extends HttpServlet public abstract class CommEngine extends HttpServlet
{ {
public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size";


static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";


@@ -35,10 +34,16 @@ public abstract class CommEngine extends HttpServlet
private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60; private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;


public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size";
private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1; private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1;


private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2;
private static final String POSTBACK_THREADPOOL_SIZE_KEY = "postback_threadpool_size";
private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 20;
private static final String POSTBACK_MAX_QUEUE_SIZE_KEY = "postback_max_queue_size";
private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000; private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000;
private static final String POSTBACK_MAX_BATCH_SIZE_KEY = "postback_max_batch_size";
private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100; private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;
/** /**
* Maps a broadcastId to a broadcast. * Maps a broadcastId to a broadcast.
@@ -69,14 +74,6 @@ public abstract class CommEngine extends HttpServlet


private int completedJobCount = 0; private int completedJobCount = 0;


private int serviceThreadPoolSize;

private int postbackMaxQueueSize;

private int postbackSenderPoolSize;

private int postbackMaxBatchSize;

protected String runtimeDirPath; protected String runtimeDirPath;
protected String confDirPath; protected String confDirPath;


@@ -193,25 +190,10 @@ public abstract class CommEngine extends HttpServlet
deadBroadcastViewingMinutes = Long.parseLong(periodStr); deadBroadcastViewingMinutes = Long.parseLong(periodStr);
CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes)); CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));


String str = config.getProperty(SERVICE_THREADPOOL_SIZE_KEY,
new Integer(SERVICE_THREADPOOL_SIZE_DEFAULT).toString());
serviceThreadPoolSize = Integer.parseInt(str);
CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize));

String string = config.getProperty("postback_max_queue_size",
new Integer(POSTBACK_MAX_QUEUE_SIZE_DEFAULT).toString());
postbackMaxQueueSize = Integer.parseInt(string);
CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize);

string = config.getProperty("postback_threadpool_size",
new Integer(POSTBACK_THREADPOOL_SIZE_DEFAULT).toString());
postbackSenderPoolSize = Integer.parseInt(string);
CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize);
string = config.getProperty("postback_max_batch_size",
new Integer(POSTBACK_MAX_BATCH_SIZE_DEFAULT).toString());
postbackMaxBatchSize = Integer.parseInt(string);
CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize);
CommonLogger.startup.info(String.format("service thread pool size: %d", getServiceThreadPoolSize()));
CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize());
CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize());
CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize());
scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE); scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}}, scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}},
@@ -219,6 +201,60 @@ public abstract class CommEngine extends HttpServlet


initChild(); initChild();
} }
public int getServiceThreadPoolSize()
{
return getServiceThreadPoolSize(config);
}
public int getServiceThreadPoolSize(Properties properties)
{
String str = properties.
getProperty(SERVICE_THREADPOOL_SIZE_KEY, String.valueOf(SERVICE_THREADPOOL_SIZE_DEFAULT));
int size = Integer.valueOf(str);
return size;
}

public int getPostbackSenderPoolSize()
{
return getPostbackSenderPoolSize(config);
}
public int getPostbackSenderPoolSize(Properties properties)
{
String str = properties.
getProperty(POSTBACK_THREADPOOL_SIZE_KEY, String.valueOf(POSTBACK_THREADPOOL_SIZE_DEFAULT));
int size = Integer.valueOf(str);
return size;
}
public int getPostbackMaxQueueSize()
{
return getPostbackMaxQueueSize(config);
}
public int getPostbackMaxQueueSize(Properties properties)
{
String str = properties.
getProperty(POSTBACK_MAX_QUEUE_SIZE_KEY, String.valueOf(POSTBACK_MAX_QUEUE_SIZE_DEFAULT));
int size = Integer.valueOf(str);
return size;
}
public int getPostbackMaxBatchSize()
{
return getPostbackMaxBatchSize(config);
}
public int getPostbackMaxBatchSize(Properties properties)
{
String str = properties.
getProperty(POSTBACK_MAX_BATCH_SIZE_KEY, String.valueOf(POSTBACK_MAX_BATCH_SIZE_DEFAULT));
int size = Integer.valueOf(str);
return size;
}



protected void purgeStaleBroadcasts() protected void purgeStaleBroadcasts()
{ {
@@ -521,21 +557,4 @@ public abstract class CommEngine extends HttpServlet
broadcasts.put(broadcastId, broadcast); broadcasts.put(broadcastId, broadcast);
} }


@Deprecated
public int getServiceThreadPoolSize()
{
return serviceThreadPoolSize;
}

public int getPostbackMaxQueueSize() {
return postbackMaxQueueSize;
}

public int getPostbackSenderPoolSize() {
return postbackSenderPoolSize;
}

public int getPostbackMaxBatchSize() {
return postbackMaxBatchSize;
}
} }

Loading…
Cancel
Save