Sample Code for Parser

<< Click to Display Table of Contents >>

Navigation:  Integrating Blaze with Other Systems > SQLstream Software Development Kit (SDK) > Writing an Extensible Common Data Framework Plugin >

Sample Code for Parser

Previous pageReturn to chapter overviewNext page

The following code creates a parser for key-value pairs. It is useful as an example of how to write a parser plugin. The topic Extensible Common Data Framework: Parsing Key Value Pairs describes the functionality of this plugin.

/*

// Copyright (C) 2016-2017 SQLstream, Inc.

*/

package com.sqlstream.aspen.namespace.keyvalue;

 

import java.io.IOException;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Properties;

import java.util.Set;

import java.util.logging.Level;

 

import com.sqlstream.aspen.namespace.common.BuffersInputStream;

import com.sqlstream.aspen.namespace.common.CommonRowParser;

import com.sqlstream.aspen.namespace.common.EcdaPluginFactory;

import com.sqlstream.aspen.namespace.common.OptionsUtils;

import com.sqlstream.aspen.namespace.common.TypeParser;

 

/**

* KeyValueParser is a parser for KeyValue data. It parses a record at a time

* from a stream of bytes, reading up to the next line delimiter. The line is

* split into fields by the column delimiter, unless protected by quotes. (The

* quote and both delimiters are attributes. By default " and ,). Then the

* field values are parsed from byte-strings to scalar values and inserted into

* the next row of the target table or stream.

*

* For the sake of speed the code avoids making java objects. In particular, the

* current line is not a java String or a java byte array, but simply a range of

* contiguous bytes in a larger byte array. The same holds for each field to be

* parsed.

*

**/

public class KeyValueParser extends CommonRowParser {

 

   private final static String KEY_VALUE_SEPARATOR_CHARACTER = "KEY_VALUE_SEPARATOR_CHARACTER";

 

   protected final static String [] REQUIRED_PROPERTIES = {

   };

   protected final static String [] OPTIONAL_PROPERTIES = {

       ROW_SEPARATOR_CHAR_KEY, SEPARATOR_CHAR_KEY, PARSER_SKIP_HEADER_KEY,

       COLUMN_QUOTE_CHARACTER, QUOTED_COLUMNS, KEY_VALUE_SEPARATOR_CHARACTER

   };

   private BytePointer[] columns;

   private ArrayList<BytePointer> unmatchedKeys;

 

   private byte[] rowDelimiter;

   private byte[] colDelimiter;

   private byte[] quotes;

   private byte[] keyValueSeparator;

   private int quotesLength = 0;

   private int keyValueSeparatorLength;

   private boolean rowDelimiterSpecified;

 

   protected TypeParser[] rowParsers;

   protected BuffersInputStream.Line currentLine; // without line delimiter

 

   private boolean rowGenerated;

   private int rowCount = 0;

 

   private static BytePointer key;

 

   class Impl implements Runnable {

 

       public void run() {

           long rowStartPosition = 0;

           try {

               tracer.log(Level.FINE, "Started parsing the stream");

               while (inputStream.getLine(currentLine, rowDelimiter)) {

                   parseRow(rowStartPosition);

                   rowStartPosition = inputStream.getPosition();

               }

               tracer.log(Level.FINE, "Finished parsing the stream");

           } catch (Throwable e) {

               tracer.log(Level.SEVERE, "Exiting parser", e);

           } finally {

               context.close();

           }

       }

 

   }

 

   public KeyValueParser() {

       super();

   }

 

   /**

    * Initializes this formatter and throws an exception if something is

    * underspecified

    */

   public void init(Properties props) throws Exception {

       super.init(props);

       OptionsUtils.filterInitProperties(

           props, REQUIRED_PROPERTIES, OPTIONAL_PROPERTIES, initProps);

       key = new BytePointer();

       // Record separator. By default is a \n character

       String rowSeparator = props.getProperty(ROW_SEPARATOR_CHAR_KEY);

       rowDelimiterSpecified = (rowSeparator != null);

       if (rowSeparator == null) {

           rowSeparator = "\n";

       }

       rowDelimiter = rowSeparator.getBytes(charset);

 

       // Field (key=value) separator. By default is a comma (,)

       String colSeparator = props.getProperty(SEPARATOR_CHAR_KEY);

       if (colSeparator == null) {

           colSeparator = ",";

       }

       colDelimiter = colSeparator.getBytes(charset);

 

       initDefaultTypeParsers(props, initProps);

       rowParsers = initRowParsers();

 

       // The quote character. By defautl it is a double quote character"

       String quoteCharacter = props.getProperty(COLUMN_QUOTE_CHARACTER);

       String quotedColumns = props.getProperty(QUOTED_COLUMNS);

       if (null == quoteCharacter && quotedColumns != null) {

           quoteCharacter = "\"";

       }

       if (quoteCharacter != null) {

           quotes = quoteCharacter.getBytes(charset);

       } else {

           quotes = "\"".getBytes(charset);

       }

       quotesLength = quotes.length;

 

       // The character that separates a key from a value. By default is a =

       // It can be configured using the KEY_VALUE_SEPARATOR_CHARACTER property

       String keyValueSeparatorCharacter = props.getProperty(KEY_VALUE_SEPARATOR_CHARACTER);

       if (null == keyValueSeparatorCharacter) {

           keyValueSeparatorCharacter = "=";

       }

 

       keyValueSeparator = keyValueSeparatorCharacter.getBytes(charset);

       keyValueSeparatorLength = keyValueSeparator.length;

 

   }

 

   /**

    * Parses a single record

    *

    * @param rowStartPosition

    * @throws Exception

    */

   public void parseRow(long rowStartPosition) throws Exception {

       rowGenerated = false;

       final int rowLength = currentLine.length();

       key.setBytes(currentLine.buffer);

       int a = currentLine.buffer.length;

       assert (rowLength >= 0);

       try {

           traceLine(Level.FINEST, currentLine);

 

           // String s = new String (currentLine.buffer, charset);

           // Brief explanation of the index variables

           // assuming a a row with a "key=value" field, where the index of k=0

           // and the index of the last 'e' is 8 then

           // startKey = 0

           // endKEy = 2

           // startValue = 4

           // endValue = 8

           // colStart = 0

           // colEnd = 8

 

           int rowEnd = rowLength;

           int colStart = 0, colEnd = 0; // offsets in currentLine

           boolean sawComma = false;

 

           boolean finish = false;

 

           // parse the fields until it reaches the end of the line

           while (!finish) {

             

               // find next field in the line (exclude delimiters)

               colStart = colEnd;

               if (colStart < rowEnd) {

                   if (sawComma) {

                       colStart += colDelimiter.length;

                   }

                   colEnd = currentLine.findBytes(colStart, colDelimiter);

                   if (colEnd < 0) {

                       // it's the last key=value in the row

                       colEnd = rowEnd;

                       sawComma = false;

                       finish = true;

                   } else {

                       sawComma = true;

                   }

               }

               int startKey = colStart;

               int endKey = currentLine.findBytes(startKey, colEnd, keyValueSeparator);

 

               int startValue = endKey + keyValueSeparatorLength;

               int endValue = colEnd;

 

               // key = new BytePointer(startKey, endKey, currentLine.buffer);

 

               key.setLimits(startKey, endKey, currentLine.start);

               //

 

               // parse the text from colStart to colEnd

               try {

                                   //traceField(Level.FINEST, null, startKey, endKey, currentLine, colStart, colEnd);

                   TypeParser parser = null;

                   int iCol = 0;

                   for (iCol = 0; iCol < numColumns; iCol++) {

                       if (columns[iCol].equals(key)) {

                           parser = rowParsers[iCol];

                           iCol++;

                           break;

                       }

                   }

 

                   // this condition check if the current key is present in the

                   // table,

                   // if not is is traced (only once for each new key) and the

                   // loop

                   // skips this iteration

                   if (parser == null) {

                       // unmatched key

                       boolean contains = false;

                       for(BytePointer unmatchedKey : this.unmatchedKeys){

                           if(unmatchedKey.equals(key)){

                               contains = true;

                           }

                       }

                     

                       if (!contains) {

                           BytePointer unmatchedKey = (BytePointer)key.clone();

                           this.unmatchedKeys.add(unmatchedKey);

                           tracer.log(Level.WARNING, "Unmatched key: " + unmatchedKey.toString());

                       }

 

                       continue;

                   }

 

                   // Checks if this value starts with a quote character

                   if (atQuote(startValue)) {

                       // strip off quotes; but what about string "" vs

                       // string null??

                       int start = startValue + quotesLength;

                       int end = currentLine.findBytes(start, endValue, quotes);

                       if (end < 0) {

                           end = colEnd; // expect a parse err with a msg

                       }

                       traceField(Level.FINEST, "trimmed ", startKey, endKey, currentLine, start, end);

                       parser.parse(currentLine.buffer, currentLine.start + start, currentLine.start + end, inputStmt,

                               iCol);

                   } else {

                       parser.parse(currentLine.buffer, currentLine.start + startValue, currentLine.start + endValue,

                               inputStmt, iCol);

                   }

 

               } catch (Exception e) {

                   setParserPosition(rowStartPosition - messageStartPosition + colEnd);

                   String msg = e.getMessage() + " while parsing field " + key.toString() + " at line " + getParserLineNumber()

                           + " of " + inputStream.locationDescription();

                   setParserError(msg);

                   tracer.log(Level.WARNING, msg, e);

               }

           }

           rowGenerated = true;

       } finally {

           if (rowGenerated) {

               submitRow(tracer);

           }

           incParserLineNumber();

       }

 

   }

 

   /**

    * Logs the given line to the tracer

    *

    * @param level

    * @param line

    */

   private void traceLine(Level level, BuffersInputStream.Line line) {

       if (tracer.isLoggable(level)) {

           StringBuilder sb = new StringBuilder();

           sb.append("row ").append(rowCount).append(" location ").append(inputStream.locationDescription())

                   .append(" line ").append(getParserLineNumber()).append(": ").append(line.asString());

           tracer.log(level, sb.toString());

       }

   }

 

   /**

    * Logs a single field (key=value) to the tracer

    *

    * @param level

    * @param prefix

    * @param startKey

    * @param endKey

    * @param line

    * @param startValue

    * @param endValue

    * @throws SQLException

    */

   private void traceField(

       Level level, String prefix, int startKey, int endKey, BuffersInputStream.Line line,

       int startValue, int endValue)

       throws SQLException

   {

       if (tracer.isLoggable(level)) {

           StringBuilder sb = new StringBuilder();

           if (prefix != null)

               sb.append(prefix);

           sb.append(" ").append(line.asString(startKey, (endKey - startKey))).append(" parse: ")

                   .append(line.asString(startValue, (endValue - startValue)));

           tracer.log(level, sb.toString());

       }

   }

 

   /**

    * Initializes the required row parsers

    *

    * @return

    * @throws Exception

    */

   private TypeParser[] initRowParsers() throws Exception {

       Set<String> alreadyMapped = context.getMappedSpecialColumns();

       this.columns = new BytePointer[numColumns];

       this.unmatchedKeys = new ArrayList<>();

       ArrayList<TypeParser> parsers = new ArrayList<>();

 

       for (int i = 1; i <= numColumns; i++) {

           byte[] fieldName = metaData.getFieldName(i).getBytes();

 

           this.columns[i - 1] = new BytePointer(0, fieldName.length, fieldName);

           if (alreadyMapped.contains(fieldName)) {

               parsers.add(null);

               continue;

           }

           TypeParser customParser = customParsers.get(fieldName);

           if (customParser != null) {

               if (logLevel.isTraceFine()) {

                   tracer.fine("custom parser " + customParser);

               }

               parsers.add(customParser);

           } else {

               parsers.add(getTypeParser(i));

           }

       }

       if (logLevel.isTraceFinest()) {

           tracer.finest("row parsers size " + parsers.size());

           tracer.finest("using type parsers " + parsers);

       }

       return parsers.toArray(new TypeParser[0]);

   }

 

   /**

    * Checks if an input field start with a quote

    *

    * @param valueStart

    *            the index of the beginning of the value

    * @return true if this value starts with a quote character

    * @throws IOException

    */

   private boolean atQuote(int valueStart) throws IOException {

       if (quotes == null)

           return false;

       for (int i = 0; i < quotesLength; ++i) {

           byte b = currentLine.buffer[currentLine.start + valueStart + i];

           if (b != quotes[i]) {

               return false;

           }

       }

       return true;

   }

 

   @Override

   public void closeAllocation() {

       tracer.fine("final rowCount=" + rowCount);

       super.closeAllocation();

   }

 

   @Override

   public void start() {

       Impl impl = new Impl();

       inputStream = context.getInputStream();

       if (quotes != null) {

           inputStream.setQuoteCharacters(quotes);

       }

       currentLine = inputStream.newLine();

       Thread thread = new Thread(impl);

       thread.start();

   }

 

   /**
    * Plugin factory that makes {@link KeyValueParser} available under the
    * parser name "KV".
    */
   public static class PluginFactory extends EcdaPluginFactory {

       /**

        * Installs the parser: registers {@link KeyValueParser} under the name

        * "KV" and logs the installation at FINE level.

        */

       public void installPlugins() {

           installParser("KV", KeyValueParser.class);

           tracer.log(Level.FINE, "Installed KeyValue parser");

       }

   }

 

   /**

    *

    * A BytePointer is a type that contains a pointer to a byte[] and a start

    * and end indexes. The BytePointer is used to compares values of the stream

    * (requires less computation power than Strings)

    *

    */

   class BytePointer {

       private int start;

       private int end;

       private byte[] bytes;

 

       public BytePointer() {

           super();

           this.start = 0;

           this.end = 0;

           this.bytes = null;

       }

 

       public BytePointer(int start, int end, byte[] bytes) {

           super();

           this.start = start;

           this.end = end;

           this.bytes = bytes;

       }

 

       public void setLimits(int start, int end, long rowStartPosition) {

         

           this.start = start + (int)rowStartPosition;

           this.end = end + (int)rowStartPosition;

         

       }

 

       /**

        * hashCode implementation similar to the String hashCode

        */

       @Override

       public int hashCode() {

           int result = 0;

           int n = end - start;

           int i = 1;

           for (int index = start; index < end; index++) {

 

               result += (bytes[index] * Math.pow(31, n - i));

               i++;

           }

 

           return result;

       }

 

       /**

        * This equals compares two bytePointers. In order to be equal, they

        * muys have the same length (end-start) and the same bytes in this

        * interval

        */

       @Override

       public boolean equals(Object obj) {

           try{

           BytePointer other = (BytePointer) obj;

         

           if ((this.end - this.start) != (other.getEnd() - other.getStart())) {

               return false;

           }

           byte[] otherByteArray = other.getBytes();

           int otherStart = other.getStart();

           int len = end - start;

 

           for (int i = 0; i < len; i++) {

               if (this.bytes[i + start] != otherByteArray[i + otherStart]) {

                   return false;

               }

           }

           return true;

           }catch(Exception e){

               e.printStackTrace();

           }

           return false;

       }

 

       @Override

       protected Object clone() throws CloneNotSupportedException {

         

           return new BytePointer(0, end-start, Arrays.copyOfRange(bytes, start, end));

         

       }

     

     

       // getters and setters

 

 

       @Override

       public String toString() {

           byte[] byteStr = Arrays.copyOfRange(bytes, start, end);

           return new String(byteStr, charset);

       }

 

       public int getStart() {

           return start;

       }

 

       public void setStart(int start) {

           this.start = start;

       }

 

       public int getEnd() {

           return end;

       }

 

       public void setEnd(int end) {

           this.end = end;

       }

 

       public byte[] getBytes() {

           return bytes;

       }

 

       public void setBytes(byte[] bytes) {

           this.bytes = bytes;

       }

 

   }

 

}

// End KeyValueParser.java