package altk.comm.engine; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.zip.ZipInputStream; import javax.servlet.http.HttpServletRequest; import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import org.xml.sax.Attributes; import org.xml.sax.InputSource; import org.xml.sax.SAXException; import org.xml.sax.helpers.DefaultHandler; import altk.comm.engine.exception.BroadcastMsgException; import altk.comm.engine.exception.EngineException; import altk.common.engine.util.LoggingInputStreamReader; import altk.common.engine.util.UTF8BenevolentDecoder; public abstract class XMLSAXBroadcast extends Broadcast { protected class BaseParserHandler extends DefaultHandler { /** * Text collector used during SAX parsing */ StringBuffer characters; private String contact_id; private String activity_record_id; private Map recipientProp; private boolean inRecipient = false; private boolean inRecipientProp; /** * This method is called by the SAX parser when a text node * is encountered in the XML tree during parsing. */ @Override public void characters(char[] ch, int start, int length) { characters.append(ch, start, length); } @Override public void startElement(String uri, String localName, String qName, Attributes attributes) { // Initialize characters to start collecting characters = new StringBuffer(); if (qName.equals("recipient")) { // get attributes: contact_id, call_record_id contact_id = attributes.getValue("", "contact_id"); activity_record_id = attributes.getValue("", activityRecordIdParamName); // Prepare contact properties recipientProp = new HashMap(); inRecipient = true; inRecipientProp = false; } else if (qName.equals(broadcastType)) { // broadcast_id myLogger.debug("broadcast_id: " + attributes.getValue("", "broadcast_id")); setBroadcastId(attributes.getValue("", "broadcast_id")); // launch_record_id setLaunchRecordId(attributes.getValue("", "launch_record_id")); } else if (inRecipient) { inRecipientProp = true; } } @Override public void endElement(String uri, String localName, String qName) { if (qName.equals("recipient")) { Recipient recipient; try { recipient = new Recipient(contact_id, activity_record_id, recipientProp); recipientList.add(recipient); } catch (IllegalArgumentException e) { CommonLogger.alarm.warn("This contact not added (" + e.getMessage() + "): " + "' contact_id='" + contact_id + "' " + activityRecordIdParamName + "='" + activity_record_id + "'"); } } else if (qName.equals("async_status_post_back")) { postBackURL = getTrimmedText(); } else if (qName.equals("expire_time")) { expireTime = Long.parseLong(getTrimmedText()); // defaults to 20 minutes myLogger.debug("expire_time decoded to be " + expireTime); if (expireTime == 0) { expireTime = System.currentTimeMillis() + 20 * 60 * 1000; myLogger.debug("expire time adjusted to be " + expireTime); } } else if (inRecipientProp) { recipientProp.put(qName, getText()); inRecipientProp = false; } } /** * * @return char string collected in characters, but not trimmed */ protected String getText() { return characters.toString(); } /** * * @return char string collected in characters, and trimmed */ protected String getTrimmedText() { return characters.toString().trim(); } } public XMLSAXBroadcast(String broadcastType, String activityRecordIdParamName, String jobReportRootNodeName) { super(broadcastType, activityRecordIdParamName, jobReportRootNodeName); } @Override protected final void decode(HttpServletRequest request, boolean notInService) throws EngineException { try { // Check if content is zipped InputStream inb; String contentType = request.getContentType(); if (contentType != null && contentType.split("/")[1].equalsIgnoreCase("zip")) { ZipInputStream zip = new ZipInputStream(request.getInputStream()); zip.getNextEntry(); inb = zip; } else { inb = request.getInputStream(); } // Get a XML parser myLogger.info("Begin parsing"); SAXParser parser = SAXParserFactory.newInstance().newSAXParser(); long parseStart = System.currentTimeMillis(); int contentLength = request.getContentLength(); CommonLogger.activity.info("Receiving " + contentLength + " bytes of data"); // Convert InputStream inb into InputSource, using the UTF-8 decoder that // cleans content off bad UTF-8 characters. InputSource is = new InputSource(new LoggingInputStreamReader(inb, UTF8BenevolentDecoder.getDecoder(), CommonLogger.activity)); DefaultHandler parserHandler = getParserHandler(); parser.parse(is, parserHandler); long parseEnd = System.currentTimeMillis(); myLogger.info("Parsing took " + (parseEnd - parseStart)/1000 + " seconds"); } catch (ParserConfigurationException e) { String errorText = "While parsing: " + e.getMessage(); myLogger.warn(errorText, e); throw new BroadcastMsgException(e.getMessage(), e); } catch (SAXException e) { String errorText = "While parsing: " + e.getMessage(); myLogger.warn(errorText, e); throw new BroadcastMsgException(e.getMessage(), e); } catch (IOException e) { String errorText = "While parsing: " + e.getMessage(); myLogger.warn(errorText, e); throw new BroadcastMsgException(e.getMessage(), e); } catch (Exception e) { // catch any problem we may have missed String errorText = "While parsing: " + e.getMessage(); myLogger.warn(errorText, e); throw new BroadcastMsgException(e.getMessage(), e); } } protected abstract DefaultHandler getParserHandler(); }