Writing an Extensible Common Data Framework Plugin


This topic provides information on creating new plugins to process data for s-Server using the Extensible Common Data Framework. Plugins are written in Java and require knowledge of Java programming to develop. After you write them, plugins need to be installed.

Note: Currently, you can write plugins that read and parse data from external sources. We do not support writing plugins that write data to external destinations.

The directory $SQLSTREAM_HOME/examples/sdk contains two scripts used to install plugins. The directory $SQLSTREAM_HOME/doc/SDK-API contains the SDK Javadoc.

This topic contains the following sections:

Introduction
Setting Up the ECD Plugin
Writing the ECD Plugin
Managing Properties
Writing a Source Plugin
Writing a Parser Plugin
Installing the ECD Plugin

Introduction

The Extensible Common Data Framework lets you independently write a plugin that handles a currently unsupported data source or data format. You can then use these plugins to create foreign data sources in SQL, set up foreign streams which can be queried, analyzed, merged with native streams or tables, archived in an RDBMS source, or otherwise manipulated. (For a list of supported sources and formats, see the topics Reading from Other Sources and Writing to Other Destinations in this guide.)

With parsers, we are concerned with transforming data into rows and columns. Parsers first validate the data and then parse it out to match the columns defined in a foreign stream.
With data sources, we are primarily concerned with delivering buffers and buffer metadata (information about where the data originated, such as sequence number or names of files). This metadata is entered into s-Server streams so that you can later poll it.

Note: Currently, only parsers and sources are supported for writing your own ECD plugin. For help with writing a custom writer or sink, please contact SQLstream technical support.

There are two code examples provided:

Sample Code for Parser provides an example of a parser plugin built for Google Protocol Buffers.
Sample Code for Source Plugin provides an example of a source plugin built for Kafka.

Setting Up the ECD Plugin

The first part of developing an ECD plugin is setting up a Maven project for your plugin. For more information on Maven, see https://maven.apache.org/.

Using Maven, create your project under the $SQLSTREAM_HOME/examples/sdk/ directory.

As a parent pom, use $SQLSTREAM_HOME/examples/sdk/pom.xml.

Invoke $SQLSTREAM_HOME/examples/sdk/install.sh to install artifacts.

You will most likely want to modify one of the pom files in $SQLSTREAM_HOME/examples/sdk/kafka or $SQLSTREAM_HOME/examples/sdk/keyvalue.

A sample pom file might look like the following:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <version>5.0.1-SNAPSHOT</version>
  <groupId>com.sqlstream.ecda</groupId>
  <artifactId>ecda</artifactId>
  <name>ECDA Adapters for SQLStream</name>
  <url>http://www.sqlstream.com/</url>

  <modules>
    <module>ecda-test</module>
    <module>kafka</module>
    <module>keyvalue</module>
  </modules>

  <packaging>pom</packaging>

  <repositories>
    <repository>
      <id>Maven Central repository</id>
      <url>http://repo.maven.apache.org/maven2/</url>
    </repository>
    <!-- Required until the Servlet 3.0 API can be resolved in Central -->
    <repository>
      <id>Glassfish</id>
      <name>Glassfish Maven2 Repository</name>
      <url>http://download.java.net/maven/glassfish/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>com.sqlstream.aspen</groupId>
      <artifactId>aspen-core</artifactId>
      <version>5.0.1-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <!-- net/sf/farrago/farrago/1.0/ -->
      <groupId>net.sf.farrago</groupId>
      <artifactId>farrago-core</artifactId>
      <version>0.9.4-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <properties>
    <plugin.factory>PluginFactory</plugin.factory>
  </properties>

</project>

 

Writing the ECD Plugin

To write a plugin, begin by creating a subclass of com.sqlstream.aspen.namespace.common.EcdaPluginFactory.

In this subclass, invoke one or more of the following methods:

public static void installSource(String name, Class<? extends DataInputSource> clazz);
public static void installParser(String name, Class<? extends DataInputParser> clazz);
public static void installFactoryPlugin(String name, Class<? extends DataFactoryPlugin> clazz);

For example, an ECD parser class might be declared along the following lines:

public class ProtoBufParser implements DataInputParser

 

EcdaPluginFactory

For any Extensible Common Data plugin, you need to subclass EcdaPluginFactory and reference this subclass in your POM. When you install the plugin, s-Server looks through the manifest for the ECDA plugin factory.

The plugin factory subclass also names the plugin. You invoke this name in SQL through either the FORMAT TYPE parameter (parsers) or SERVER TYPE parameter (sources).

For example, the following code names a parser "PROTOBUF".

public static class PluginFactory extends EcdaPluginFactory {
    public void installPlugins() {
        installParser("PROTOBUF", ProtoBufParser.class);
    }
}

 

Managing Properties

For all plugins, you need to manage the plugin's properties. These are set in SQL and control how the plugin works, for example, how it collects metadata or handles offsets. You do this with the OptionsUtils.filterInitProperties method; use it to validate properties and throw an exception the user can understand on error.

getInitProperties should return the subset of properties that are interesting for this data source. getInitProperties is inherited from the interface com.sqlstream.aspen.core.RuntimeObject.

In init(), you can use the following helper method for this purpose:

initProps = OptionsUtils.filterInitProperties(

             props, requiredParameters, optionalParameters, initProps);
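For example, property handling in init() might look like the following sketch. This is illustrative only: the init() signature, the parameter arrays, the type of initProps (assumed here to be java.util.Properties), and the TOPIC option are assumptions, not part of the SDK.

    // Hedged sketch; names and the TOPIC option are illustrative.
    private static final String[] requiredParameters = {"TOPIC"};
    private static final String[] optionalParameters = {"BUFFER_SIZE"};
    private Properties initProps;   // java.util.Properties assumed

    public void init(Properties props) throws SQLException {
        // Keep only the properties this plugin is interested in.
        initProps = OptionsUtils.filterInitProperties(
            props, requiredParameters, optionalParameters, initProps);

        // Validate, and fail with a message the SQL user can act on.
        if (initProps.getProperty("TOPIC") == null) {
            throw new SQLException(
                "Plugin requires the option TOPIC, which is missing");
        }
    }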

 

Writing a Source Plugin

Source plugins deliver buffers and metadata about buffers. Metadata is generally information about where the data originated, and might include, for example, sequence numbers or file names. To write a source plugin, you need to do the following:

Extend the class SourceData.
Extend the class SourceDataFactory.
Implement DataInputSource.

For more information on all three classes, see the SQLstream Javadoc in $SQLSTREAM_HOME/doc/SDK-API.

Sources generally deliver data in one of two ways:

1. Message-based sources, which have breaks between messages or buffers. Kafka is an example of such a source.
2. Non-message-based sources, which have no natural breaks between messages. Network sockets and the file system both work this way.

Your source type determines how you implement the plugin.

You should use these implementations and extensions in the following way:

SourceDataFactory

Override the BufferInfo() boilerplate for your SourceData subclass.

In the constructor, pass the names of the metadata columns that are interesting to the source plugin to the superclass.

For Multiple Message Submissions (Like Kafka)

For message-based sources with multiple buffer/message submissions per work unit, use the following code model for the SourceDataFactory constructor. Use BuffersInputStream.Connection#getCurrent() and BuffersInputStream.Connection#submit() for each work unit.

    /**
     * @param supportedColumnNames Supported metadata column names
     * @param context              Context of DataInputSource
     * @param numWorkUnits         Number of work units to allow queuing in BuffersInputStream
     * @param workSize             Maximum number of bytes per work unit
     * @param sharedBuffer         Use a shared buffer for SourceDatas within a work unit
     * @throws SQLException
     */
    protected SourceDataFactory(
        String[] supportedColumnNames, EcdaReaderContext context,
        int numWorkUnits, int workSize, boolean sharedBuffer)
        throws SQLException
    {
        this(supportedColumnNames, context, numWorkUnits, 0, workSize, sharedBuffer, false);
    }
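As a rough illustration, a Kafka-like factory subclass might pass its metadata column names and sizing to this constructor. The class name, column names, and sizes below are assumptions for illustration, and the required boilerplate mentioned above (such as the BufferInfo() override and SourceData creation) is omitted.

    // Hedged sketch: names and sizes are illustrative, not part of the SDK.
    public class KafkaSourceDataFactory extends SourceDataFactory {
        protected KafkaSourceDataFactory(EcdaReaderContext context) throws SQLException {
            // Expose OFFSET and PARTITION as metadata columns; queue up to 16
            // work units of at most 32 KB each, sharing one buffer per work unit.
            super(new String[] {"OFFSET", "PARTITION"}, context, 16, 32 * 1024, true);
        }
        // BufferInfo() boilerplate and SourceData creation omitted.
    }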

For Single Message Submissions (Like a Log File)

For continuous sources with a single buffer/message submission per work unit, use the following code model for the SourceDataFactory constructor. Use BuffersInputStream.Connection#getCurrent() and BuffersInputStream.Connection#submit() for each work unit.

    /**
     * @param supportedColumnNames Supported metadata column names
     * @param context              Context of DataInputSource
     * @param numBuffers           Number of buffers to allow queuing in BuffersInputStream
     * @throws SQLException
     */
    protected SourceDataFactory(
        String[] supportedColumnNames, EcdaReaderContext context,
        int numBuffers)
        throws SQLException
SourceData

Override updateLocation to update any metadata columns associated with messages (such as sequence numbers). Use the columnNumbers array to know what columns to update. The columnNumbers array is set based on the list of metadata column names passed to the super class in the constructor of the SourceDataFactory subclass.

For sources that deliver messages using more than one buffer, override the methods isStartOfMessage and isEndOfMessage.

Use LocationDescription for debugging (or in other cases where you need to know the current location in the data).

In the constructor, allocate the buffer if you are using SourceData to manage the buffer.

If SourceData does not manage the buffer, it's a good idea to override resetBeforeFree to set the buffer to null.

DataInputSource

start() should spawn a thread if needed and process incoming messages.

init() processes properties, finds the ones that are interesting, and validates them.

closeAllocation() should free any resources and set canceled to true.
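Taken together, these lifecycle methods often look something like the following fragment. This is a hedged sketch: the thread name and field names are illustrative, other DataInputSource methods are omitted, and the run() body is one of the loop patterns shown next.

    // Hedged sketch of the lifecycle methods discussed above.
    private volatile boolean canceled = false;
    private Thread readerThread;

    public void start() throws SQLException {
        // Spawn the thread whose run() method contains one of the loops below.
        readerThread = new Thread(this::run, "ecd-source-reader");
        readerThread.start();
    }

    public void closeAllocation() {
        canceled = true;   // the reader loop checks this flag and exits
        // also release any client-library resources held by the plugin here
    }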

You should then write a reader loop along one of the two following patterns:

For Multiple Message Submissions (Like Kafka)

public void run()
{
    BuffersInputStream inputStream = context.getInputStream();
    // If important for client libraries:
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

    try (BuffersInputStream.Connection<SourceDataSubclass> connection = inputStream.getConnection()) {
        while (!canceled) {
            try {
                ByteBuffer buf = dataSupplier.getBufferIfAvailable();
                if (buf == null) {
                    connection.flush();
                    dataSupplier.waitForDataToBeAvailable(); // or could sleep for a short time
                    continue;
                }

                SourceDataSubclass dataSource = connection.getCurrentWithSize(buf.remaining());

                // when using a shared buffer -
                ByteBuffer savedBuf = dataSource.getBuffer();
                buf.get(savedBuf.array(), savedBuf.position(), buf.remaining());

                dataSource.setSequenceNumber(dataSupplier.getSequenceNumber());

                connection.submit();
            } catch (Exception e) {
                if (e.isRecoverable()) {
                    tracer.log(...);
                    connection.reset(); // only needed if there is more than one connection
                } else {
                    throw ...
                }
            }
        }
    } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exception in reader", e);
    } finally {
        tracer.config("exiting reader");
        inputStream.sendEOS();
    }
}

For Single Message Submissions (Like a Log File)

public void run()
{
    BuffersInputStream inputStream = context.getInputStream();
    // If important for client libraries:
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

    try (BuffersInputStream.Connection<SourceDataSubclass> connection = inputStream.getConnection()) {
        while (!canceled) {
            try {
                ByteBuffer buf = dataSupplier.getBuffer();
                SourceDataSubclass dataSource = connection.getCurrent();
                dataSource.setBuffer(buf);
                dataSource.setSequenceNumber(dataSupplier.getSequenceNumber());
                connection.submit();
            } catch (Exception e) {
                if (e.isRecoverable()) {
                    tracer.log(...);
                    connection.reset(); // only needed if there is more than one connection
                } else {
                    throw ...
                }
            }
        }
    } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exception in reader", e);
    } finally {
        tracer.config("exiting reader");
        inputStream.sendEOS();
    }
}

Writing a Parser Plugin

Parser plugins receive data, validate it, and parse it into the matching columns. The particulars of how you write this code will depend on your data source, but you can get ideas by looking at the Sample Code for ECD Parser Plugin.

To write a parser plugin, you need to implement DataInputParser.

The list of columns to parse is in context.getMetaData().

context.getMappedSpecialColumns() has the set of column names that have already been filled out by the source and should not be parsed.
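For example, a parser's row loop might skip those columns. This is a hedged sketch only: it assumes getMappedSpecialColumns() returns a set of strings and that parserColumnNames has already been derived from context.getMetaData().

    // Hedged sketch: parserColumnNames and the set type are assumptions.
    java.util.Set<String> alreadyFilled = context.getMappedSpecialColumns();
    for (String columnName : parserColumnNames) {
        if (alreadyFilled.contains(columnName)) {
            continue;   // filled out by the source; do not parse this column
        }
        // parse a value for columnName from the input and hand it to the RowInserter
    }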

Use code along the following lines to save the passed-in context (the same as for a source plugin):

public void setContext(EcdaReaderContext context);

public void setInserter(RowInserter inserter) throws SQLException;

 

Use code along the following lines to set the ExecutorService (if necessary):

public void setExecutorService(ExecutorService service);

 

Use code along the following lines to return the Charset expected by the parser (this could be specified by a property). Binary parsers should return null.

public Charset getCharset();
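For example, a text parser might look up a charset name captured from a plugin property during initialization and fall back to null for binary data. This is a sketch; the charsetName field and the idea of a charset property are assumptions for illustration.

    // Hedged sketch: charsetName is assumed to have been captured from a
    // plugin property (for example a character-encoding option) during init().
    public Charset getCharset() {
        if (charsetName == null || charsetName.isEmpty()) {
            return null;                       // binary parser: no charset
        }
        return Charset.forName(charsetName);   // e.g. "UTF-8"
    }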

 

When implementing public void start() throws SQLException, do not clear all parameters in the RowInserter; clear only the parameters set by the parser.

All parser plugins process data either as non-streaming or streaming. It is possible to write a plugin that does either depending on the context.

Non-streaming data is processed by line or chunk, and is implemented with a code block along the following lines:

    // Non-streaming vs. streaming applies to all parsers, but a parser is not
    // limited to the either/or case presented here.
    class NonStreamingReaderImpl implements Runnable
    {
        public void run()
        {
            final BuffersInputStream inputStream = context.getInputStream();
            final BuffersInputStream.Line currentLine = inputStream.newLine(); // without line delimiter
            try {
                if (nonRepeatedProtoReader != null) {
                    nonRepeatedProtoReader.reset(inputStmt);
                }
                if (repeatedProtoReader != null) {
                    repeatedProtoReader.reset(inputStmt);
                }
                while (inputStream.getLine(currentLine, null) && !canceled) {
                    // Keep going until getLine() fails or the parser is canceled.
                    // Iterate over one record at a time; getting a line here means
                    // getting the whole record.
                    try {
                        if (nonRepeatedProtoReader != null) {
                            CodedInputStream in = CodedInputStream.newInstance(
                                currentLine.buffer, currentLine.start, currentLine.length());
                            // protobuf-input specific
                            nonRepeatedProtoReader.processRecord(
                                in, inputStmt, messageSeparator, messageLengthPrefixed);
                        }
                        if (repeatedProtoReader != null) {
                            CodedInputStream in = CodedInputStream.newInstance(
                                currentLine.buffer, currentLine.start, currentLine.length());
                            repeatedProtoReader.processRecord(
                                in, inputStmt, messageSeparator, messageLengthPrefixed);
                            if (nonRepeatedProtoReader != null) {
                                nonRepeatedProtoReader.reset(inputStmt);
                            } else {
                                repeatedProtoReader.reset(inputStmt);
                            }
                        }
                    } catch (Exception e) {
                        tracer.log(Level.WARNING, "Error while parsing protobuf stream", e);
                    }
                }
            } catch (Exception e) {
                tracer.log(Level.SEVERE, "Error while reading protobuf stream", e);
            } finally {
                // No matter what happens, once you leave this section of code,
                // call context.close(). Any parser should call this;
                // otherwise we leave things in a bad state.
                context.close();
            }
        }
    }

Streaming data is processed as it comes in, and is handled by a code block along the following lines:

    class StreamingReaderImpl implements Runnable {
        public void run()
        {
            final BuffersInputStream inputStream = context.getInputStream();
            // Wrap the input in a new stream, turning it into a protobuf input stream.
            CodedInputStream in = CodedInputStream.newInstance(inputStream);

            ProtoMessageReader protoReader = nonRepeatedProtoReader != null
                ? nonRepeatedProtoReader : repeatedProtoReader;
            try {
                if (nonRepeatedProtoReader != null) {
                    nonRepeatedProtoReader.reset(inputStmt);
                }
                if (repeatedProtoReader != null) {
                    repeatedProtoReader.reset(inputStmt);
                }
                while (!canceled) {
                    // Important pattern: try, but if you get an error, skip the
                    // message. This gives you a place to start parsing again.
                    try {
                        protoReader.processRecord(
                            in,
                            inputStmt,
                            messageSeparator,
                            messageLengthPrefixed);
                    } catch (Exception e) {
                        try {
                            inputStream.skipCurrentMessage();
                            if (inputStream.isAtEos()) {
                                return;
                            }
                            in = CodedInputStream.newInstance(inputStream);
                        } catch (Exception e1) {
                            tracer.log(
                                Level.SEVERE,
                                "Error while reading protobuf stream",
                                e1);
                            return;
                        }
                        tracer.log(
                            Level.SEVERE,
                            "Error while reading protobuf stream",
                            e);
                    }
                }
            } catch (Exception e) {
                tracer.log(Level.SEVERE, "Error while reading protobuf stream", e);
            } finally {
                // No matter what happens, once you leave this section of code,
                // call context.close(). Any parser should call this;
                // otherwise we leave things in a bad state.
                context.close();
            }
        }
    }

Installing the ECD Plugin

In your pom add the following property override:

<properties>
  <plugin.factory>your.PluginFactorySubclassname</plugin.factory>
</properties>

 

Build the plugin by invoking the makeTarball.sh script, as follows:

adapters/makeTarball.sh <plugin dir name>

 

Unpack this tarball into the directory $SQLSTREAM_HOME/plugins.

Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server.

In the created directory, you should find install.sql. To install your plugin, invoke sqllineClient (available in $SQLSTREAM_HOME/bin) with this script.