Преглед на файлове

API to configure daily clock

tags/1.0.14
ymlam преди 3 години
родител
ревизия
9c8d6cab4d
променени са 2 файла, в които са добавени 193 реда и са изтрити 59 реда
  1. +45
    -3
      src/main/java/altk/comm/engine/Broadcast.java
  2. +148
    -56
      src/main/java/altk/comm/engine/CommEngine.java

+ 45
- 3
src/main/java/altk/comm/engine/Broadcast.java Целия файл

@@ -19,6 +19,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;
import org.json.simple.JSONObject;

import altk.comm.engine.Job.JobStatus;
import altk.comm.engine.exception.BroadcastError;
@@ -38,6 +39,7 @@ public abstract class Broadcast
private static final int SCHEDULER_THREAD_POOL_SIZE = 5;
private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id";
private static final long SLEEP_BETWEEN_JOBS_DEFAULT = 0;

public final String broadcastType;
private String broadcastId;
@@ -63,8 +65,8 @@ public abstract class Broadcast
protected String postbackURL;
private Postback postback;
public long expireTime;
protected String clock_24hr_resume;
protected String clock_24hr_pause;
protected String daily_resume = "";
protected String daily_pause = "";
/**
* Sleep time in milliseconds between consecutive job processing (actualliy batch)
@@ -238,7 +240,7 @@ public abstract class Broadcast
BroadcastState.RUNNING, // User action
BroadcastState.COMPLETED,
BroadcastState.ABORTING,
BroadcastState.CANCELED, // User action
BroadcastState.CANCELING, // User action
BroadcastState.PAUSED,
BroadcastState.PURGED // User action
));
@@ -888,6 +890,8 @@ public abstract class Broadcast
"' ready='" + getPendingJobCount() + "'");
statusBf.append(" active='" + getActiveJobCount() + "'");
statusBf.append("></job_summary>");
statusBf.append("<daily_pause>" + daily_pause + "</daily_pause>\n");
statusBf.append("<daily_resume>" + daily_resume + "</daily_resume>\n");
statusBf.append(additionalStatusXML());
statusBf.append("</" + topLevelTag + ">");
String statusReport = statusBf.toString();
@@ -1405,4 +1409,42 @@ public abstract class Broadcast
}
}

/**
* @return null or configuration in XML
*/
public String getConfigXML()
{
StringBuffer configBuf = new StringBuffer();
configBuf.append("<broadcast_configuration broadcast_id='" + broadcastId + "'>");
configBuf.append("<" + CommEngine.DAILY_PAUSE_KEY + ">" + daily_pause + "</" + CommEngine.DAILY_PAUSE_KEY + ">");
configBuf.append("<" + CommEngine.DAILY_RESUME_KEY + ">" + daily_resume + "</" + CommEngine.DAILY_RESUME_KEY + ">");
configBuf.append("</broadcast_configuration>");
return configBuf.toString();
}

public void configure(JSONObject configuration) throws Exception {
String value, timeOfDay, key;
key = CommEngine.DAILY_PAUSE_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = CommEngine.checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_pause = timeOfDay;
}
key = CommEngine.DAILY_RESUME_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = CommEngine.checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_resume = timeOfDay;
}
}

public JSONObject getConfigJSON() {
Map<Object, Object> dataMap = new HashMap<Object, Object>();
dataMap.put(CommEngine.DAILY_PAUSE_KEY, daily_pause);
dataMap.put(CommEngine.DAILY_RESUME_KEY, daily_resume);
return new JSONObject(dataMap);
}

}

+ 148
- 56
src/main/java/altk/comm/engine/CommEngine.java Целия файл

@@ -5,7 +5,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Enumeration;
@@ -17,12 +16,14 @@ import java.util.Vector;
import java.util.concurrent.ScheduledExecutorService;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

@@ -32,9 +33,9 @@ import altk.comm.engine.Broadcast.BroadcastState;
public abstract class CommEngine extends HttpServlet
{

private static final String CLOCK_24HR_PAUSE_KEY = "clock_24hr_pause";
static final String DAILY_PAUSE_KEY = "daily_pause";

private static final String CLOCK_24HR_RESUME_KEY = "clock_24hr_resume";
static final String DAILY_RESUME_KEY = "daily_resume";

static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";

@@ -87,13 +88,13 @@ public abstract class CommEngine extends HttpServlet
protected String runtimeDirPath;
protected String confDirPath;
/** Daily resume all broadcasts time. No action if "" */
protected String clock_24hr_resume;
protected String daily_resume = "";
/** Daily pause all broadcasts time. No action if "" */
protected String clock_24hr_pause;
protected String daily_pause = "";

private Clock_24hr clock_24hr;
private DailyClock dailyClock;
protected class Clock_24hr extends Thread
protected class DailyClock extends Thread
{
private boolean threadShouldStop = false;
@@ -104,23 +105,22 @@ public abstract class CommEngine extends HttpServlet
long startTime = System.currentTimeMillis();;

String timeOfDay = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm"));
//System.out.println(String.format("timeOfDay %s, pause %s, resume %s", timeOfDay, clock_24hr_pause, clock_24hr_resume));
// Check for pause
for (Broadcast broadcast : broadcasts.values())
{
// Check for pause
if (broadcast.clock_24hr_pause != null && broadcast.clock_24hr_pause.length() > 0)
if (broadcast.daily_pause.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_pause)) broadcast.pause("clock", null);
if (timeOfDay.equals(broadcast.daily_pause)) broadcast.pause("clock", null);
}
else if (timeOfDay.equals(clock_24hr_pause)) broadcast.pause("clock", null);
else if (timeOfDay.equals(daily_pause)) broadcast.pause("clock", null);

// Check for resume
if (broadcast.clock_24hr_resume != null && broadcast.clock_24hr_resume.length() > 0)
if (broadcast.daily_resume.length() > 0)
{
if (timeOfDay.equals(broadcast.clock_24hr_resume)) broadcast.resume("clock", null);
if (timeOfDay.equals(broadcast.daily_resume)) broadcast.resume("clock", null);
}
else if (timeOfDay.equals(clock_24hr_resume)) broadcast.resume("clock", null);
else if (timeOfDay.equals(daily_resume)) broadcast.resume("clock", null);
}

// Wakes up every 1/2 minute to provide minute resolution
@@ -134,7 +134,7 @@ public abstract class CommEngine extends HttpServlet
}
catch (Exception e)
{
myLogger.error("Clock_24hr thread caught: " + e.getMessage(), e);
myLogger.error("DailyClock thread caught: " + e.getMessage(), e);
return;
}
}
@@ -180,8 +180,9 @@ public abstract class CommEngine extends HttpServlet
/**
* Invoked by servlet container during initialization of servlet.
* @throws ServletException
*/
public final void init()
public final void init() throws ServletException
{
// check init parameters
ServletContext servletContext = getServletContext();
@@ -254,9 +255,21 @@ public abstract class CommEngine extends HttpServlet
// Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
String periodStr = config.getProperty("dead_broadcast_viewing_period", Long.toString(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT));
deadBroadcastViewingMinutes = Long.parseLong(periodStr);
clock_24hr_resume = config.getProperty(CLOCK_24HR_RESUME_KEY, "");
clock_24hr_pause = config.getProperty(CLOCK_24HR_PAUSE_KEY, "");
Thread clock_24hr;
String timeOfDay, timeOfDayStr, key;
key = DAILY_RESUME_KEY;
timeOfDayStr = config.getProperty(key , "");
timeOfDay = checkTimeOfDay(timeOfDayStr);
if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr));
daily_resume = timeOfDay;
key = DAILY_PAUSE_KEY;
timeOfDayStr = config.getProperty(key, "");
timeOfDay = checkTimeOfDay(timeOfDayStr);
if (timeOfDay == null) throw new ServletException(String.format("Invlaid valud for %s: %s", key, timeOfDayStr));
daily_pause = timeOfDay;
Thread dailyClock;
CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));

@@ -264,15 +277,15 @@ public abstract class CommEngine extends HttpServlet
CommonLogger.activity.info("Postback max queue size = " + getPostbackMaxQueueSize());
CommonLogger.activity.info("Postback threadpool size = " + getPostbackSenderPoolSize());
CommonLogger.activity.info("Postback max batch size = " + getPostbackMaxBatchSize());
CommonLogger.activity.info("clock_24hr_resume = " + clock_24hr_resume);
CommonLogger.activity.info("clock_24hr_pause = " + clock_24hr_pause);
CommonLogger.activity.info("daily_resume = " + daily_resume);
CommonLogger.activity.info("daily_pause = " + daily_pause);
clock_24hr = new Clock_24hr();
clock_24hr.start();
dailyClock = new DailyClock();
dailyClock.start();
initChild();
}
public int getPauseThreshold()
{
return getPauseThreshold(config);
@@ -424,6 +437,10 @@ public abstract class CommEngine extends HttpServlet
{
resumeBroadcast(request, out);
}
else if (get.equalsIgnoreCase("configuration"))
{
getConfiguration(request, out);
}
else if (get.equalsIgnoreCase("configure"))
{
configure(request, out);
@@ -435,7 +452,43 @@ public abstract class CommEngine extends HttpServlet
out.close();
}

private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
/**
* Writes configuration in JSON string to out
* @param request
* @param out
*/
protected void getConfiguration(HttpServletRequest request, PrintWriter out)
{
JSONObject configuration = getConfigJSON();
out.print(configuration);
}
private JSONObject getConfigJSON()
{
Map<Object, Object> dataMap = new HashMap<Object, Object>();
// engine configuration
Map<Object, Object> EngineMap = new HashMap<Object, Object>();
dataMap.put("engine", EngineMap);
EngineMap.put(DAILY_PAUSE_KEY, daily_pause);
EngineMap.put(DAILY_RESUME_KEY, daily_resume);
// broadcast configuration
Map<Object, Object> broadcastsMap = new HashMap<Object, Object>();
synchronized (broadcasts) {
for (String broadcastId : broadcasts.keySet())
{
Broadcast broadcast = broadcasts.get(broadcastId);
if (broadcast.getState().isFinal) continue;
JSONObject configJSON = broadcast.getConfigJSON();
broadcastsMap.put(broadcastId, configJSON);
}
}
if (broadcastsMap.size() > 0) dataMap.put("broadcasts", broadcastsMap);
return new JSONObject(dataMap);
}

private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
{
// Get broadcastId from request
String broadcastId = getBroadcastId(request);
@@ -477,35 +530,78 @@ public abstract class CommEngine extends HttpServlet
broadcast.resume(reason, out);
}
/**
* Check if timeOfDay is of the form "HH::mm"
* @param timeOfDay
* @return timeOfDay if valid, otherwise null
*/
protected static String checkTimeOfDay(String timeOfDay) {
timeOfDay = timeOfDay.trim();
if (timeOfDay.length() == 0) return timeOfDay;
if (timeOfDay.length() != 5) return null;
return timeOfDay;
}
/**
* Writes error message to out. Otherwise writes nothing to out.
* @param request
* @param out
*/
protected void configure(HttpServletRequest request, PrintWriter out) {
String jsonData = request.getParameter("data");
System.out.println("jsonData: " + jsonData);
JSONParser parser = new JSONParser();
JSONObject currConfig = getConfigJSON();
String jsonString = request.getParameter("data");
try {
JSONObject jsonObject = (JSONObject) parser.parse(new StringReader(jsonData));
System.out.println(jsonObject);

String valueStr;
// clock_24hr_pause
valueStr = (String) jsonObject.get(CLOCK_24HR_PAUSE_KEY);
if (valueStr != null && valueStr.length() == 5) {
clock_24hr_pause = valueStr;
out.println("clock_24hr_pause updated.");
}
// clock_24hr_resume
valueStr = (String) jsonObject.get(CLOCK_24HR_RESUME_KEY);
if (valueStr != null && valueStr.length() == 5) {
clock_24hr_resume = valueStr;
out.println("clock_24hr_resume updated.");
}
JSONParser parser = new JSONParser();
JSONObject configuration = (JSONObject) parser.parse(jsonString);
configure(configuration);
} catch (Exception e) {
out.println("Error - " + e.getMessage());
myLogger.error(e);
out.write("Error - " + e.getMessage());
// restore to original confiuration
try {
configure(currConfig);
} catch (Exception e1) {
out.write("\nInternal error in restoring original configuration: " + e1.getMessage());
}
}
}
private void configureEngine(JSONObject configuration) throws Exception {
String value, timeOfDay, key;
key = DAILY_PAUSE_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_pause = timeOfDay;
}
key = DAILY_RESUME_KEY;
value = (String)configuration.get(key);
if (value != null) {
timeOfDay = checkTimeOfDay(value);
if (timeOfDay == null) throw new Exception(String.format("Invalid value for %s: %s", key, value));
daily_resume = timeOfDay;
}
}
private void configure(JSONObject configuration) throws Exception {
// emgine
JSONObject engineConfig = (JSONObject)configuration.get("engine");
configureEngine(engineConfig);
// broadcasts
JSONObject broadcastsConfig = (JSONObject)configuration.get("broadcasts");
if (broadcastsConfig != null) {
for (Object broadcastId : broadcastsConfig.keySet())
{
JSONObject broadcastConfig = (JSONObject)broadcastsConfig.get(broadcastId);
Broadcast broadcast = broadcasts.get(broadcastId);
if (broadcast == null) continue;
broadcast.configure(broadcastConfig);
}
}
}

/**
* <CallEngine_status>
@@ -557,13 +653,9 @@ public abstract class CommEngine extends HttpServlet
// Then append status of each broadcast to outBuf.
for (Broadcast broadcast : broadcastList)
{
out.write(broadcast.mkStatusReport());
out.write(broadcast.mkStatusReport() + "\n");
}
out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>\n");
out.write("<configuration>\n");
out.write("<" + CLOCK_24HR_PAUSE_KEY + ">" + clock_24hr_pause + "</" + CLOCK_24HR_PAUSE_KEY + ">\n");
out.write("<" + CLOCK_24HR_RESUME_KEY + ">" + clock_24hr_resume + "</" + CLOCK_24HR_RESUME_KEY + ">\n");
out.write("</configuration>\n");
out.write("</" + tag + ">");
}
}
@@ -650,11 +742,11 @@ public abstract class CommEngine extends HttpServlet
}
}
// Destroy clock_24hr thread
// Destroy dailyClock thread
try
{
clock_24hr.terminate();
clock_24hr.join();
dailyClock.terminate();
dailyClock.join();
}
catch (InterruptedException e)
{


Зареждане…
Отказ
Запис