| @@ -30,8 +30,19 @@ import altk.comm.engine.postback.PostBack; | |||||
| public abstract class Broadcast | public abstract class Broadcast | ||||
| { | { | ||||
| private static final int SCHEDULER_THREAD_POOL_SIZE = 5; | private static final int SCHEDULER_THREAD_POOL_SIZE = 5; | ||||
| private static final String ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT = "activity_record_id"; | |||||
| public final String broadcastType; | public final String broadcastType; | ||||
| private String broadcastId; | private String broadcastId; | ||||
| private BroadcastState state = BroadcastState.INSTALLING; | |||||
| String haltReason; | |||||
| String stateErrorText; | |||||
| public long changeStateTime; | |||||
| private int completedJobCount = 0; | |||||
| protected String activityRecordIdParamName; | |||||
| private String jobReportRootNodeName; | |||||
| /** | /** | ||||
| * Set when reading request XML, but never used. | * Set when reading request XML, but never used. | ||||
| @@ -192,13 +203,6 @@ public abstract class Broadcast | |||||
| } | } | ||||
| } | } | ||||
| private BroadcastState state = BroadcastState.INSTALLING; | |||||
| String haltReason; | |||||
| String stateErrorText; | |||||
| public long changeStateTime; | |||||
| private int completedJobCount = 0; | |||||
| protected class Service extends Thread | protected class Service extends Thread | ||||
| { | { | ||||
| Object serviceProvider; | Object serviceProvider; | ||||
| @@ -304,10 +308,17 @@ public abstract class Broadcast | |||||
| } | } | ||||
| } | } | ||||
| protected Broadcast(String broadcastType) | |||||
| /** | |||||
| * | |||||
| * @param broadcastType | |||||
| * @param activityRecordIdParamName - if null, default is used. | |||||
| * @param jobReportRootNodeName | |||||
| */ | |||||
| protected Broadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName) | |||||
| { | { | ||||
| this.broadcastType = broadcastType; | this.broadcastType = broadcastType; | ||||
| this.activityRecordIdParamName = activityRecordIdParamName == null? ACTIVITY_RECORD_ID_PARAM_NAME_DEFAULT : activityRecordIdParamName; | |||||
| this.jobReportRootNodeName = jobReportRootNodeName; | |||||
| readyQueue = new LinkedBlockingQueue<Job>(); | readyQueue = new LinkedBlockingQueue<Job>(); | ||||
| serviceThreadPool = new ArrayList<Thread>(); | serviceThreadPool = new ArrayList<Thread>(); | ||||
| recipientList = new ArrayList<Recipient>(); | recipientList = new ArrayList<Recipient>(); | ||||
| @@ -704,8 +715,14 @@ public abstract class Broadcast | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| abstract protected JobReport mkJobReport(Job job); | |||||
| /** | |||||
| * Derived class may make its own Implementation of JobReport | |||||
| * @return | |||||
| */ | |||||
| protected JobReport mkJobReport() | |||||
| { | |||||
| return new JobReport(); | |||||
| } | |||||
| public void addJob(Job job) | public void addJob(Job job) | ||||
| { | { | ||||
| @@ -748,68 +765,6 @@ public abstract class Broadcast | |||||
| return state == BroadcastState.HALTED || state == BroadcastState.HALTING; | return state == BroadcastState.HALTED || state == BroadcastState.HALTING; | ||||
| } | } | ||||
| /* | |||||
| @Override | |||||
| public void run() | |||||
| { | |||||
| for (;;) | |||||
| { | |||||
| if (threadsShouldStop()) return; | |||||
| if (threadsShouldPause()) | |||||
| { | |||||
| try | |||||
| { | |||||
| resumeFlag.wait(); | |||||
| } | |||||
| catch (InterruptedException e) | |||||
| { | |||||
| myLogger.warn("Dispatcher thread interrupted while waiting to resume"); | |||||
| return; | |||||
| } | |||||
| } | |||||
| List<Job> batch = null; | |||||
| synchronized(readyQueue) | |||||
| { | |||||
| // get a batch of jobs | |||||
| Job job = readyQueue.poll(); | |||||
| if (job == null) | |||||
| try | |||||
| { | |||||
| readyQueue.wait(); | |||||
| continue; | |||||
| } | |||||
| catch (InterruptedException e) | |||||
| { | |||||
| return; | |||||
| } | |||||
| batch = new ArrayList<Job>(); | |||||
| batch.add(job); | |||||
| for (int i = 1; i < getJobBatchSize(); i++) | |||||
| { | |||||
| job = readyQueue.poll(); | |||||
| if (job == null) break; | |||||
| batch.add(job); | |||||
| } | |||||
| } | |||||
| if (batch != null) | |||||
| { | |||||
| try | |||||
| { | |||||
| processJobs(batch); | |||||
| } | |||||
| catch (EngineException e) | |||||
| { | |||||
| terminate(BroadcastState.ABORTED, e.getMessage()); | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| */ | |||||
| /** | /** | ||||
| * job status is reported back to this broadcast, via the logAndQueueForPostBack method. | * job status is reported back to this broadcast, via the logAndQueueForPostBack method. | ||||
| * @param batch | * @param batch | ||||
| @@ -839,7 +794,9 @@ public abstract class Broadcast | |||||
| { | { | ||||
| if (postBack != null) | if (postBack != null) | ||||
| { | { | ||||
| JobReport report = mkJobReport(job); | |||||
| JobReport report = mkJobReport(); | |||||
| report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); | |||||
| report.init(job); | |||||
| postBack.queueReport(report.toString()); | postBack.queueReport(report.toString()); | ||||
| } | } | ||||
| @@ -27,13 +27,9 @@ import altk.comm.engine.exception.PlatformError; | |||||
| import altk.comm.engine.exception.PlatformException; | import altk.comm.engine.exception.PlatformException; | ||||
| import altk.comm.engine.postback.PostBack; | import altk.comm.engine.postback.PostBack; | ||||
| @SuppressWarnings("serial") | |||||
| public abstract class CommEngine extends HttpServlet | public abstract class CommEngine extends HttpServlet | ||||
| { | |||||
| /** | |||||
| * | |||||
| */ | |||||
| private static final long serialVersionUID = 6887055442875818654L; | |||||
| { | |||||
| static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; | static final String REQUEST_TOP_ELEMENT_NAME_DEFAULT = "Request"; | ||||
| private static final int SCHEDULER_THREAD_POOL_SIZE = 1; | private static final int SCHEDULER_THREAD_POOL_SIZE = 1; | ||||
| @@ -393,7 +389,7 @@ public abstract class CommEngine extends HttpServlet | |||||
| { | { | ||||
| out.write(broadcast.mkStatusReport()); | out.write(broadcast.mkStatusReport()); | ||||
| } | } | ||||
| out.write("<job_summary completed='" + getCompletedJobCount() + "' active='" + getActiveJobCount() + "' ready='" + getReadyJobCount() + "'/>"); | |||||
| out.write("<job_summary completed='" + getCompletedJobCount() + "' ready='" + getReadyJobCount() + "' active='" + getActiveJobCount() + "'/>"); | |||||
| out.write("</" + tag + ">"); | out.write("</" + tag + ">"); | ||||
| } | } | ||||
| } | } | ||||
| @@ -1,18 +1,23 @@ | |||||
| package altk.comm.engine; | package altk.comm.engine; | ||||
| abstract public class JobReport | |||||
| public class JobReport | |||||
| { | { | ||||
| private static final String ACTIVITY_RECORD_ID_NAME = "activity_record_id"; | |||||
| public final long reportTime; | |||||
| public final String broadcastId; | |||||
| public final String launchRecordId; | |||||
| public final String contactId; | |||||
| public final String recordId; // id into table, e.g. call_record, sms_record, etc. | |||||
| public final String jobStatus; // Note: not enum | |||||
| public final String errorText; | |||||
| public long startTime; | |||||
| private String broadcastId; | |||||
| private String launchRecordId; | |||||
| private String contactId; | |||||
| private String recordId; // id into table, e.g. call_record, sms_record, etc. | |||||
| private String jobStatus; // Note: not enum | |||||
| private String errorText; | |||||
| private long startTime; | |||||
| private String activityRecordIdParamName; | |||||
| private String xmlRootNodeName; | |||||
| public JobReport(Job job, String broadcastId, String launchRecordId) | |||||
| protected JobReport() | |||||
| { | |||||
| } | |||||
| protected final void initBase(Job job, String broadcastId, String launchRecordId, | |||||
| String activityRecordIdParamName, String xmlRootNodeName) | |||||
| { | { | ||||
| if (broadcastId == null || broadcastId.length() == 0) | if (broadcastId == null || broadcastId.length() == 0) | ||||
| { | { | ||||
| @@ -25,24 +30,23 @@ abstract public class JobReport | |||||
| this.broadcastId = broadcastId; | this.broadcastId = broadcastId; | ||||
| this.launchRecordId = launchRecordId; | this.launchRecordId = launchRecordId; | ||||
| this.activityRecordIdParamName = activityRecordIdParamName; | |||||
| this.xmlRootNodeName = xmlRootNodeName; | |||||
| startTime = job.startTime; | startTime = job.startTime; | ||||
| contactId = job.recipient.contact_id; | contactId = job.recipient.contact_id; | ||||
| recordId = job.recipient.activity_record_id; | recordId = job.recipient.activity_record_id; | ||||
| jobStatus = job.jobStatus.toString(); | jobStatus = job.jobStatus.toString(); | ||||
| errorText = job.errorText; | errorText = job.errorText; | ||||
| reportTime = System.currentTimeMillis(); | |||||
| } | } | ||||
| /** | /** | ||||
| * @return "email" for example. | |||||
| * Derived class may initialize its own attributes. | |||||
| * @param job | |||||
| */ | */ | ||||
| abstract protected String getXMLRootNodeName(); | |||||
| /** | |||||
| * @return "email_record_id" for example. | |||||
| abstract protected String getActivityRecordIdname(); | |||||
| */ | |||||
| public void init(Job job) | |||||
| { | |||||
| } | |||||
| public String toString() | public String toString() | ||||
| { | { | ||||
| StringBuffer xml = new StringBuffer(); | StringBuffer xml = new StringBuffer(); | ||||
| @@ -52,9 +56,9 @@ abstract public class JobReport | |||||
| public final StringBuffer appendXML(StringBuffer xml) | public final StringBuffer appendXML(StringBuffer xml) | ||||
| { | { | ||||
| xml.append("<" + getXMLRootNodeName() + " broadcast_id=\"" + broadcastId | |||||
| xml.append("<" + xmlRootNodeName + " broadcast_id=\"" + broadcastId | |||||
| + "\" launch_record_id=\"" + launchRecordId | + "\" launch_record_id=\"" + launchRecordId | ||||
| + "\" " + getActivityRecordIdName() + "=\"" + recordId | |||||
| + "\" " + activityRecordIdParamName + "=\"" + recordId | |||||
| + "\" contact_id=\"" + contactId | + "\" contact_id=\"" + contactId | ||||
| + "\" recipient_status=\"" + jobStatus + "\" >\r\n"); | + "\" recipient_status=\"" + jobStatus + "\" >\r\n"); | ||||
| xml.append("<start_time>" + startTime/1000 + "</start_time>\r\n"); | xml.append("<start_time>" + startTime/1000 + "</start_time>\r\n"); | ||||
| @@ -65,27 +69,20 @@ abstract public class JobReport | |||||
| xml.append(errorText.replaceAll("&", "&").replaceAll("<", "<")); | xml.append(errorText.replaceAll("&", "&").replaceAll("<", "<")); | ||||
| xml.append("</error_text>\r\n"); | xml.append("</error_text>\r\n"); | ||||
| } | } | ||||
| xml.append("</" + getXMLRootNodeName() + ">"); | |||||
| xml.append("</" + xmlRootNodeName + ">"); | |||||
| return xml; | return xml; | ||||
| } | } | ||||
| /** | |||||
| * Derived class may override this method to re-define name of activity_record_id, | |||||
| * e.g. email_record_id, sms_record_id, etc. | |||||
| * @return | |||||
| */ | |||||
| protected String getActivityRecordIdName() | |||||
| { | |||||
| return ACTIVITY_RECORD_ID_NAME; | |||||
| } | |||||
| /** | /** | ||||
| * Append data to xml which is specific to the derived class. For example, email | * Append data to xml which is specific to the derived class. For example, email | ||||
| * address for EmailJobReport. | * address for EmailJobReport. | ||||
| * @param xml | * @param xml | ||||
| * @return | * @return | ||||
| */ | */ | ||||
| protected abstract StringBuffer appendSpecificXML(StringBuffer xml); | |||||
| protected StringBuffer appendSpecificXML(StringBuffer xml) | |||||
| { | |||||
| return xml; | |||||
| } | |||||
| } | } | ||||
| @@ -45,9 +45,16 @@ public abstract class XMLDOMBroadcast extends Broadcast | |||||
| protected DocumentBuilder builder; | protected DocumentBuilder builder; | ||||
| protected Element broadcastNode; | protected Element broadcastNode; | ||||
| protected XMLDOMBroadcast(String broadcastType) | |||||
| /** | |||||
| * | |||||
| * @param broadcastType | |||||
| * @param activityRecordIdParamName - if null, default is used | |||||
| * @param jobReportRootNodeName - job report root node name | |||||
| */ | |||||
| protected XMLDOMBroadcast(String broadcastType, String activityRecordIdParamName, | |||||
| String jobReportRootNodeName) | |||||
| { | { | ||||
| super(broadcastType); | |||||
| super(broadcastType, activityRecordIdParamName, jobReportRootNodeName); | |||||
| } | } | ||||
| /** | /** | ||||
| @@ -187,7 +194,7 @@ public abstract class XMLDOMBroadcast extends Broadcast | |||||
| myLogger.warn("Missing or empty contact_id for a recipient in broadcast " + getBroadcastId()); | myLogger.warn("Missing or empty contact_id for a recipient in broadcast " + getBroadcastId()); | ||||
| continue; | continue; | ||||
| } | } | ||||
| activity_record_id = recipientNode.getAttribute(getActivityRecordIdName()); | |||||
| activity_record_id = recipientNode.getAttribute(activityRecordIdParamName); | |||||
| if (activity_record_id == null || (activity_record_id = activity_record_id.trim()).length() == 0) | if (activity_record_id == null || (activity_record_id = activity_record_id.trim()).length() == 0) | ||||
| { | { | ||||
| throw new BroadcastMsgException("Missing or empty " + getActivityRecordIdName() + " attribute for a recipient in broadcast " + getBroadcastId()); | throw new BroadcastMsgException("Missing or empty " + getActivityRecordIdName() + " attribute for a recipient in broadcast " + getBroadcastId()); | ||||