Writing a UDX


User Defined Transforms (UDXs) are User Defined Functions (UDFs) that return a table or stream instead of a scalar value. SQLstream supports UDXs written in Java, so developing a custom UDX requires knowledge of Java programming. Once you write and compile the UDX, you declare it in s-Server using the SQL statement CREATE FUNCTION. The EXTERNAL NAME clause of the CREATE FUNCTION statement associates the UDX with a public Java class. That class must contain a public static execute() method. For more information, see Declaring UDXs in s-Server below.

Note: This execute() method may raise a SQLException.

The execute() method must have one more argument than the UDX has parameters. A UDX typically accepts one or more cursors (of type java.sql.ResultSet) along with optional scalar values as input parameters. A cursor can also be a "streaming" result set, using an extension of java.sql.ResultSet called com.sqlstream.jdbc.StreamingResultSet. For more information, see the topic Streaming ResultSet in this guide. The final argument of the execute() method must have type java.sql.PreparedStatement.

The leading arguments correspond, in order, to the UDX parameters. Such leading arguments can have the following types:

A scalar UDX parameter corresponds to a Java argument of the matching Java type, as described in the CREATE FUNCTION topic of the Streaming SQL Reference Guide.
A non-streaming UDX cursor parameter corresponds to a Java argument of type java.sql.ResultSet.
A streaming UDX cursor parameter corresponds to a Java argument of type com.sqlstream.jdbc.StreamingResultSet.
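
The mapping above can be sketched as follows. The class name SignatureSketch, the scalar parameter names, and the SQL declaration quoted in the comment are hypothetical and chosen only for illustration; the JDBC types are the ones named in the rules above.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical UDX class, shown only to illustrate the argument order.
public class SignatureSketch {
    // Assumed SQL declaration (illustrative only):
    //   FUNCTION "sketch"("inputRows" CURSOR, "threshold" INTEGER, "label" VARCHAR(20))
    // Three UDX parameters, so execute() takes four arguments.
    public static void execute(
            ResultSet inputRows,        // non-streaming cursor parameter
            int threshold,              // scalar parameter
            String label,               // scalar parameter
            PreparedStatement results)  // extra final argument: receives output rows
            throws SQLException {
        while (inputRows.next()) {
            // Copy the first column through unchanged and emit the row.
            results.setObject(1, inputRows.getObject(1));
            results.executeUpdate();
        }
    }
}
```

A streaming cursor would be declared the same way, with com.sqlstream.jdbc.StreamingResultSet in place of java.sql.ResultSet.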

An abstract class, com.sqlstream.plugin.impl.AbstractBaseUdx, is included in the file $SQLSTREAM_HOME/lib/aspen.jar.  This file is installed with s-Server. For best efficiency, we recommend building custom UDXs by extending this class. We provide an example of doing so below.

Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server. The Javadoc for AbstractBaseUdx is located at $SQLSTREAM_HOME/doc/SQLstream-JDBC-API.zip.

Simple UDX implementation using java.sql.ResultSet & java.sql.PreparedStatement

Here is a simple example of a non-streaming UDX implementation that transfers column values from the input cursor (java.sql.ResultSet) to the output statement (java.sql.PreparedStatement). This code will function as a UDX, but is not as efficient as extending AbstractBaseUdx, described below.

// Copy input data to the output prepared statement.
// tracer is assumed to be a java.util.logging.Logger defined elsewhere in the class.
public static void execute(
    java.sql.ResultSet inputRows, int input1, String input2, double input3,
    java.sql.PreparedStatement results)
    throws SQLException
{
    while (inputRows.next()) {
        try {
            // read the row from the input and copy each column to the output
            int numColumns = inputRows.getMetaData().getColumnCount();
            for (int i = 1; i <= numColumns; i++) {
                results.setObject(i, inputRows.getObject(i));
            }
            // …
            // custom code that transforms data according to your specifications
            // …
        } catch (SQLException se) {
            tracer.warning(
                "Failed to execute function or set parameters for output");
        }
        results.executeUpdate();     // push the row out
    }
}
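
To see what the copy loop does without a running s-Server, one option is to drive it with in-memory stand-ins for the two JDBC objects. The sketch below is an assumption of this guide's editor, not part of the SQLstream API: the class CopyUdxSketch and its demo() helper are hypothetical, the scalar parameters are omitted for brevity, and the stand-ins are built with java.lang.reflect.Proxy and implement only the methods the loop calls.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyUdxSketch {
    // Same column-copy loop as the example above, without the scalar parameters.
    public static void execute(ResultSet inputRows, PreparedStatement results)
            throws SQLException {
        while (inputRows.next()) {
            int numColumns = inputRows.getMetaData().getColumnCount();
            for (int i = 1; i <= numColumns; i++) {
                results.setObject(i, inputRows.getObject(i));
            }
            results.executeUpdate(); // push the row out
        }
    }

    // Runs execute() against in-memory stand-ins and returns the emitted rows.
    public static List<List<Object>> demo(Object[][] rows, int numColumns) {
        ClassLoader loader = CopyUdxSketch.class.getClassLoader();
        int[] cursor = {-1};                        // index of the current input row
        Object[] current = new Object[numColumns];  // output row being built
        List<List<Object>> emitted = new ArrayList<>();

        ResultSetMetaData meta = (ResultSetMetaData) Proxy.newProxyInstance(
            loader, new Class<?>[] {ResultSetMetaData.class},
            (proxy, method, args) -> {
                if (method.getName().equals("getColumnCount")) return numColumns;
                throw new UnsupportedOperationException(method.getName());
            });

        ResultSet in = (ResultSet) Proxy.newProxyInstance(
            loader, new Class<?>[] {ResultSet.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "next": return ++cursor[0] < rows.length;
                    case "getObject": return rows[cursor[0]][((Integer) args[0]) - 1];
                    case "getMetaData": return meta;
                    default: throw new UnsupportedOperationException(method.getName());
                }
            });

        PreparedStatement out = (PreparedStatement) Proxy.newProxyInstance(
            loader, new Class<?>[] {PreparedStatement.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "setObject":
                        current[((Integer) args[0]) - 1] = args[1];
                        return null;
                    case "executeUpdate":
                        emitted.add(new ArrayList<>(Arrays.asList(current)));
                        return 1;
                    default: throw new UnsupportedOperationException(method.getName());
                }
            });

        try {
            execute(in, out);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return emitted;
    }
}
```

Calling CopyUdxSketch.demo(new Object[][] {{1, "a"}, {2, "b"}}, 2) returns one output row per input row, with the column values unchanged.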

 

Using a Java User Defined Transform (UDX) to Create Custom Windowed Aggregation Functions

This section describes how a Java User Defined Transform (UDX) can be used to create custom windowed aggregation functions that aggregate values over a "tumbling" window of rows over time.

SQL standard Aggregation syntax

SELECT STREAM

   FLOOR(s.ROWTIME TO MINUTE), "partCol1", "partCol2",

   "CustomAgg1"("aggCol") AS "aggResult1",

   "CustomAgg2"("aggCol2") AS "aggResult2"

FROM "aStream" AS s

GROUP BY FLOOR(s.ROWTIME TO MINUTE), "partCol1", "partCol2";

 

In the SQL example above, "CustomAgg1" and "CustomAgg2" are custom aggregation functions that a user needs to implement in Java. The following paragraphs describe how you can use a Java UDX to implement such custom aggregation.

First, declare (create) the UDX that performs the windowed aggregation:

CREATE OR REPLACE FUNCTION "GroupingAggUDX"(

   "inputRows" CURSOR,

   "aggColName1" VARCHAR(64), "partitionColName1" VARCHAR(64),

   "aggColName2" VARCHAR(64), "partitionColName2" VARCHAR(64),

   "rowtimeColName" VARCHAR(64), "windowSize" INTEGER)

   RETURNS TABLE (

       ROWTIME TIMESTAMP,

       "partCol1" VARCHAR(64),

       "partCol2" INTEGER,

       "aggResult1" DOUBLE,

       "aggResult2" DOUBLE

   )

   LANGUAGE JAVA

   PARAMETER STYLE SYSTEM DEFINED JAVA

   NO SQL

   EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.simpleUDX';

 

Currently, the semantics expressed by the windowed aggregation syntax above can be implemented by invoking the UDX declared above, as follows:

SELECT STREAM * FROM STREAM("GroupingAggUDX"(
   CURSOR(SELECT STREAM * FROM "aStream"),
   'aggCol1', 'partCol1', 'aggCol2', 'partCol2', 'ROWTIME', 60000));

 

The Java code below shows the basic structure of how windowed aggregation semantics can be implemented, using a function "context" to maintain "windows" of rows and aggregated results that are partitioned by the partition columns.

package com.sqlstream.sample.functions;

import com.sqlstream.plugin.impl.AbstractBaseUdx;
import java.sql.*;
import com.sqlstream.jdbc.*;

// FunctionContext and AggBucket, and the newTracer(), newFunctionContext() and
// newAggBucket() factory methods, are helpers that you supply; they are not shown here.
public class Sample extends AbstractBaseUdx {
    private final FunctionContext context;
    private int windowSize;

    private Sample(
        java.sql.ResultSet inputRows,
        java.sql.PreparedStatement resultInserter)
        throws SQLException
    {
        super(newTracer(), inputRows, resultInserter);
        context = newFunctionContext();
    }

    // The argument order matches the parameter order in CREATE FUNCTION,
    // followed by the extra PreparedStatement argument.
    public static void simpleUDX(
        java.sql.ResultSet inputRows,
        String aggColName1, String partColName1,
        String aggColName2, String partColName2,
        String rowtimeColName, int windowSize,
        java.sql.PreparedStatement resultInserter)
        throws SQLException
    {
        Sample instance = new Sample(inputRows, resultInserter);
        instance.windowSize = windowSize;

        // Determine the column indexes in inputRows for the columns
        // needed to compute the aggregates.
        instance.context.updateContext(
            aggColName1, aggColName2,
            partColName1, partColName2, rowtimeColName,
            windowSize);

        instance.run();
    }

    public void run() throws SQLException {
        while (inputRows.next()) {
            // ...
            String partCol1 = inputRows.getString(context.partColIdx1);
            int partCol2 = inputRows.getInt(context.partColIdx2);
            long rowtimeCol =
                inputRows.getTimestamp(context.rowtimeIdx).getTime();

            // Emit every bucket whose window has closed as of this rowtime.
            while (context.flushGroupedResults(rowtimeCol)) {
                AggBucket bucket = context.getNextBucket();
                context.passGroupingColumns(bucket, resultInserter);

                // set the parameters for the aggResult columns defined in
                // the RETURNS clause of the UDX
                resultInserter.setDouble(
                    context.aggResultIdx1, bucket.getResult1());
                resultInserter.setDouble(
                    context.aggResultIdx2, bucket.getResult2());

                // insert the result row into the output table of the UDX
                resultInserter.executeUpdate();
            }

            // Find the bucket for this row, creating it if necessary.
            AggBucket agg = context.getAggBucket(
                partCol1, partCol2, rowtimeCol, windowSize);
            if (agg == null) {
                agg = newAggBucket(partCol1, partCol2, rowtimeCol, windowSize);
                context.addAggBucket(agg);
            }

            double aggCol1 = inputRows.getDouble(context.aggColIdx1);
            double aggCol2 = inputRows.getDouble(context.aggColIdx2);
            agg.updateBucket(aggCol1, aggCol2);
            // ...
        }
    }
}
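
The sample above leaves the window bookkeeping to helper classes that are not shown. As a rough illustration of what such bookkeeping might involve, the following sketch keeps one running sum per partition key and flushes all sums when a row arrives for a later window. It is a simplified stand-in, not the FunctionContext or AggBucket implementation: the class TumblingWindowSketch and its methods are hypothetical, it uses a single string partition key and a sum as the aggregate, and it assumes rows arrive in non-decreasing rowtime order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingWindowSketch {
    private final long windowSizeMillis;
    private long currentWindowStart = Long.MIN_VALUE;
    // partition key -> running sum for the current window
    private final Map<String, Double> sums = new LinkedHashMap<>();
    // flushed results, formatted as "windowStart,partitionKey,sum"
    private final List<String> emitted = new ArrayList<>();

    public TumblingWindowSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Adds one row; flushes the previous window first if this row starts a new one.
    public void accept(long rowtimeMillis, String partitionKey, double value) {
        long windowStart =
            Math.floorDiv(rowtimeMillis, windowSizeMillis) * windowSizeMillis;
        if (windowStart > currentWindowStart) {
            flush();
            currentWindowStart = windowStart;
        }
        sums.merge(partitionKey, value, Double::sum);
    }

    // Emits one result per partition for the current window and resets the sums.
    public void flush() {
        for (Map.Entry<String, Double> e : sums.entrySet()) {
            emitted.add(currentWindowStart + "," + e.getKey() + "," + e.getValue());
        }
        sums.clear();
    }

    public List<String> emitted() {
        return emitted;
    }
}
```

In a UDX, the body of flush() would instead set the output columns on the PreparedStatement and call executeUpdate() once per partition.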

 

Rowtime bounds and UDXes

The code samples in earlier sections use only the standard JDBC APIs. As with all s-Server streaming SQL, you need to use the SQLstream extensions to these APIs to handle rowtime bounds (punctuations). The topic Rowtime Bounds and UDXs describes this concept in more detail. As that topic discusses, you need to use the com.sqlstream.jdbc.StreamingResultSet and com.sqlstream.jdbc.StreamingPreparedStatement interfaces to handle rowtime bounds.

The following code block provides an example of handling rowtime bounds using these two interfaces:

import java.sql.*;
import com.sqlstream.jdbc.*;
import com.sqlstream.jdbc.StreamingResultSet.RowEvent;
import com.sqlstream.plugin.impl.AbstractBaseUdx;

// tracer, maxIdle and passList are assumed to be defined elsewhere (not shown).
// The output statement is referred to as results in run(); use the field name
// that AbstractBaseUdx exposes (the Sample class above uses resultInserter).
public class SimpleUdx extends AbstractBaseUdx {

    private int input1;
    private String input2;
    private double input3;

    private SimpleUdx(
        java.sql.ResultSet inputRows, int input1, String input2,
        double input3, java.sql.PreparedStatement results)
        throws SQLException
    {
        super(tracer, inputRows, results);
        createPassMap();
        this.input1 = input1;
        this.input2 = input2;
        this.input3 = input3;
    }

    public static void execute(
        java.sql.ResultSet inputRows, int input1, String input2,
        double input3, java.sql.PreparedStatement results)
        throws SQLException
    {
        SimpleUdx instance =
            new SimpleUdx(inputRows, input1, input2, input3, results);
        instance.run();
    }

    public void run() throws SQLException
    {
        StreamingResultSet in = (StreamingResultSet) inputRows;
        StreamingPreparedStatement out = (StreamingPreparedStatement) results;
        while (true) {
            try {
                RowEvent e = in.nextRowOrRowtime(maxIdle);
                switch (e) {
                case EndOfStream:
                    // <custom code to handle end of stream>
                    return;  // end of input
                case Timeout:
                    // <no data arrived within the timeout>
                    // <custom code for handling what happens when the server times out>
                    continue;
                case NewRow:
                    transferColumns(in, null, out, passList);
                    // <custom code to process new row>
                    break;
                case NewRowtimeBound:
                    Timestamp newBound = in.getRowtimeBound();
                    // <custom code>
                    // pass the rowtime bound to out
                    out.setRowtimeBound(newBound);
                    continue;
                }
            } catch (SQLException se) {
                // custom code. Typically swallow the exception here.
            }
            out.executeUpdate();    // may throw SQLException
        }
    }
}
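
The control flow of the loop above can be tried out without s-Server by replaying a fixed sequence of events. The sketch below is only a simulation under stated assumptions: the Event enum is a local stand-in that mirrors the four cases handled above, not the SQLstream RowEvent type, and the class RowtimeBoundSketch and its process() method are hypothetical. It records what the loop would forward downstream: rows are emitted, rowtime bounds are passed along without emitting a row, timeouts produce nothing, and end of stream stops processing.

```java
import java.util.ArrayList;
import java.util.List;

public class RowtimeBoundSketch {
    // Local stand-in for the event kinds handled in the loop above.
    public enum Event { NEW_ROW, NEW_ROWTIME_BOUND, TIMEOUT, END_OF_STREAM }

    // Replays the events and returns a log of what would be forwarded downstream.
    // times[i] is the rowtime (in milliseconds) associated with events[i];
    // it is ignored for TIMEOUT and END_OF_STREAM.
    public static List<String> process(Event[] events, long[] times) {
        List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < events.length; i++) {
            switch (events[i]) {
                case NEW_ROW:
                    forwarded.add("row@" + times[i]);    // corresponds to executeUpdate()
                    break;
                case NEW_ROWTIME_BOUND:
                    forwarded.add("bound@" + times[i]);  // corresponds to setRowtimeBound()
                    break;
                case TIMEOUT:
                    break;                               // nothing arrived; keep waiting
                case END_OF_STREAM:
                    return forwarded;                    // stop; later events are ignored
            }
        }
        return forwarded;
    }
}
```

Forwarding the bound matters because a downstream time-based operation can use it to advance time even when no new rows arrive.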

 

Compiling the UDX

When you compile the UDX, you need to include aspen.jar and SqlStreamJdbc_Complete.jar in the classpath, as in the following example, which assumes you have saved the code for your UDX in a file called SimpleUdx.java.

$ mkdir -p classes
$ javac -d classes -cp $SQLSTREAM_HOME/lib/SqlStreamJdbc_Complete.jar:$SQLSTREAM_HOME/lib/aspen.jar /path/to/SimpleUdx.java
$ jar cvf SimpleUdx.jar -C classes .

 

Declaring UDXs in s-Server

In order to be available in s-Server, UDXs need to be declared in SQL. A typical UDX definition in SQL is shown below.

CREATE OR REPLACE JAR simple_jar
   LIBRARY 'file:plugin/SimpleUdx.jar'
   OPTIONS(0);

CREATE OR REPLACE FUNCTION simple_udx(
   inputRows CURSOR, input1 INTEGER,
   input2 VARCHAR(20), input3 DOUBLE)
RETURNS TABLE(inputRows.*, output1 DOUBLE, output2 INTEGER, output3 VARCHAR(40))
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
--EXTERNAL NAME 'your_jar:com.yourcompany.component.yourUDX.yourMethod'
EXTERNAL NAME 'simple_jar:com.sqlstream.plugin.simple.SimpleUdx.execute';

 

The function definition above searches for the static method com.sqlstream.plugin.simple.SimpleUdx.execute() in the jar file $SQLSTREAM_HOME/plugin/SimpleUdx.jar.

It looks for the method with the following signature:

public static void execute(java.sql.ResultSet inputRows, int input1, String input2, double input3, java.sql.PreparedStatement  results) throws SQLException

 

Note that the Java method signature has one extra parameter, results, compared to the signature in the SQL function definition. The results parameter represents the RETURNS clause of the function definition. It is the java.sql.PreparedStatement for the insert operation into the temporary table or stream returned by the function. When the UDX is called, s-Server automatically passes this PreparedStatement as the last parameter of the Java method call.

The input cursors can be accessed using the following:

Non-streaming API, java.sql.ResultSet. See http://docs.oracle.com/javase/7/docs/api/java/sql/ResultSet.html for more details.
Streaming API, com.sqlstream.jdbc.StreamingResultSet. This API is defined by SQLstream and extended from java.sql.ResultSet. This API is necessary to handle punctuations in the UDX implementation. See the topic Streaming ResultSet in this guide for more details.