Imported from dev1.link2tek.net CommEngine.git
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

631 lines
21 KiB

  1. package altk.comm.engine;
  2. import java.io.File;
  3. import java.io.FileInputStream;
  4. import java.io.FileNotFoundException;
  5. import java.io.IOException;
  6. import java.io.PrintWriter;
  7. import java.util.Enumeration;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. import java.util.Properties;
  11. import java.util.Vector;
  12. import java.util.concurrent.Executors;
  13. import java.util.concurrent.ScheduledExecutorService;
  14. import java.util.concurrent.TimeUnit;
  15. import javax.servlet.ServletContext;
  16. import javax.servlet.http.HttpServlet;
  17. import javax.servlet.http.HttpServletRequest;
  18. import javax.servlet.http.HttpServletResponse;
  19. import org.apache.log4j.Logger;
  20. import org.apache.log4j.PropertyConfigurator;
  21. import altk.comm.engine.Broadcast.BroadcastState;
  22. import altk.comm.engine.exception.BroadcastError;
  23. import altk.comm.engine.exception.BroadcastException;
  24. import altk.comm.engine.exception.PlatformError;
  25. import altk.comm.engine.exception.PlatformException;
  26. import altk.comm.engine.postback.PostBack;
  27. @SuppressWarnings("serial")
  28. public abstract class CommEngine extends HttpServlet
  29. {
  30. static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request";
  31. private static final int SCHEDULER_THREAD_POOL_SIZE = 1;
  32. private static final long DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT = 60;
  33. private static final int SERVICE_THREADPOOL_SIZE_DEFAULT = 1;
  34. private static final int POSTBACK_THREADPOOL_SIZE_DEFAULT = 2;
  35. private static final int POSTBACK_MAX_QUEUE_SIZE_DEFAULT = 10000;
  36. private static final int POSTBACK_MAX_BATCH_SIZE_DEFAULT = 100;
  37. /**
  38. * Maps a broadcastId to a broadcast.
  39. */
  40. private Map<String, Broadcast> broadcasts;
  41. protected boolean notInService;
  42. protected Properties config;
  43. protected Map<String, PostBack> postBackMap;
  44. protected final String engineName; // e.g. "broadcast_sms", "broadcast_voice"
  45. private long startupTimestamp;
  46. // Sequencing naming of broadcast that fails to yield its broadcastId
  47. private int unknownBroadcastIdNdx = 1;
  48. private BroadcastException myException;
  49. /**
  50. * Used to communicate media-specific platform resources to broadcasts
  51. */
  52. protected EngineResources resources;
  53. private static Logger myLogger;
  54. private ScheduledExecutorService scheduler;
  55. private long deadBroadcastViewingMinutes;
  56. private int completedJobCount = 0;
  57. private int serviceThreadPoolSize;
  58. private int postbackMaxQueueSize;
  59. private int postbackSenderPoolSize;
  60. private int postbackMaxBatchSize;
  61. protected String runtimeDirPath;
  62. protected String confDirPath;
  63. abstract protected Broadcast mkBroadcast();
  /**
   * Creates the engine with an empty broadcast registry and records the
   * startup time. Configuration is not read here; that happens in init().
   *
   * @param engineName name of this engine, used in status output
   */
  public CommEngine(String engineName)
  {
    this.engineName = engineName;
    this.myException = null;
    this.startupTimestamp = System.currentTimeMillis();
    this.broadcasts = new HashMap<String, Broadcast>();
  }
  71. /**
  72. * Invoked by servlet container during initialization of servlet.
  73. */
  74. public final void init()
  75. {
  76. // check init parameters
  77. ServletContext servletContext = getServletContext();
  78. confDirPath = servletContext.getInitParameter(getConfDirContextName());
  79. System.out.println("Config directory is configured to be '" + confDirPath + "'. Make sure it and its content are readable by user 'tomcat'");
  80. runtimeDirPath = servletContext.getInitParameter(getRunTimeDirContextName());
  81. System.out.println("Runtime directory is configured to be '" + runtimeDirPath + "'. Make sure it and its content are readable by user 'tomcat'");
  82. File propertiesFile = new File(confDirPath + "/properties");
  83. // Configure log4j using log4j.properties file, \
  84. // sibling to the engine properties file.
  85. // This change is backward compatible with placing the log4j.properties file in
  86. // the class path.
  87. String log4j_properties = confDirPath + "/log4j.properties";
  88. // Relocate file property offetting it by the runtimeDirPath
  89. try
  90. {
  91. Properties prop = new Properties();
  92. prop.load(new FileInputStream(log4j_properties));
  93. Enumeration<Object> e = prop.keys();
  94. while (e.hasMoreElements())
  95. {
  96. String key = (String)e.nextElement();
  97. if (key.toLowerCase().endsWith(".file"))
  98. {
  99. prop.setProperty(key, runtimeDirPath + "/" + prop.getProperty(key));
  100. System.out.println(key + "=" + prop.getProperty(key));
  101. }
  102. }
  103. PropertyConfigurator.configure(prop);
  104. }
  105. catch (Exception e)
  106. {
  107. System.out.println("Failed to configure log4: " + e);
  108. // Do nothing, assuming the exception is FileNotFoundException.
  109. // Remaining log4j initialization will look for log4j.properties
  110. // file in the class path.
  111. }
  112. // This activates Logger class instantiation. At this point, if lo4j
  113. // is not yet configured,
  114. // it will look for the log4j.properties file in the class path.
  115. myLogger = Logger.getLogger(CommEngine.class);
  116. myLogger.info("init() invoked");
  117. // File propertiesFile = new File(
  118. // getClass().getClassLoader().getResource("properties").getFile()
  119. // );
  120. CommonLogger.startup.info("Using lo4j properites file " + log4j_properties);
  121. CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath());
  122. config = new Properties();
  123. try
  124. {
  125. config.load(new FileInputStream(propertiesFile));
  126. }
  127. catch (FileNotFoundException e)
  128. {
  129. CommonLogger.alarm.fatal("Properties file " + propertiesFile.getAbsolutePath() + " not found -- abort");
  130. notInService = true;
  131. return;
  132. }
  133. catch (IOException e)
  134. {
  135. CommonLogger.alarm.fatal("Problem in reading properties file " + propertiesFile.getAbsolutePath() + ": " + e.getMessage());
  136. notInService = true;
  137. return;
  138. }
  139. postBackMap = new HashMap<String, PostBack>();
  140. // Set up periodic purge of stale broadcasts, based on deadBroadcastViewingMinutes
  141. String periodStr = config.getProperty("dead_broadcast_viewing_period",
  142. new Long(DEAD_BROADCAST_VIEWING_PERIOD_DEFAULT).toString());
  143. deadBroadcastViewingMinutes = Long.parseLong(periodStr);
  144. CommonLogger.startup.info(String.format("Dead broadcast viewing period: %d minutes", deadBroadcastViewingMinutes));
  145. String str = config.getProperty("service_threadpool_size",
  146. new Integer(SERVICE_THREADPOOL_SIZE_DEFAULT).toString());
  147. serviceThreadPoolSize = Integer.parseInt(str);
  148. CommonLogger.startup.info(String.format("service thread pool size: %d", serviceThreadPoolSize));
  149. String string = config.getProperty("postback_max_queue_size",
  150. new Integer(POSTBACK_MAX_QUEUE_SIZE_DEFAULT).toString());
  151. postbackMaxQueueSize = Integer.parseInt(string);
  152. CommonLogger.activity.info("Postback max queue size = " + postbackMaxQueueSize);
  153. string = config.getProperty("postback_threadpool_size",
  154. new Integer(POSTBACK_THREADPOOL_SIZE_DEFAULT).toString());
  155. postbackSenderPoolSize = Integer.parseInt(string);
  156. CommonLogger.activity.info("Postback threadpool size = " + postbackSenderPoolSize);
  157. string = config.getProperty("postback_max_batch_size",
  158. new Integer(POSTBACK_MAX_BATCH_SIZE_DEFAULT).toString());
  159. postbackMaxBatchSize = Integer.parseInt(string);
  160. CommonLogger.activity.info("Postback max batch size = " + postbackMaxBatchSize);
  161. scheduler = Executors.newScheduledThreadPool(SCHEDULER_THREAD_POOL_SIZE);
  162. scheduler.scheduleAtFixedRate(new Runnable() { public void run() { purgeStaleBroadcasts();}},
  163. deadBroadcastViewingMinutes, deadBroadcastViewingMinutes, TimeUnit.MINUTES);
  164. initChild();
  165. }
  166. protected void purgeStaleBroadcasts()
  167. {
  168. long now = System.currentTimeMillis();
  169. synchronized (broadcasts)
  170. {
  171. for (String id : broadcasts.keySet())
  172. {
  173. if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000)
  174. {
  175. Broadcast broadcast = broadcasts.get(id);
  176. completedJobCount += broadcast.getCompletedJobCount();
  177. broadcasts.remove(id);
  178. }
  179. }
  180. }
  181. }
  182. /**
  183. *
  184. * @return name of parameter in Tomcat context file, specifying properties file
  185. * for this SMSEngine.
  186. */
  187. protected abstract String getConfDirContextName();
  188. protected abstract String getRunTimeDirContextName();
  189. @Override
  190. protected void doPost(HttpServletRequest request, HttpServletResponse response)
  191. {
  192. Broadcast broadcast = mkBroadcast();
  193. broadcast.doPost(request, response, this);
  194. }
  195. //@Override
  196. // TODO Not used - delete
  197. @Deprecated
  198. protected void doPost_good(HttpServletRequest request, HttpServletResponse response)
  199. {
  200. myException = null;
  201. try
  202. {
  203. Broadcast broadcast = mkBroadcast();
  204. broadcast.setServiceThreadPoolsize(serviceThreadPoolSize);
  205. try
  206. {
  207. broadcast.decode(request, notInService);
  208. if (notInService)
  209. {
  210. throw new PlatformException(PlatformError.RUNTIME_ERROR,
  211. "Not in service");
  212. }
  213. if (broadcast.recipientList.size() == 0)
  214. {
  215. CommonLogger.activity.info("Broadcast " + broadcast.getBroadcastId() + ": No recipients");
  216. broadcast.setState(BroadcastState.COMPLETED, "No recipients", null);
  217. return;
  218. }
  219. // Determine postBackUrl
  220. String postBackURL = broadcast.getPostBackURL();
  221. PostBack postBack = null;
  222. if (postBackURL != null)
  223. {
  224. postBack = postBackMap.get(postBackURL);
  225. if (postBack == null)
  226. {
  227. postBack = new PostBack(postBackURL, broadcast.broadcastType + "_status",
  228. postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize);
  229. postBackMap.put(postBackURL, postBack);
  230. }
  231. }
  232. broadcast.initSync(resources);
  233. broadcast.init(postBack);
  234. if (broadcast.getState() == BroadcastState.COMPLETED) return;
  235. }
  236. catch (BroadcastException e)
  237. {
  238. myException = e;
  239. broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
  240. CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
  241. myLogger.error("Broadcast aborted", e);
  242. return;
  243. }
  244. catch (Throwable t)
  245. {
  246. // Caught stray unexpected runtime problem
  247. CommonLogger.alarm.error("Broadcast aborted: " + t);
  248. myLogger.error("Broadcast aborted", t);
  249. myException = new BroadcastException(BroadcastError.PLATFORM_ERROR, t.getMessage());
  250. broadcast.setState(BroadcastState.ABORTED, myException.errorCodeText, myException.errorText);
  251. }
  252. finally
  253. {
  254. // Put broadcast in broadcasts map.
  255. String broadcastId = broadcast.getBroadcastId();
  256. if (broadcastId != null && broadcastId.length() != 0)
  257. {
  258. broadcasts.put(broadcastId, broadcast);
  259. }
  260. else
  261. {
  262. String makeUpId = "Unknown" + unknownBroadcastIdNdx++;
  263. broadcasts.put(makeUpId, broadcast);
  264. }
  265. // Return regular or error response
  266. String responseXML = broadcast.getResponseXML(myException);
  267. PrintWriter writer = response.getWriter();
  268. writer.write(responseXML);
  269. writer.close();
  270. }
  271. try
  272. {
  273. broadcast.initAsync();
  274. broadcast.startProcessing();
  275. }
  276. catch (BroadcastException e)
  277. {
  278. broadcast.setState(BroadcastState.ABORTED, e.errorCodeText, e.errorText);
  279. CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage());
  280. myLogger.error("Broadcast aborted", e);
  281. }
  282. }
  283. catch (Throwable t)
  284. {
  285. // Caught stray unexpected runtime problem
  286. CommonLogger.alarm.error("Broadcast aborted: " + t.getMessage());
  287. myLogger.error("Broadcast aborted", t);
  288. }
  289. }
  290. /**
  291. * Functions covered are
  292. * get=status
  293. * get=cancel_broadcast&broadcast_id=XXX
  294. */
  295. @Override
  296. protected void doGet(HttpServletRequest request, HttpServletResponse response)
  297. {
  298. PrintWriter out;
  299. try
  300. {
  301. out = response.getWriter();
  302. }
  303. catch (IOException e)
  304. {
  305. CommonLogger.alarm.error("Cannot write a reply: " + e);
  306. return;
  307. }
  308. String get = (String)request.getParameter("get");
  309. if (get == null)
  310. {
  311. // Return http status BAD REQUEST
  312. int httpStatus = HttpServletResponse.SC_BAD_REQUEST;
  313. try
  314. {
  315. response.sendError(httpStatus);
  316. }
  317. catch (IOException e)
  318. {
  319. myLogger.warn("Unnable to return HTTP error code " + httpStatus);
  320. }
  321. return;
  322. }
  323. if (get.equalsIgnoreCase("status"))
  324. {
  325. getStatus(request, out);
  326. }
  327. else if (get.equalsIgnoreCase("cancel_broadcast"))
  328. {
  329. cancelBroadcast(request, out);
  330. }
  331. else if (get.equalsIgnoreCase("pause_broadcast"))
  332. {
  333. pauseBroadcast(request, out);
  334. }
  335. else if (get.equalsIgnoreCase("resume_broadcast"))
  336. {
  337. resumeBroadcast(request, out);
  338. }
  339. else
  340. {
  341. out.write(get + " not supported");
  342. }
  343. out.close();
  344. }
  345. private void cancelBroadcast(HttpServletRequest request, PrintWriter out)
  346. {
  347. // Get broadcastId from request
  348. String broadcastId = getBroadcastId(request);
  349. Broadcast broadcast = broadcasts.get(broadcastId);
  350. if (broadcast == null)
  351. {
  352. out.format("Broadcast %s does not exist", broadcastId);
  353. return;
  354. }
  355. broadcast.cancel(out);
  356. }
  357. protected void pauseBroadcast(HttpServletRequest request, PrintWriter out)
  358. {
  359. // Get broadcastId from request
  360. String broadcastId = getBroadcastId(request);
  361. Broadcast broadcast = broadcasts.get(broadcastId);
  362. if (broadcast == null)
  363. {
  364. out.format("Broadcast %s does not exist", broadcastId);
  365. return;
  366. }
  367. broadcast.pause();
  368. }
  369. protected void resumeBroadcast(HttpServletRequest request, PrintWriter out)
  370. {
  371. // Get broadcastId from request
  372. String broadcastId = getBroadcastId(request);
  373. Broadcast broadcast = broadcasts.get(broadcastId);
  374. if (broadcast == null)
  375. {
  376. out.format("Broadcast %s does not exist", broadcastId);
  377. return;
  378. }
  379. broadcast.resume();
  380. }
  381. /**
  382. * <CallEngine_status>
  383. * status of each broadcast
  384. * <calls><total>ttt</total><connected>nnn</connected>
  385. * </CallEngine_status>
  386. */
  387. private void getStatus(HttpServletRequest request, PrintWriter out)
  388. {
  389. String broadcastId = request.getParameter("broadcast_id");
  390. if (broadcastId != null)
  391. {
  392. broadcastId = broadcastId.trim();
  393. if (broadcastId.length() == 0)
  394. {
  395. out.write("broadcast_id request parameter cannot be empty");
  396. return;
  397. }
  398. Broadcast broadcast = broadcasts.get(broadcastId);
  399. if (broadcast == null)
  400. {
  401. out.write("<error>No such broadcast</error>");
  402. }
  403. else
  404. {
  405. out.write(broadcast.mkStatusReport());
  406. }
  407. return;
  408. }
  409. else
  410. {
  411. String tag = engineName + "_status";
  412. out.write("<" + tag + ">\r\n");
  413. out.write("<startup_time>" + startupTimestamp
  414. + "</startup_time>\r\n");
  415. // First get a copy of broadcasts, to avoid mutex deadlock.
  416. Vector<Broadcast> broadcastList = new Vector<Broadcast>();
  417. synchronized(broadcasts)
  418. {
  419. for (String key : broadcasts.keySet())
  420. {
  421. broadcastList.add(broadcasts.get(key));
  422. }
  423. }
  424. // We have released the lock.
  425. // Then append status of each broadcast to outBuf.
  426. for (Broadcast broadcast : broadcastList)
  427. {
  428. out.write(broadcast.mkStatusReport());
  429. }
  430. out.write("<job_summary completed='" + getCompletedJobCount() + "' ready='" + getReadyJobCount() + "' active='" + getActiveJobCount() + "'/>");
  431. out.write("</" + tag + ">");
  432. }
  433. }
  434. public int getReadyJobCount()
  435. {
  436. int readyCount = 0;
  437. synchronized(broadcasts)
  438. {
  439. for (Broadcast broadcast : broadcasts.values())
  440. {
  441. readyCount += broadcast.getReadyJobCount();
  442. }
  443. }
  444. return readyCount;
  445. }
  446. public int getActiveJobCount()
  447. {
  448. int activeCount = 0;
  449. synchronized(broadcasts)
  450. {
  451. for (Broadcast broadcast : broadcasts.values())
  452. {
  453. activeCount += broadcast.getActiveJobCount();
  454. }
  455. }
  456. return activeCount;
  457. }
  458. public int getCompletedJobCount()
  459. {
  460. int additionalCompletedJobCount = 0;
  461. synchronized(broadcasts)
  462. {
  463. for (Broadcast broadcast : broadcasts.values())
  464. {
  465. additionalCompletedJobCount += broadcast.getCompletedJobCount();
  466. }
  467. }
  468. return completedJobCount + additionalCompletedJobCount;
  469. }
  470. public void removeBroadcast(String broadcastId)
  471. {
  472. CommonLogger.activity.info("Removing broadcast " + broadcastId);
  473. synchronized(broadcasts)
  474. {
  475. broadcasts.remove(broadcastId);
  476. }
  477. }
  478. public boolean notInService()
  479. {
  480. return notInService;
  481. }
  482. /**
  483. * Decode http GET request for broadcast_id value
  484. * @param request
  485. * @return broadcast_id
  486. */
  487. private String getBroadcastId(HttpServletRequest request)
  488. {
  489. return request.getParameter("broadcast_id");
  490. }
  491. /**
  492. * Invoked by servlet container when servlet is destroyed.
  493. */
  494. public final void destroy()
  495. {
  496. System.out.println(engineName + " destroyed");
  497. // Shutdown threads that periodically purge stale broadcasts.
  498. scheduler.shutdownNow();
  499. // Kill threads in each PostBack, which is remembered in postBackMap.
  500. for (PostBack postback : postBackMap.values())
  501. {
  502. postback.terminate();
  503. }
  504. for (Broadcast broadcast : broadcasts.values())
  505. {
  506. broadcast.terminate(BroadcastState.ABORTED, "Platform termination");
  507. }
  508. destroyChild();
  509. super.destroy();
  510. }
  511. /**
  512. * Indirectly invoked by servlet container during servlet initialization.
  513. */
  514. abstract protected void initChild();
  515. /**
  516. * Indirectly invoked by servlet container during destruction of servlet.
  517. */
  518. abstract protected void destroyChild();
  519. public PostBack getPostBack(String postBackURL, String broadcastType)
  520. {
  521. if (postBackURL == null) return null;
  522. PostBack postBack = postBackMap.get(postBackURL);
  523. if (postBack != null) return postBack;
  524. postBack = new PostBack(postBackURL, broadcastType + "_status",
  525. postbackMaxQueueSize, postbackSenderPoolSize, postbackMaxBatchSize);
  526. postBackMap.put(postBackURL, postBack);
  527. return postBack;
  528. }
  529. public EngineResources getResources()
  530. {
  531. return resources;
  532. }
  533. public void addBroadcast(String broadcastId, Broadcast broadcast)
  534. {
  535. if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
  536. broadcasts.put(broadcastId, broadcast);
  537. }
  538. /**
  539. * If broadcast has no id, one will be created for it.
  540. * @param broadcast
  541. */
  542. public void installBroadcast(Broadcast broadcast)
  543. {
  544. String broadcastId = broadcast.getBroadcastId();
  545. if (broadcastId == null) broadcastId = "Unknown" + unknownBroadcastIdNdx++;
  546. broadcasts.put(broadcastId, broadcast);
  547. }
  548. public int getServiceThreadPoolSize()
  549. {
  550. return serviceThreadPoolSize;
  551. }
  552. }