Adding and Removing Processing Nodes for Kafka

After aggregating data in multiple instances of s-Server, you can create one or more s-Server pumps that write to Kafka topics, and add or remove these processing nodes by managing the pumps with the ALTER PUMP command.

Implementing the Kafka ECDA Adapter to Pump Data to Kafka

Every time you use an adapter, you need to implement it within an s-Server schema. The following code creates a schema, makes it the current schema and path, and then defines a server object that implements the Kafka ECDA adapter.

CREATE OR REPLACE SCHEMA "kafkaAggregation";

SET SCHEMA '"kafkaAggregation"';

SET PATH '"kafkaAggregation"';

 

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "KafkaAggregatedData"
("OFFSET" BIGINT NOT NULL,            -- Kafka message offset
"ts" TIMESTAMP NOT NULL,              -- message timestamp; promoted to ROWTIME below
"PARTITION" INT NOT NULL,             -- Kafka partition the row was read from
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
starting_time 'latest',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');
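
Before wiring up the pump, you can confirm that rows are arriving from the topic by running an ad hoc query against the foreign stream from a SQL client. A minimal check (the fully qualified name assumes the schema created above):

-- Stream rows from the Kafka topic as they arrive; cancel the query to stop.
SELECT STREAM * FROM "kafkaAggregation"."KafkaAggregatedData";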

 

Setting up the Pump

The code below creates a native stream with three columns: zipcode, transactionTotal, and transactionCount. The pump defined later in this topic writes the aggregated Kafka data into this stream.

CREATE OR REPLACE STREAM "AggregatedData"(
      "zipcode" CHAR(5) NOT NULL,
      "transactionTotal" DECIMAL(18,2) NOT NULL,
      "transactionCount" INT NOT NULL);

 

The next code block creates a view on the foreign stream KafkaAggregatedData that promotes the timestamp column "ts" to ROWTIME and selects the columns PARTITION, zipcode, transactionTotal, and transactionCount. The ORDER BY ... WITHIN INTERVAL '2' SECOND clause t-sorts rows that arrive up to two seconds out of order.

CREATE OR REPLACE VIEW "AggregatedDataWithRowTime" AS
SELECT STREAM "ts" AS ROWTIME, "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaAggregatedData"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;
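
Because "ts" is now the view's ROWTIME, you can watch the time-sorted rows directly. A quick illustrative query:

-- ROWTIME here reflects the Kafka timestamp column "ts".
SELECT STREAM ROWTIME, "PARTITION", "zipcode" FROM "AggregatedDataWithRowTime";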

 

The next code block discards duplicate rows. The windowed COUNT(*) counts the rows seen so far for a given partition and zipcode at the current ROWTIME; the WHERE clause keeps only the first such row (c = 1).

CREATE OR REPLACE VIEW "AggregatedDataDeDuped" AS
SELECT STREAM "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *,
      COUNT(*) OVER (PARTITION BY "PARTITION", "zipcode" RANGE INTERVAL '0' SECOND PRECEDING) AS c
      FROM "AggregatedDataWithRowTime") AS dd
WHERE c = 1;

 

The next code block creates a pump, initially STOPPED, that groups the deduplicated rows by ROWTIME and zipcode and inserts each zipcode along with the sums of transactionTotal and transactionCount into the AggregatedData stream.

CREATE OR REPLACE PUMP "rollupPump" STOPPED AS
INSERT INTO "AggregatedData"
SELECT STREAM "zipcode", SUM("transactionTotal"), SUM("transactionCount")
FROM "AggregatedDataDeDuped"
GROUP BY "AggregatedDataDeDuped".ROWTIME, "zipcode";
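
To complete the round trip described at the start of this topic and write the rolled-up rows back out to Kafka, you can define a sink foreign stream on "KafkaServer" and pump into it. The sketch below is illustrative only: the topic name, the stream and pump names, and the formatter option are assumptions, so check the Writing to Kafka topic for the exact sink options your release supports.

-- Illustrative sink; names and options below are assumptions, not part of this topic.
CREATE OR REPLACE FOREIGN STREAM "AggregatedDataOut"
("zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedOut',      -- hypothetical output topic
seed_brokers 'localhost',
formatter 'CSV');            -- assumed CSV formatter for the write side

CREATE OR REPLACE PUMP "kafkaWriterPump" STOPPED AS
INSERT INTO "AggregatedDataOut"
SELECT STREAM * FROM "AggregatedData";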

 

Starting and Stopping the Pump

You can then start and stop the pump using the ALTER PUMP command:

ALTER PUMP "rollupPump" START;
ALTER PUMP "rollupPump" STOP;
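
Adding a processing node amounts to creating and starting another pump; removing one means stopping that pump and then dropping it. A brief sketch, assuming your release supports DROP PUMP (the second pump name is illustrative):

-- Add a node: create and start another pump over the same deduplicated view.
CREATE OR REPLACE PUMP "rollupPump2" STOPPED AS
INSERT INTO "AggregatedData"
SELECT STREAM "zipcode", SUM("transactionTotal"), SUM("transactionCount")
FROM "AggregatedDataDeDuped"
GROUP BY "AggregatedDataDeDuped".ROWTIME, "zipcode";
ALTER PUMP "rollupPump2" START;

-- Remove the node again: stop the pump, then drop it.
ALTER PUMP "rollupPump2" STOP;
DROP PUMP "rollupPump2";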

 

See the topic ALTER PUMP in the Streaming SQL Reference Guide for more details.

Next: Using the TableLookup UDX to Prefetch a Partitioned Portion of a Database