|
- package altk.comm.engine;
-
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- import java.util.Vector;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- import javax.servlet.ServletContext;
- import javax.servlet.http.HttpServlet;
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
-
- import org.apache.log4j.Logger;
-
- import altk.comm.engine.Broadcast.BroadcastState;
- import altk.comm.engine.exception.BroadcastError;
- import altk.comm.engine.exception.BroadcastException;
- import altk.comm.engine.exception.PlatformError;
- import altk.comm.engine.exception.PlatformException;
- import altk.comm.engine.postback.PostBack;
-
- @SuppressWarnings("serial")
- public abstract class CommEngine extends HttpServlet
- {
- // ---- Compile-time defaults ------------------------------------------------
-
- // Not referenced inside this class.
- // NOTE(review): presumably the default top-level element name of a request; confirm against users.
- static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";
-
- /** Number of threads in the scheduler that periodically runs purgeStaleBroadcasts(). */
- private static final int SCHEDULER_THREAD_POOL_SIZE = 1;
-
- /** Fallback, in minutes, for the "dead_broadcast_viewing_period" property. */
- private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
-
- /** Fallback for the "service_threadpool_size" property. */
- private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1;
-
- /** Fallback for the "postback_threadpool_size" property. */
- private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2;
-
- /** Fallback for the "postback_max_queue_size" property. */
- private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000;
-
- /** Fallback for the "postback_max_batch_size" property. */
- private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;
-
- private static Logger myLogger = Logger.getLogger(CommEngine.class);
-
- // ---- State visible to subclasses ------------------------------------------
-
- /** e.g. "broadcast_sms", "broadcast_voice"; also the prefix of the status report tag. */
- protected final String engineName;
-
- /** True once init() has given up loading the configuration; POSTs are then rejected. */
- protected boolean notInService;
-
- /** Properties loaded by init() from the configured properties file. */
- protected Properties config;
-
- /** Maps a post-back URL to the PostBack that serves it. */
- protected Map<String, PostBack> postBackMap;
-
- /**
-  * Used to communicate media-specific platform resources to broadcasts
-  */
- protected EngineResources resources;
-
- // ---- Private bookkeeping ---------------------------------------------------
-
- /**
-  * Maps a broadcastId to a broadcast. Also serves as the monitor guarding itself.
-  */
- private Map<String, Broadcast> broadcasts;
-
- /** Wall-clock time, in milliseconds, captured in the constructor. */
- private long startupTimestamp;
-
- // Sequencing naming of broadcast that fails to yield its broadcastId
- private int unknownBroadcastIdNdx = 1;
-
- /** Failure of the POST being handled, handed to the response builder. */
- private BroadcastException myException;
-
- /** Runs the periodic purge of stale broadcasts; created in init(). */
- private ScheduledExecutorService scheduler;
-
- /** Purge period and stale-age threshold, in minutes. */
- private long deadBroadcastViewingMinutes;
-
- /** Completed-job total carried over from broadcasts that have already been purged. */
- private int completedJobCount = 0;
-
- /** Value of "service_threadpool_size", passed to every new broadcast. */
- private int serviceThreadPoolSize;
-
- /** Value of "postback_max_queue_size", passed to every new PostBack. */
- private int postbackMaxQueueSize;
-
- /** Value of "postback_threadpool_size", passed to every new PostBack. */
- private int postbackSenderPoolSize;
-
- /** Value of "postback_max_batch_size", passed to every new PostBack. */
- private int postbackMaxBatchSize;
-
- /**
-  * Factory hook: supplies the media-specific broadcast object.
-  * doPost() calls this once per POST request, before decoding the request.
-  *
-  * @return a new broadcast, not yet decoded or initialized
-  */
- protected abstract Broadcast mkBroadcast();
-
- /**
-  * Creates an engine with an empty broadcast registry and records the
-  * construction time as its startup timestamp.
-  *
-  * @param engineName name of this engine, e.g. "broadcast_sms"
-  */
- public CommEngine(String engineName)
- {
-     this.myException = null;
-     this.broadcasts = new HashMap<String, Broadcast>();
-     this.engineName = engineName;
-     this.startupTimestamp = System.currentTimeMillis();
- }
-
- /**
- * Invoked by servlet container during initialization of servlet.
- */
- public final void init()
- {
- myLogger.info("init() invoked");
- // check init parameters
- ServletContext servletContext = getServletContext();
- String propertiesFilePath;
-
- propertiesFilePath = servletContext.getInitParameter(getPropertiesContextName());
- File propertiesFile = new File(propertiesFilePath);
- CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath());
- config = new Properties();
- try
- {
- config.load(new FileInputStream(propertiesFile));
- }
- catch (FileNotFoundException e)
- {
- CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort");
- notInService = true;
- return;
- }
- catch (IOException e)
- {
- CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage());
- notInService = true;
- return;
- }
-
- postBackMap = new HashMap<String, PostBack>();
-
- // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
- String periodStr = config.getProperty("dead_broadcast_viewing_period",
- new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString());
- deadBroadcastViewingMinutes = Long.parseLong(periodStr);
- CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));
-
- String str = config.getProperty("service_threadpool_size",
- new Integer(SERVICE_THREADPOOL_SIZE_DEFAULT).toString());
- serviceThreadPoolSize = Integer.parseInt(str);
- CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize));
-
- String string = config.getProperty("postback_max_queue_size",
- new Integer(POSTBACK_MAX_QUEUE_SIZE_DEFAULT).toString());
- postbackMaxQueueSize = Integer.parseInt(string);
- CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize);
-
- string = config.getProperty("postback_threadpool_size",
- new Integer(POSTBACK_THREADPOOL_SIZE_DEFAULT).toString());
- postbackSenderPoolSize = Integer.parseInt(string);
- CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize);
-
- string = config.getProperty("postback_max_batch_size",
- new Integer(POSTBACK_MAX_BATCH_SIZE_DEFAULT).toString());
- postbackMaxBatchSize = Integer.parseInt(string);
- CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize);
-
- scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
- scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}},
- deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);
-
- initChild();
- }
-
- /**
-  * Removes every broadcast whose last state change is older than
-  * deadBroadcastViewingMinutes, folding its completed-job count into the
-  * engine-wide completedJobCount so the total survives the removal.
-  * Runs periodically on the scheduler thread.
-  *
-  * NOTE(review): only the age of the last state change is examined, not the
-  * state itself. Confirm that a broadcast still in progress always changes
-  * state more often than the viewing period.
-  */
- protected void purgeStaleBroadcasts()
- {
-     long now = System.currentTimeMillis();
-     long maxAgeMillis = TimeUnit.MINUTES.toMillis(deadBroadcastViewingMinutes);
-     synchronized (broadcasts)
-     {
-         // Walk a snapshot of the ids: removing from the map while iterating
-         // its live key set throws ConcurrentModificationException.
-         for (String id : new Vector<String>(broadcasts.keySet()))
-         {
-             Broadcast broadcast = broadcasts.get(id);
-             // Age is "now minus then"; the reverse difference is never positive.
-             if (now - broadcast.changeStateTime > maxAgeMillis)
-             {
-                 completedJobCount += broadcast.getCompletedJobCount();
-                 broadcasts.remove(id);
-             }
-         }
-     }
- }
-
- /**
-  * Names the servlet-context init parameter that holds the path of this
-  * engine's properties file; init() looks the parameter up and loads the file.
-  *
-  * @return name of parameter in Tomcat context file, specifying properties file
-  *         for this engine.
-  */
- abstract protected String getPropertiesContextName();
-
-
- /**
-  * Accepts a new broadcast request.
-  *
-  * The request is decoded and initialized synchronously, the broadcast is
-  * registered in the broadcasts map (also when it failed, so its status can
-  * be queried), and the response XML is written. Only when that succeeded and
-  * the broadcast is not already COMPLETED is processing started.
-  *
-  * The failure of a request is kept in a local variable: the container may
-  * run doPost on several threads at once, so per-request state must not live
-  * in an instance field.
-  */
- @Override
- protected void doPost(HttpServletRequest request, HttpServletResponse response)
- {
-     try
-     {
-         Broadcast broadcast = mkBroadcast();
-         broadcast.setServiceThreadPoolsize(serviceThreadPoolSize);
-
-         BroadcastException failure = null;
-         // Stays false on every path that must not start processing.
-         boolean readyToProcess = false;
-         try
-         {
-             broadcast.decode(request, notInService);
-             if (notInService)
-             {
-                 throw new PlatformException(PlatformError.RUNTIME_ERROR,
-                         "Not in service");
-             }
-             if (broadcast.recipientList.size() == 0)
-             {
-                 CommonLogger.activity.info("Broadcast " + broadcast.getBroadcastId() + ": No recipients");
-                 broadcast.setState(BroadcastState.COMPLETED, "No recipients", null);
-             }
-             else
-             {
-                 PostBack postBack = findOrCreatePostBack(broadcast);
-                 broadcast.initSync(resources);
-                 broadcast.init(postBack);
-                 readyToProcess = broadcast.getState() != BroadcastState.COMPLETED;
-             }
-         }
-         catch (BroadcastException e)
-         {
-             failure = e;
-             broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
-             CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
-             myLogger.error("Broadcast aborted", e);
-         }
-         catch (Throwable t)
-         {
-             // Caught stray unexpected runtime problem
-             CommonLogger.alarm.error("Broadcast aborted: " + t);
-             myLogger.error("Broadcast aborted", t);
-             failure = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage());
-             broadcast.setState(BroadcastState.ABORTED, failure.errorCodeText, failure.errorText);
-         }
-         finally
-         {
-             registerBroadcast(broadcast);
-
-             // Return regular or error response
-             String responseXML = broadcast.getResponseXML(failure);
-             PrintWriter writer = response.getWriter();
-             writer.write(responseXML);
-             writer.close();
-         }
-
-         if (!readyToProcess)
-         {
-             // Completed or aborted above: there is nothing to process.
-             return;
-         }
-         try
-         {
-             broadcast.initAsync();
-             broadcast.startProcessing();
-         }
-         catch (BroadcastException e)
-         {
-             broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
-             CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
-             myLogger.error("Broadcast aborted", e);
-         }
-     }
-     catch (Throwable t)
-     {
-         // Caught stray unexpected runtime problem
-         CommonLogger.alarm.error("Broadcast aborted: " + t.getMessage());
-         myLogger.error("Broadcast aborted", t);
-     }
- }
-
- /**
-  * Returns the PostBack serving the broadcast's post-back URL, creating and
-  * remembering one on first use; null when the broadcast has no such URL.
-  */
- private PostBack findOrCreatePostBack(Broadcast broadcast)
- {
-     String postBackURL = broadcast.getPostBackURL();
-     if (postBackURL == null)
-     {
-         return null;
-     }
-     // Lookup and insertion form one atomic step, so two concurrent requests
-     // for the same URL cannot each create a PostBack.
-     synchronized (postBackMap)
-     {
-         PostBack postBack = postBackMap.get(postBackURL);
-         if (postBack == null)
-         {
-             postBack = new PostBack(postBackURL, broadcast.broadcastType + "_status",
-                     postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize);
-             postBackMap.put(postBackURL, postBack);
-         }
-         return postBack;
-     }
- }
-
- /**
-  * Puts the broadcast into the broadcasts map under its own id, or under a
-  * made-up "UnknownN" id when it has none. Done under the same lock that the
-  * readers of the map take.
-  */
- private void registerBroadcast(Broadcast broadcast)
- {
-     String broadcastId = broadcast.getBroadcastId();
-     synchronized (broadcasts)
-     {
-         if (broadcastId == null || broadcastId.length() == 0)
-         {
-             broadcastId = "Unknown" + unknownBroadcastIdNdx++;
-         }
-         broadcasts.put(broadcastId, broadcast);
-     }
- }
-
- /**
-  * Dispatches on the "get" request parameter, compared without regard to case:
-  * get=status
-  * get=cancel_broadcast&broadcast_id=XXX
-  * get=pause_broadcast&broadcast_id=XXX
-  * get=resume_broadcast&broadcast_id=XXX
-  * A request without "get" is answered with HTTP 400; any other value is
-  * answered with a "not supported" text.
-  */
- @Override
- protected void doGet(HttpServletRequest request, HttpServletResponse response)
- {
-     PrintWriter reply;
-     try
-     {
-         reply = response.getWriter();
-     }
-     catch (IOException e)
-     {
-         CommonLogger.alarm.error("Cannot write a reply: " + e);
-         return;
-     }
-
-     String command = request.getParameter("get");
-     if (command == null)
-     {
-         // Return http status BAD REQUEST
-         try
-         {
-             response.sendError(HttpServletResponse.SC_BAD_REQUEST);
-         }
-         catch (IOException e)
-         {
-             myLogger.warn("Unnable to return HTTP error code " + HttpServletResponse.SC_BAD_REQUEST);
-         }
-         return;
-     }
-
-     if ("status".equalsIgnoreCase(command))
-     {
-         getStatus(request, reply);
-     }
-     else if ("cancel_broadcast".equalsIgnoreCase(command))
-     {
-         cancelBroadcast(request, reply);
-     }
-     else if ("pause_broadcast".equalsIgnoreCase(command))
-     {
-         pauseBroadcast(request, reply);
-     }
-     else if ("resume_broadcast".equalsIgnoreCase(command))
-     {
-         resumeBroadcast(request, reply);
-     }
-     else
-     {
-         reply.write(command + " not supported");
-     }
-     reply.close();
- }
-
- /**
-  * Cancels the broadcast named by the "broadcast_id" request parameter, or
-  * reports on out that no such broadcast exists.
-  *
-  * @param request GET request carrying broadcast_id
-  * @param out writer of the HTTP reply
-  */
- private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
- {
-     // Get broadcastId from request
-     String broadcastId = getBroadcastId(request);
-     Broadcast broadcast;
-     // Writers of the map hold this lock; a HashMap must not be read while
-     // another thread modifies it.
-     synchronized (broadcasts)
-     {
-         broadcast = broadcasts.get(broadcastId);
-     }
-     if (broadcast == null)
-     {
-         out.format("Broadcast %s does not exist", broadcastId);
-         return;
-     }
-     // Called outside the lock so the map is not blocked meanwhile.
-     broadcast.cancel();
- }
-
- /**
-  * Pauses the broadcast named by the "broadcast_id" request parameter, or
-  * reports on out that no such broadcast exists.
-  *
-  * @param request GET request carrying broadcast_id
-  * @param out writer of the HTTP reply
-  */
- protected void pauseBroadcast(HttpServletRequest request, PrintWriter out)
- {
-     // Get broadcastId from request
-     String broadcastId = getBroadcastId(request);
-     Broadcast broadcast;
-     // Writers of the map hold this lock; a HashMap must not be read while
-     // another thread modifies it.
-     synchronized (broadcasts)
-     {
-         broadcast = broadcasts.get(broadcastId);
-     }
-     if (broadcast == null)
-     {
-         out.format("Broadcast %s does not exist", broadcastId);
-         return;
-     }
-     // Called outside the lock so the map is not blocked meanwhile.
-     broadcast.pause();
- }
-
-
- /**
-  * Resumes the broadcast named by the "broadcast_id" request parameter, or
-  * reports on out that no such broadcast exists.
-  *
-  * @param request GET request carrying broadcast_id
-  * @param out writer of the HTTP reply
-  */
- protected void resumeBroadcast(HttpServletRequest request, PrintWriter out)
- {
-     // Get broadcastId from request
-     String broadcastId = getBroadcastId(request);
-     Broadcast broadcast;
-     // Writers of the map hold this lock; a HashMap must not be read while
-     // another thread modifies it.
-     synchronized (broadcasts)
-     {
-         broadcast = broadcasts.get(broadcastId);
-     }
-     if (broadcast == null)
-     {
-         out.format("Broadcast %s does not exist", broadcastId);
-         return;
-     }
-     // Called outside the lock so the map is not blocked meanwhile.
-     broadcast.resume();
- }
-
- /**
-  * Writes a status report to out.
-  *
-  * With a "broadcast_id" request parameter, the report of that one broadcast
-  * is written, or an error text when the id is empty or unknown.
-  *
-  * Without it, an element named engineName + "_status" is written, holding
-  * the startup time, the report of every registered broadcast and a
-  * job_summary element with the completed, ready and active job counts.
-  *
-  * @param request GET request, optionally carrying broadcast_id
-  * @param out writer of the HTTP reply
-  */
- private void getStatus(HttpServletRequest request, PrintWriter out)
- {
-     String broadcastId = request.getParameter("broadcast_id");
-     if (broadcastId != null)
-     {
-         broadcastId = broadcastId.trim();
-         if (broadcastId.length() == 0)
-         {
-             out.write("broadcast_id request parameter cannot be empty");
-             return;
-         }
-         Broadcast broadcast;
-         // Writers of the map hold this lock; a HashMap must not be read
-         // while another thread modifies it.
-         synchronized (broadcasts)
-         {
-             broadcast = broadcasts.get(broadcastId);
-         }
-         if (broadcast == null)
-         {
-             out.write("<error>No such broadcast</error>");
-         }
-         else
-         {
-             out.write(broadcast.mkStatusReport());
-         }
-         return;
-     }
-
-     String tag = engineName + "_status";
-     out.write("<" + tag + ">\r\n");
-
-     out.write("<startup_time>" + startupTimestamp
-             + "</startup_time>\r\n");
-
-     // First get a copy of broadcasts, to avoid mutex deadlock.
-     Vector<Broadcast> broadcastList;
-     synchronized (broadcasts)
-     {
-         broadcastList = new Vector<Broadcast>(broadcasts.values());
-     }
-     // We have released the lock.
-     // Then append the status of each broadcast.
-     for (Broadcast broadcast : broadcastList)
-     {
-         out.write(broadcast.mkStatusReport());
-     }
-     out.write("<job_summary completed='" + getCompletedJobCount() + "' ready='" + getReadyJobCount() + "' active='" + getActiveJobCount() + "'/>");
-     out.write("</" + tag + ">");
- }
-
- /**
-  * Adds up the ready-job counts of all registered broadcasts while holding
-  * the lock on the broadcasts map.
-  *
-  * @return total number of ready jobs
-  */
- public int getReadyJobCount()
- {
-     synchronized (broadcasts)
-     {
-         int total = 0;
-         for (String id : broadcasts.keySet())
-         {
-             total = total + broadcasts.get(id).getReadyJobCount();
-         }
-         return total;
-     }
- }
-
- /**
-  * Adds up the active-job counts of all registered broadcasts while holding
-  * the lock on the broadcasts map.
-  *
-  * @return total number of active jobs
-  */
- public int getActiveJobCount()
- {
-     synchronized (broadcasts)
-     {
-         int total = 0;
-         for (String id : broadcasts.keySet())
-         {
-             total = total + broadcasts.get(id).getActiveJobCount();
-         }
-         return total;
-     }
- }
-
- /**
-  * Returns the number of completed jobs: those of broadcasts already purged
-  * (kept in completedJobCount) plus those of all registered broadcasts.
-  *
-  * @return total number of completed jobs
-  */
- public int getCompletedJobCount()
- {
-     synchronized (broadcasts)
-     {
-         // The carried-over total is read under the same lock as the sum.
-         // The purge moves a broadcast's count into it while holding this
-         // lock, so reading it afterwards could count that broadcast twice.
-         int total = completedJobCount;
-         for (Broadcast broadcast : broadcasts.values())
-         {
-             total += broadcast.getCompletedJobCount();
-         }
-         return total;
-     }
- }
-
- /**
-  * Logs the removal and drops the given broadcast from the registry.
-  * Nothing else happens when the id is not registered.
-  *
-  * @param broadcastId id of the broadcast to forget
-  */
- public void removeBroadcast(String broadcastId)
- {
-     String notice = "Removing broadcast " + broadcastId;
-     CommonLogger.activity.info(notice);
-     synchronized (this.broadcasts)
-     {
-         this.broadcasts.remove(broadcastId);
-     }
- }
-
- /**
-  * @return true when this engine is flagged as out of service; init() sets
-  *         the flag when it cannot load the configuration
-  */
- public boolean notInService()
- {
-     return this.notInService;
- }
-
- /**
- * Decode http GET request for broadcast_id value
- * @param request
- * @return broadcast_id
- */
- private String getBroadcastId(HttpServletRequest request)
- {
- return request.getParameter("broadcast_id");
- }
-
- /**
- * Invoked by servlet container when servlet is destroyed.
- */
- public final void destroy()
- {
- System.out.println(engineName + " destroyed");
-
- // Shutdown threads that periodically purge stale broadcasts.
- scheduler.shutdownNow();
-
- // Kill threads in each PostBack, which is remembered in postBackMap.
- for (PostBack postback : postBackMap.values())
- {
- postback.terminate();
- }
-
- for (Broadcast broadcast : broadcasts.values())
- {
- broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
- }
-
- destroyChild();
- super.destroy();
- }
-
- /**
-  * Subclass hook, called as the last step of init(), after the configuration
-  * has been loaded and the purge scheduler has been started. It is not called
-  * when init() returns early because the configuration could not be loaded.
-  */
- protected abstract void initChild();
-
- /**
-  * Subclass hook, called by destroy() during destruction of the servlet,
-  * just before the superclass destroy().
-  */
- protected abstract void destroyChild();
- }
|