Blending SQL and Custom Operators


Once you have created UDFs and UDXes, you can use them to build custom windowed aggregation functions that aggregate values over a sliding window of rows over time.

Aggregating Values on Sliding Windows with UDFs

You can use a Java implementation of a "stateful" User Defined Function (UDF) to create custom windowed aggregation functions that aggregate values over a "sliding" window of rows over time.

A User Defined Transform (UDX) written in Java is a more powerful alternative: a UDX is essentially a table function implemented in Java. See Aggregating Values on Sliding Windows with UDXes below.

The following example uses standard s-Server Windowed Aggregation syntax. See the topic Window Clause of the SELECT Statement in the Streaming SQL Reference Guide for more details.

SELECT STREAM "CustomAgg"("aggCol") OVER (PARTITION BY "partCol" RANGE INTERVAL '1' MINUTE PRECEDING), *

FROM "aStream";

 

Here, "CustomAgg" is a custom aggregation function that you implement in Java. The following describes how to implement such a function and use it to perform custom aggregation.

First, you declare (create) the UDF that can be used to perform Windowed Aggregation. See the topic CREATE FUNCTION in the Streaming SQL Reference Guide for more details.

CREATE OR REPLACE FUNCTION "WindowedAggUDF"(
   "aggCol" DOUBLE, "partitionCol" VARCHAR(128),
   "rowtimeCol" TIMESTAMP, "windowSize" INTEGER)
   RETURNS DOUBLE
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.simpleUDF';

 

Note: You need to load the jar containing the UDF implementation (here, "SampleJar") before executing this statement. See the topic CREATE JAR in the Streaming SQL Reference Guide for more details. The sections below provide sample Java code that you can use to implement this jar.

EXTERNAL NAME defines where s-Server should look for the Java class implementing the function. Java-defined external functions must be located either in a class file on SQLstream s-Server's Java classpath, or in an external Java archive (JAR) file loaded into the system using CREATE JAR. In the latter case, the qualified jar name is the name given to the jar in the CREATE JAR statement. If the jar was not created in the current schema, then the fully qualified &lt;schema&gt;.&lt;jar name&gt; format must be used.
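For example, loading the jar referenced above might look like the following sketch. The file path and option value here are illustrative only; see the topic CREATE JAR in the Streaming SQL Reference Guide for the exact syntax and options.

```sql
-- Illustrative only: load the jar that contains the compiled UDF class,
-- under the name referenced by EXTERNAL NAME above.
CREATE JAR "SampleJar"
    LIBRARY 'file:/path/to/sample-functions.jar'
    OPTIONS (0);
```

If "SampleJar" is created in a different schema, reference it from CREATE FUNCTION using the qualified form, for example '"mySchema"."SampleJar":com.sqlstream.sample.functions.Sample.simpleUDF'.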

At this point, you can implement the semantics expressed in the windowed aggregation syntax above by invoking the UDF you declared earlier. Note that the window size is passed in milliseconds (60000 milliseconds corresponds to the one-minute window in the SQL above).

SELECT STREAM "WindowedAggUDF"("aggCol", "partCol", s.ROWTIME, 60000), *

FROM "aStream" AS s;

 

The Java code below defines a function called simpleUDF. The code shows the basic structure of how windowed aggregation semantics can be implemented, using a function "context" object to maintain "windows" of rows and aggregated results partitioned by the partition column.

public static double simpleUDF(
    double aggCol, String partCol, long rowtimeCol, int windowSize)
{
    ...
    FunctionContext functionContext =
        (FunctionContext) PluginUtil.get().getContext();
    if (functionContext == null) {
        functionContext = getNewContext();
        PluginUtil.get().setContext(functionContext);
    }
    ...
    WindowPartition p = functionContext.getPartition(partCol);
    if (p == null) {
        functionContext.addPartition(newPartition(partCol));
        functionContext.addAggBucket(newAggBucket(partCol));
    }
    AggBucket agg = functionContext.getAggBucket(partCol);
    Row newRow = new Row(aggCol, rowtimeCol);
    p.addRow(newRow);
    while (p.oldestRow().needsPurge(rowtimeCol, windowSize)) {
        Row oldestRow = p.purgeOldestRow();
        agg.updateBucket(DROP_ROW, oldestRow);
        // if the partition has 0 rows in it, delete the partition
        // as well as the corresponding aggBucket from the context.
    }
    agg.updateBucket(ADD_ROW, newRow);
    ...
}
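The maintain-and-purge pattern in the pseudocode above can be illustrated with a small self-contained sketch. All class and method names below are hypothetical, not part of the s-Server API: it keeps one deque of timestamped rows per partition (the "WindowPartition"), a running sum per partition (the "AggBucket"), and drops rows that have fallen out of the window before adding each new row.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the sliding-window logic: one deque of
// (value, rowtime) pairs per partition, with a running sum updated
// as rows enter (ADD_ROW) and leave (DROP_ROW) the window.
public class SlidingWindowSketch {
    static final class Row {
        final double value;
        final long rowtime;
        Row(double value, long rowtime) { this.value = value; this.rowtime = rowtime; }
    }

    static final class Partition {
        final ArrayDeque<Row> rows = new ArrayDeque<>();
        double sum; // running aggregate, like the AggBucket above
    }

    private final Map<String, Partition> partitions = new HashMap<>();
    private final long windowSizeMillis;

    public SlidingWindowSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Adds a row and returns the windowed sum for that row's partition.
    public double add(String partCol, double aggCol, long rowtimeMillis) {
        Partition p = partitions.computeIfAbsent(partCol, k -> new Partition());
        // Purge rows that are now older than the window (the "needsPurge" loop).
        while (!p.rows.isEmpty()
                && rowtimeMillis - p.rows.peekFirst().rowtime > windowSizeMillis) {
            p.sum -= p.rows.removeFirst().value; // DROP_ROW
        }
        p.rows.addLast(new Row(aggCol, rowtimeMillis));
        p.sum += aggCol; // ADD_ROW
        // A now-empty partition could be removed here, as the comments above note.
        return p.sum;
    }

    public static void main(String[] args) {
        SlidingWindowSketch w = new SlidingWindowSketch(60_000); // 1-minute window
        System.out.println(w.add("a", 10.0, 0));      // 10.0
        System.out.println(w.add("a", 5.0, 30_000));  // 15.0
        System.out.println(w.add("b", 1.0, 30_000));  // 1.0 (separate partition)
        System.out.println(w.add("a", 2.0, 90_000));  // 7.0 (row at t=0 purged)
    }
}
```

The real UDF must keep this state in the plugin context (PluginUtil.get().getContext()) rather than in a local object, because the function is re-invoked for every input row.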

 

Aggregating Values on Sliding Windows with UDXes

You can use a Java User Defined Transform (UDX) to create custom windowed aggregation functions that aggregate values over a "sliding" window of rows over time. To do so, you follow the steps below to create and implement a UDX.

Recall the SQL standard Windowed Aggregation syntax:

SELECT STREAM "CustomAgg"("aggCol") OVER (PARTITION BY "partCol" RANGE INTERVAL '1' MINUTE PRECEDING), *

FROM "aStream";

 

In the SQL example above, "CustomAgg" is a custom aggregation function that you implement in Java. The paragraphs below describe how you can use a Java UDX to implement such custom aggregation.

First, you declare (create) the UDX that can be used to perform Windowed Aggregation. See the topic CREATE FUNCTION in the Streaming SQL Reference Guide for more details.

CREATE OR REPLACE FUNCTION "WindowedAggUDX"(
    "inputRows" CURSOR,
    "aggColName" VARCHAR(64), "partitionColName" VARCHAR(64),
    "rowtimeColName" VARCHAR(64), "windowSize" INTEGER)
    RETURNS TABLE (
        "inputRows".*, "aggResult" DOUBLE
    )
    LANGUAGE JAVA
    PARAMETER STYLE SYSTEM DEFINED JAVA
    NO SQL
    EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.simpleUDX';

 

Note: You need to load the jar containing the UDX implementation (here, "SampleJar") before executing this statement. See the topic CREATE JAR in the Streaming SQL Reference Guide for more details. The sections below provide sample Java code that you can use to implement this jar.

EXTERNAL NAME defines where s-Server should look for the Java class implementing the function. Java-defined external functions must be located either in a class file on SQLstream s-Server's Java classpath, or in an external Java archive (JAR) file loaded into the system using CREATE JAR. In the latter case, the qualified jar name is the name given to the jar in the CREATE JAR statement. If the jar was not created in the current schema, then the fully qualified &lt;schema&gt;.&lt;jar name&gt; format must be used.

 

At this point, you can implement the semantics expressed in the windowed aggregation syntax above by invoking the sample UDX as described below.

SELECT STREAM * FROM STREAM("WindowedAggUDX"(
    CURSOR(SELECT STREAM * FROM "aStream"),
    'aggCol', 'partCol', 'ROWTIME', 60000));
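The UDX output can then be consumed like any other stream. For instance, one way to make the invocation reusable is to wrap it in a view; this is a sketch, and the view name "aggregatedStream" is illustrative:

```sql
-- Illustrative only: expose the UDX output under a reusable name.
CREATE OR REPLACE VIEW "aggregatedStream" AS
    SELECT STREAM * FROM STREAM("WindowedAggUDX"(
        CURSOR(SELECT STREAM * FROM "aStream"),
        'aggCol', 'partCol', 'ROWTIME', 60000));
```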

 

The Java code below shows the basic structure of how windowed aggregation semantics can be implemented, using the function "context" to maintain "windows" of rows and aggregated results partitioned by the partition column.

import com.sqlstream.plugin.impl.AbstractBaseUdx;
import java.sql.*;
import com.sqlstream.jdbc.*;

public class Sample extends AbstractBaseUdx {
    FunctionContext context;

    private Sample(
        java.sql.ResultSet inputRows,
        java.sql.PreparedStatement resultInserter)
        throws SQLException
    {
        super(newTracer(), inputRows, resultInserter);
        context = newFunctionContext();
    }

    public static void simpleUDX(
        java.sql.ResultSet inputRows,
        String aggColName, String partColName, String rowtimeColName,
        int windowSize, java.sql.PreparedStatement resultInserter)
        throws SQLException
    {
        Sample instance = new Sample(inputRows, resultInserter);

        // The following determines the column indexes in inputRows for
        // the columns needed to compute the aggregate.
        instance.context.updateContext(
            aggColName, partColName, rowtimeColName, windowSize);

        instance.run();
    }

    public void run() throws SQLException {
        while (inputRows.next()) {
            ...
            // API to fast copy pass-through columns from input
            // to output. Defined in AbstractBaseUdx.
            transferColumns(inputRows, resultInserter, passList);
            ...
            // (aggCol, partCol, and rowtimeCol are read from inputRows
            // in the elided code above.)
            WindowPartition p = context.getPartition(partCol);
            if (p == null) {
                context.addPartition(newPartition(partCol));
                context.addAggBucket(newAggBucket(partCol));
            }
            AggBucket agg = context.getAggBucket(partCol);
            Row newRow = new Row(aggCol, rowtimeCol);
            p.addRow(newRow);
            while (p.oldestRow().needsPurge(rowtimeCol, windowSize)) {
                Row oldestRow = p.purgeOldestRow();
                agg.updateBucket(DROP_ROW, oldestRow);
                // if the partition has 0 rows in it, delete the partition
                // as well as the corresponding aggBucket from the context.
            }
            agg.updateBucket(ADD_ROW, newRow);
            ...
            // set the parameter for the aggResult column defined in the
            // RETURNS clause of the UDX.
            resultInserter.setDouble(aggResultIdx, agg.getResult());

            // insert the result row into the output table of the UDX.
            resultInserter.executeUpdate();
        }
    }
}