Imported from dev1.link2tek.net CommEngine.git
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

542 lines
17 KiB

  1. package altk.comm.engine;
  2. import java.io.File;
  3. import java.io.FileInputStream;
  4. import java.io.FileNotFoundException;
  5. import java.io.IOException;
  6. import java.io.PrintWriter;
  7. import java.util.Enumeration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. import java.util.Properties;
  11. import java.util.Vector;
  12. import java.util.concurrent.Executors;
  13. import java.util.concurrent.ScheduledExecutorService;
  14. import java.util.concurrent.TimeUnit;
  15. import javax.servlet.ServletContext;
  16. import javax.servlet.http.HttpServlet;
  17. import javax.servlet.http.HttpServletRequest;
  18. import javax.servlet.http.HttpServletResponse;
  19. import org.apache.log4j.Logger;
  20. import org.apache.log4j.PropertyConfigurator;
  21. import altk.comm.engine.Broadcast.BroadcastState;
  22. @SuppressWarnings("serial")
  23. public abstract class CommEngine extends HttpServlet
  24. {
  25. public static final String SERVICE_THREADPOOL_SIZE_KEY = "service_threadpool_size";
  26. static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";
  27. private static final int SCHEDULER_THREAD_POOL_SIZE = 1;
  28. private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
  29. private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1;
  30. private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2;
  31. private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000;
  32. private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;
  33. /**
  34. * Maps a broadcastId to a broadcast.
  35. */
  36. private Map<String, Broadcast> broadcasts;
  37. protected boolean notInService;
  38. protected Properties config;
  39. protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice"
  40. private long startupTimestamp;
  41. // Sequencing naming of broadcast that fails to yield its broadcastId
  42. private int unknownBroadcastIdNdx = 1;
  43. /**
  44. * Used to communicate media-specific platform resources to broadcasts
  45. */
  46. protected EngineResources resources;
  47. protected static Logger myLogger;
  48. private ScheduledExecutorService scheduler;
  49. private long deadBroadcastViewingMinutes;
  50. private int completedJobCount = 0;
  51. private int serviceThreadPoolSize;
  52. private int postbackMaxQueueSize;
  53. private int postbackSenderPoolSize;
  54. private int postbackMaxBatchSize;
  55. protected String runtimeDirPath;
  56. protected String confDirPath;
  /**
   * Factory hook through which a concrete engine supplies its own kind of
   * {@link Broadcast}. doPost() calls it once for every POST request.
   *
   * @return a new broadcast
   */
  protected abstract Broadcast mkBroadcast();

  /**
   * Creates an engine with an empty broadcast registry and records the
   * current time as its startup timestamp.
   *
   * @param engineName name of this engine, e.g. "broadcast_sms"
   */
  public CommEngine(String engineName)
  {
    broadcasts = new HashMap<String, Broadcast>();
    this.engineName = engineName;
    startupTimestamp = System.currentTimeMillis();
  }
  64. /**
  65. * Relocates a filepath relative to runtime directory if filepath is not absolute.
  66. * @param filepath
  67. * @return
  68. */
  69. public String relocateToRuntimeDir(String filepath)
  70. {
  71. if (filepath.startsWith("/")) return filepath; // no change to absolute path
  72. String relocated = filepath;
  73. // The next 2 lines take care of pre-git era meaning convention of filepath in properties.
  74. // Then, the runtime is relative to the current working directory of tomcat.
  75. // Now they are relative to the runtimDirPath obtained from the tomcat tomcat context.
  76. // These 2 lines an be deleted when all CommEngines in production are in the git era.
  77. String unwanted_prefix = engineName + "/";
  78. if (filepath.startsWith(unwanted_prefix)) relocated = filepath.substring(unwanted_prefix.length());
  79. relocated = runtimeDirPath + "/" + relocated;
  80. return relocated;
  81. }
  82. /**
  83. * Invoked by servlet container during initialization of servlet.
  84. */
  85. public final void init()
  86. {
  87. // check init parameters
  88. ServletContext servletContext = getServletContext();
  89. confDirPath = servletContext.getInitParameter(getConfDirContextName());
  90. System.out.println("Config directory is configured to be '" + confDirPath + "'. Make sure it and its content are readable by user 'tomcat'");
  91. runtimeDirPath = servletContext.getInitParameter(getRunTimeDirContextName());
  92. System.out.println("Runtime directory is configured to be '" + runtimeDirPath + "'. Make sure it and its content are readable by user 'tomcat'");
  93. File propertiesFile = new File(confDirPath + "/properties");
  94. // Configure log4j using log4j.properties file, \
  95. // sibling to the engine properties file.
  96. // This change is backward compatible with placing the log4j.properties file in
  97. // the class path.
  98. String log4j_properties = confDirPath + "/log4j.properties";
  99. // Relocate file property offetting it by the runtimeDirPath
  100. try
  101. {
  102. Properties prop = new Properties();
  103. prop.load(new FileInputStream(log4j_properties));
  104. Enumeration<Object> e = prop.keys();
  105. while (e.hasMoreElements())
  106. {
  107. String key = (String)e.nextElement();
  108. if (key.toLowerCase().endsWith(".file"))
  109. {
  110. String filepath = prop.getProperty(key);
  111. String relocate = relocateToRuntimeDir(filepath);
  112. prop.setProperty(key, relocate);
  113. System.out.println(key + "=" + relocate);
  114. }
  115. }
  116. PropertyConfigurator.configure(prop);
  117. }
  118. catch (Exception e)
  119. {
  120. System.out.println("Failed to configure log4: " + e);
  121. // Do nothing, assuming the exception is FileNotFoundException.
  122. // Remaining log4j initialization will look for log4j.properties
  123. // file in the class path.
  124. }
  125. // This activates Logger class instantiation. At this point, if lo4j
  126. // is not yet configured,
  127. // it will look for the log4j.properties file in the class path.
  128. myLogger = Logger.getLogger(CommEngine.class);
  129. myLogger.info("init() invoked");
  130. // File propertiesFile = new File(
  131. // getClass().getClassLoader().getResource("properties").getFile()
  132. // );
  133. CommonLogger.startup.info("Using lo4j properites file " + log4j_properties);
  134. CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath());
  135. config = new Properties();
  136. try
  137. {
  138. config.load(new FileInputStream(propertiesFile));
  139. }
  140. catch (FileNotFoundException e)
  141. {
  142. CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort");
  143. notInService = true;
  144. return;
  145. }
  146. catch (IOException e)
  147. {
  148. CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage());
  149. notInService = true;
  150. return;
  151. }
  152. // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
  153. String periodStr = config.getProperty("dead_broadcast_viewing_period",
  154. new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString());
  155. deadBroadcastViewingMinutes = Long.parseLong(periodStr);
  156. CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));
  157. String str = config.getProperty(SERVICE_THREADPOOL_SIZE_KEY,
  158. new Integer(SERVICE_THREADPOOL_SIZE_DEFAULT).toString());
  159. serviceThreadPoolSize = Integer.parseInt(str);
  160. CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize));
  161. String string = config.getProperty("postback_max_queue_size",
  162. new Integer(POSTBACK_MAX_QUEUE_SIZE_DEFAULT).toString());
  163. postbackMaxQueueSize = Integer.parseInt(string);
  164. CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize);
  165. string = config.getProperty("postback_threadpool_size",
  166. new Integer(POSTBACK_THREADPOOL_SIZE_DEFAULT).toString());
  167. postbackSenderPoolSize = Integer.parseInt(string);
  168. CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize);
  169. string = config.getProperty("postback_max_batch_size",
  170. new Integer(POSTBACK_MAX_BATCH_SIZE_DEFAULT).toString());
  171. postbackMaxBatchSize = Integer.parseInt(string);
  172. CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize);
  173. scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
  174. scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}},
  175. deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);
  176. initChild();
  177. }
  178. protected void purgeStaleBroadcasts()
  179. {
  180. long now = System.currentTimeMillis();
  181. synchronized (broadcasts)
  182. {
  183. for (String id : broadcasts.keySet())
  184. {
  185. if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000)
  186. {
  187. Broadcast broadcast = broadcasts.get(id);
  188. completedJobCount += broadcast.getCompletedJobCount();
  189. broadcasts.remove(id);
  190. }
  191. }
  192. }
  193. }
  194. /**
  195. *
  196. * @return name of parameter in Tomcat context file, specifying properties file
  197. * for this SMSEngine.
  198. */
  199. protected abstract String getConfDirContextName();
  200. protected abstract String getRunTimeDirContextName();
  201. @Override
  202. protected void doPost(HttpServletRequest request, HttpServletResponse response)
  203. {
  204. Broadcast broadcast = mkBroadcast();
  205. broadcast.doPost(request, response, this);
  206. }
  207. /**
  208. * Functions covered are
  209. * get=status
  210. * get=cancel_broadcast&broadcast_id=XXX
  211. */
  212. @Override
  213. protected void doGet(HttpServletRequest request, HttpServletResponse response)
  214. {
  215. PrintWriter out;
  216. try
  217. {
  218. out = response.getWriter();
  219. }
  220. catch (IOException e)
  221. {
  222. CommonLogger.alarm.error("Cannot write a reply: " + e);
  223. return;
  224. }
  225. String get = (String)request.getParameter("get");
  226. if (get == null)
  227. {
  228. // Return http status BAD REQUEST
  229. int httpStatus = HttpServletResponse.SC_BAD_REQUEST;
  230. try
  231. {
  232. response.sendError(httpStatus);
  233. }
  234. catch (IOException e)
  235. {
  236. myLogger.warn("Unnable to return HTTP error code " + httpStatus);
  237. }
  238. return;
  239. }
  240. if (get.equalsIgnoreCase("status"))
  241. {
  242. getStatus(request, out);
  243. }
  244. else if (get.equalsIgnoreCase("cancel_broadcast"))
  245. {
  246. cancelBroadcast(request, out);
  247. }
  248. else if (get.equalsIgnoreCase("pause_broadcast"))
  249. {
  250. pauseBroadcast(request, out);
  251. }
  252. else if (get.equalsIgnoreCase("resume_broadcast"))
  253. {
  254. resumeBroadcast(request, out);
  255. }
  256. else
  257. {
  258. out.write(get + " not supported");
  259. }
  260. out.close();
  261. }
  262. private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
  263. {
  264. // Get broadcastId from request
  265. String broadcastId = getBroadcastId(request);
  266. Broadcast broadcast = broadcasts.get(broadcastId);
  267. if (broadcast == null)
  268. {
  269. out.format("Broadcast %s does not exist", broadcastId);
  270. return;
  271. }
  272. broadcast.cancel(out);
  273. }
  274. protected void pauseBroadcast(HttpServletRequest request, PrintWriter out)
  275. {
  276. // Get broadcastId from request
  277. String broadcastId = getBroadcastId(request);
  278. Broadcast broadcast = broadcasts.get(broadcastId);
  279. if (broadcast == null)
  280. {
  281. out.format("Broadcast %s does not exist", broadcastId);
  282. return;
  283. }
  284. broadcast.pause(out);
  285. }
  286. protected void resumeBroadcast(HttpServletRequest request, PrintWriter out)
  287. {
  288. // Get broadcastId from request
  289. String broadcastId = getBroadcastId(request);
  290. Broadcast broadcast = broadcasts.get(broadcastId);
  291. if (broadcast == null)
  292. {
  293. out.format("Broadcast %s does not exist", broadcastId);
  294. return;
  295. }
  296. broadcast.resume(out);
  297. }
  298. /**
  299. * <CallEngine_status>
  300. * status of each broadcast
  301. * <calls><total>ttt</total><connected>nnn</connected>
  302. * </CallEngine_status>
  303. */
  304. private void getStatus(HttpServletRequest request, PrintWriter out)
  305. {
  306. String broadcastId = request.getParameter("broadcast_id");
  307. if (broadcastId != null)
  308. {
  309. broadcastId = broadcastId.trim();
  310. if (broadcastId.length() == 0)
  311. {
  312. out.write("broadcast_id request parameter cannot be empty");
  313. return;
  314. }
  315. Broadcast broadcast = broadcasts.get(broadcastId);
  316. if (broadcast == null)
  317. {
  318. out.write("<error>No such broadcast</error>");
  319. }
  320. else
  321. {
  322. out.write(broadcast.mkStatusReport());
  323. }
  324. return;
  325. }
  326. else
  327. {
  328. String tag = engineName + "_status";
  329. out.write("<" + tag + ">\r\n");
  330. out.write("<startup_time>" + startupTimestamp
  331. + "</startup_time>\r\n");
  332. // First get a copy of broadcasts, to avoid mutex deadlock.
  333. Vector<Broadcast> broadcastList = new Vector<Broadcast>();
  334. synchronized(broadcasts)
  335. {
  336. for (String key : broadcasts.keySet())
  337. {
  338. broadcastList.add(broadcasts.get(key));
  339. }
  340. }
  341. // We have released the lock.
  342. // Then append status of each broadcast to outBuf.
  343. for (Broadcast broadcast : broadcastList)
  344. {
  345. out.write(broadcast.mkStatusReport());
  346. }
  347. out.write("<job_summary completed='" + getCompletedJobCount() + "' pending='" + getPendingJobCount() + "' active='" + getActiveJobCount() + "'/>");
  348. out.write("</" + tag + ">");
  349. }
  350. }
  351. public int getPendingJobCount()
  352. {
  353. int readyCount = 0;
  354. synchronized(broadcasts)
  355. {
  356. for (Broadcast broadcast : broadcasts.values())
  357. {
  358. readyCount += broadcast.getPendingJobCount();
  359. }
  360. }
  361. return readyCount;
  362. }
  363. public int getActiveJobCount()
  364. {
  365. int activeCount = 0;
  366. synchronized(broadcasts)
  367. {
  368. for (Broadcast broadcast : broadcasts.values())
  369. {
  370. activeCount += broadcast.getActiveJobCount();
  371. }
  372. }
  373. return activeCount;
  374. }
  375. public int getCompletedJobCount()
  376. {
  377. int additionalCompletedJobCount = 0;
  378. synchronized(broadcasts)
  379. {
  380. for (Broadcast broadcast : broadcasts.values())
  381. {
  382. additionalCompletedJobCount += broadcast.getCompletedJobCount();
  383. }
  384. }
  385. return completedJobCount + additionalCompletedJobCount;
  386. }
  387. public void removeBroadcast(String broadcastId)
  388. {
  389. CommonLogger.activity.info("Removing broadcast " + broadcastId);
  390. synchronized(broadcasts)
  391. {
  392. broadcasts.remove(broadcastId);
  393. }
  394. }
  395. public boolean notInService()
  396. {
  397. return notInService;
  398. }
  399. /**
  400. * Decode http GET request for broadcast_id value
  401. * @param request
  402. * @return broadcast_id
  403. */
  404. private String getBroadcastId(HttpServletRequest request)
  405. {
  406. return request.getParameter("broadcast_id");
  407. }
  408. /**
  409. * Invoked by servlet container when servlet is destroyed.
  410. */
  411. public final void destroy()
  412. {
  413. System.out.println("Destroying " + engineName);
  414. // Shutdown threads that periodically purge stale broadcasts.
  415. scheduler.shutdownNow();
  416. for (Broadcast broadcast : broadcasts.values())
  417. {
  418. broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
  419. }
  420. destroyChild();
  421. super.destroy();
  422. }
  423. /**
  424. * Indirectly invoked by servlet container during servlet initialization.
  425. */
  426. abstract protected void initChild();
  427. /**
  428. * Indirectly invoked by servlet container during destruction of servlet.
  429. */
  430. abstract protected void destroyChild();
  431. public EngineResources getResources()
  432. {
  433. return resources;
  434. }
  435. public void addBroadcast(String broadcastId, Broadcast broadcast)
  436. {
  437. if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
  438. broadcasts.put(broadcastId, broadcast);
  439. }
  440. /**
  441. * If broadcast has no id, one will be created for it.
  442. * @param broadcast
  443. */
  444. public void installBroadcast(Broadcast broadcast)
  445. {
  446. String broadcastId = broadcast.getBroadcastId();
  447. if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
  448. broadcasts.put(broadcastId, broadcast);
  449. }
  450. @Deprecated
  451. public int getServiceThreadPoolSize()
  452. {
  453. return serviceThreadPoolSize;
  454. }
  455. public int getPostbackMaxQueueSize() {
  456. return postbackMaxQueueSize;
  457. }
  458. public int getPostbackSenderPoolSize() {
  459. return postbackSenderPoolSize;
  460. }
  461. public int getPostbackMaxBatchSize() {
  462. return postbackMaxBatchSize;
  463. }
  464. }