Using the Kafka ECDA to Process Partitioned Streams


This topic walks through an example of the SQL needed to set up a pipeline that processes one partition of a Kafka topic. For each partition you can have N pipelines (often on separate instances of s-Server) listening to that partition, where N is your redundancy level. The example assumes that two Kafka topics have been set up:

TransactionData. This carries the raw input, such as credit card transactions. It should be partitioned using some kind of round-robin scheme.
AggregatedData. This is used to communicate between the aggregation pipeline servers and the rollup server.

Each pipeline will:

1. Read from one partition of a topic.
2. Parse incoming data into columns using the ECDA CSV parser.
3. Look up the zipcode for the transaction's recipient in an address table.
4. Aggregate transaction amounts and counts by zipcode for each ten-second window.

To ensure that all pipelines for the same partition output the same data, the code discards the aggregated data for the first window after startup, since that window may be partial. This lets you restart an instance of s-Server running these pipelines at any time without affecting results.

Results are written to the AggregatedData topic. One or more instances of s-Server then read that topic, discard duplicate rows, roll up the aggregates, and write them to a stream; a sketch of this rollup stage appears at the end of this topic.

CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "KafkaPartitionedInputStream"
(OFFSET BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"cardNumber" BIGINT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionAmount" DOUBLE NOT NULL,
"recipientId" BIGINT NOT NULL,
"transactionId" BIGINT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'TransactionData',
"PARTITION" '1',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
"STARTING_TIME" 'latest',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');
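
Once the foreign stream is defined, you can confirm that data is arriving on the partition with a simple streaming query (this assumes messages are already being published to TransactionData):

SELECT STREAM * FROM "KafkaPartitionedInputStream";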

 

CREATE OR REPLACE FOREIGN STREAM "KafkaOutputStream"
(
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
"metadata.broker.list" 'localhost:9092',
parser 'CSV',
row_separator '',
character_encoding 'UTF-8');

 

-- Source columns we're interested in (recipientId is needed for the zipcode lookup below)
CREATE OR REPLACE VIEW "sourceData" AS
SELECT STREAM "ts" AS ROWTIME, "recipientId", "transactionAmount"
FROM "KafkaPartitionedInputStream";
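
Selecting "ts" AS ROWTIME promotes the timestamp carried in each message to the stream's rowtime, so that downstream aggregation windows are driven by transaction time rather than by arrival time at s-Server.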

 

CREATE OR REPLACE FUNCTION "getZipcode"(
     inputRows CURSOR,
        dsName VARCHAR(64),
     tableName VARCHAR(128),
       colName VARCHAR(128),
     cacheSize INTEGER,
  prefetchRows BOOLEAN,
  fuzzyLookups BOOLEAN)
  RETURNS TABLE(
       inputRows.*,
       "zipcode" CHAR(5))
  LANGUAGE JAVA
  PARAMETER STYLE SYSTEM DEFINED JAVA
  NO SQL
  EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

 

-- adorn with zipcode using in-memory lookup if possible
CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM *
FROM TABLE("getZipcode"(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));

 

CREATE OR REPLACE VIEW "aggregatedData" AS
SELECT STREAM "zipcode", SUM("transactionAmount") AS "transactionTotal", COUNT(*) AS "transactionCount"
FROM "transactionAndZipcode"
GROUP BY FLOOR((("transactionAndZipcode".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND)/10 TO SECOND), "zipcode";
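
The GROUP BY clause assigns each row to a ten-second window: subtracting TIMESTAMP '1970-01-01 00:00:00' converts the rowtime to an interval in seconds, and dividing by 10 before applying FLOOR places all rows from the same ten-second span into a single group.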

 

-- Creates the output pump.
-- Does not output the first group of rows (all rows in a group have the same rowtime),
-- as this group may be partial if restarting after failure.
CREATE OR REPLACE PUMP "aggregatedDataOutputPump" STOPPED AS
INSERT INTO "kafkaAggregation"."KafkaOutputStream"
SELECT STREAM ROWTIME AS "ts", 1 AS "partition", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *, a.ROWTIME AS thistime,
             FIRST_VALUE(a.ROWTIME) OVER (ROWS UNBOUNDED PRECEDING) AS firsttime
      FROM "kafkaAggregation"."aggregatedData" AS a) AS b
WHERE firsttime <> thistime;
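
FIRST_VALUE(a.ROWTIME) OVER (ROWS UNBOUNDED PRECEDING) captures the rowtime of the first row the pump sees after startup; the WHERE clause then filters out every row carrying that rowtime, which discards the possibly partial first window.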

 

ALTER PUMP "aggregatedDataOutputPump" START;
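
The rollup stage itself is not shown above. The SQL below is a minimal sketch of what it might look like: the names "KafkaAggregatedInputStream", "aggregatedSource", and "rolledUpAggregates" are illustrative rather than part of the example schema, and collapsing duplicates with MIN is just one workable approach, relying on the fact that redundant pipelines emit identical rows.

-- Read the AggregatedData topic written by the pipeline servers.
CREATE OR REPLACE FOREIGN STREAM "KafkaAggregatedInputStream"
("ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
"STARTING_TIME" 'latest',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');

-- Promote the embedded timestamp to rowtime so the rollup can window on it.
CREATE OR REPLACE VIEW "aggregatedSource" AS
SELECT STREAM "ts" AS ROWTIME, "partition", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaAggregatedInputStream";

-- Redundant pipelines emit identical rows for the same window, partition, and
-- zipcode, so MIN collapses the duplicates; summing across partitions then
-- yields the rolled-up totals for each window.
CREATE OR REPLACE VIEW "rolledUpAggregates" AS
SELECT STREAM "zipcode",
       SUM("transactionTotal") AS "transactionTotal",
       SUM("transactionCount") AS "transactionCount"
FROM (SELECT STREAM "partition", "zipcode",
             MIN("transactionTotal") AS "transactionTotal",
             MIN("transactionCount") AS "transactionCount"
      FROM "aggregatedSource" AS s
      GROUP BY s.ROWTIME, "partition", "zipcode") AS deduped
GROUP BY deduped.ROWTIME, "zipcode";

A pump (not shown) would then write "rolledUpAggregates" to its destination stream, as described above.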