Reading from Kafka
The ECDA Kafka adapter lets you read data from topics on Kafka servers.
All adapter or agent implementations involve configuring options. For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.
For the ECDA Kafka adapter, you can use an options query to set options dynamically. See Using the Options Query Property for more details.
Like all streams (but unlike server objects or data wrappers), foreign streams must be defined within a schema. The sample code in this section creates a server object called "KafkaServer," a schema called "kafkaSource," and a foreign stream called "KafkaSourceStream" that names "KafkaServer" as its server. Creating the foreign stream only sets it up, with named columns and Kafka-specific options. To read data from the stream, you need to SELECT against it. (These options are discussed below.)
To read from Kafka, you first need to create a server object of type 'KAFKA' that references the ECDA data wrapper:
CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;
Unlike server objects, all foreign streams need to be created in a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "KafkaSourceStream."
CREATE OR REPLACE SCHEMA "kafkaSource";
SET SCHEMA '"kafkaSource"';
CREATE OR REPLACE FOREIGN STREAM "KafkaSourceStream"
(offset BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
-- Kafka-specific options are supplied in an OPTIONS clause here
;
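Kafka-specific options are passed in an OPTIONS clause that follows the SERVER clause. The following is a minimal sketch of what a complete definition might look like, not a definitive configuration. The option names (TOPIC, SEED_BROKERS, PORT, STARTING_TIME, PARSER, CHARACTER_ENCODING) and every value (the topic name, the broker host and port) are assumptions for illustration; check them against the CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide before use.

```sql
SET SCHEMA '"kafkaSource"';

CREATE OR REPLACE FOREIGN STREAM "KafkaSourceStream"
(offset BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS (
    -- All option names and values below are assumed, not taken from this guide.
    TOPIC 'transactions',            -- hypothetical Kafka topic to read
    SEED_BROKERS 'localhost',        -- hypothetical broker host
    PORT '9092',                     -- hypothetical broker port
    STARTING_TIME 'LATEST',          -- assumed: start from the newest messages
    PARSER 'CSV',                    -- assumed: messages are CSV rows
    CHARACTER_ENCODING 'UTF-8'
);
```

Because the statement uses CREATE OR REPLACE, running it again with different option values redefines the same foreign stream.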
As with all ECDA adapters, data moves from Kafka to s-Server only once you execute a SELECT statement on the foreign stream that you have defined for the adapter. For example, the following code reads the columns partition, zipcode, transactionTotal, and transactionCount into s-Server from the foreign stream defined above, KafkaSourceStream. It uses the ts column as each row's ROWTIME and sorts rows that arrive up to two seconds out of order.
SELECT STREAM "ts" AS ROWTIME, "partition", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaSourceStream"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;
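A SELECT like the one above moves data only while the query is running. One way to keep data flowing is to define a native stream and a pump that continuously inserts into it from the foreign stream. The following is a minimal sketch under stated assumptions: the stream name "Transactions" and the pump name "KafkaSourcePump" are hypothetical, and for simplicity the sketch passes "ts" through as an ordinary column instead of promoting it to ROWTIME.

```sql
SET SCHEMA '"kafkaSource"';

-- Hypothetical native stream that receives the rows read from Kafka.
CREATE OR REPLACE STREAM "Transactions"
("ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL);

-- Hypothetical pump, created in the stopped state so it can be started explicitly.
CREATE OR REPLACE PUMP "KafkaSourcePump" STOPPED AS
INSERT INTO "Transactions"
SELECT STREAM "ts", "partition", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaSourceStream";

-- Start the pump; rows begin moving from Kafka into "Transactions".
ALTER PUMP "KafkaSourcePump" START;
```

Downstream queries can then SELECT STREAM from "Transactions" without each opening its own read against the foreign stream.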