Writing to Kinesis

The Kinesis ECDA adapter writes batches of data to a Kinesis stream. You can specify CSV, XML, or JSON as the format for the data. Before you can write to a Kinesis stream, you must define a server object for it. This topic describes how to set up a foreign stream and perform an INSERT into it in order to write data to Kinesis.

This adapter works best when run from within AWS. If you write to Kinesis from outside AWS, all rows are sent over the public Internet, and you also need to install credentials for an AWS account with permission to write to the stream.

To configure how the adapter writes to Kinesis, you use foreign stream options. These options are listed below.

Note: Because of the nature of streaming data, you need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Sample Code

Like all streams (but unlike server objects or data wrappers), foreign streams must be defined within a schema. The following code first creates a schema called "FileWriterSchema", then creates a foreign stream called "FileWriterStream" that references the predefined server "KinesisServer". To transfer data into Kinesis through this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kinesis-specific options. (These options are discussed below.)

Here is an example of the SQL used to define a foreign stream for the Kinesis adapter. When you INSERT into this stream (using a pump), s-Server writes data to the defined Kinesis location.

CREATE OR REPLACE SERVER "KinesisServer" TYPE 'KINESIS'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE SCHEMA "FileWriterSchema";

SET SCHEMA '"FileWriterSchema"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("ts" TIMESTAMP NOT NULL,

"transactionCount" INT NOT NULL)

SERVER "KinesisServer"

OPTIONS

(KINESIS_STREAM_NAME 'AggregatedData',

AWS_REGION 'us-west-1',

formatter 'CSV',

row_separator '',

character_encoding 'UTF-8');

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "FileWriterSchema"."FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

If one of the foreign stream's columns is named PARTITION_ID, the value in that column overrides the PARTITION_ID option for each row.
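As an illustrative sketch (the stream name and the PARTITION_ID column here are hypothetical, with the other options copied from the example above), a foreign stream that supplies per-row partition keys might look like this:

```sql
CREATE OR REPLACE FOREIGN STREAM "KeyedKinesisStream"
("PARTITION_ID" VARCHAR(64),
  --per-row partition key; overrides the PARTITION_ID option
 "ts" TIMESTAMP NOT NULL,
 "transactionCount" INT NOT NULL)
SERVER "KinesisServer"
OPTIONS
(KINESIS_STREAM_NAME 'AggregatedData',
AWS_REGION 'us-west-1',
formatter 'CSV',
row_separator '',
character_encoding 'UTF-8');
```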

Setting Up an AWS Profile Path

The aws_access_key_id and secret_access_key values can be set up on the AWS Console as follows:

1. Open the AWS Console.

2. Click Identity & Access Management.

3. Click Users.

4. Click your User ID.

5. Create an Access Key.

6. When you create the access key, the AWS Console lets you download a credentials file containing the values for aws_access_key_id and secret_access_key.
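The downloaded credentials file is a short plain-text properties file. A typical profile section looks like the following (these key values are AWS's documented placeholders, not real credentials):

```
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```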

Format Type Options

Other options are specific to format type and are described below.

Note: Parameters that require an ampersand as a value, such as QUOTE_CHARACTER '&' for CSV parsing, cannot be passed. (Parameters are currently passed as a string of key=value pairs delimited by ampersands, such as "key1=value1&key2=value2". There is currently no way of escaping or quoting the ampersand character.)

Formatting Files as CSV

When writing CSV files, the Extensible Common Data Adapter converts streaming tuples into character-separated output, based on options supplied through the OPTIONS section of the CREATE FOREIGN STREAM statement.

Note: For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

A destination directory for the file.
A formatter of CSV.
A character encoding type for the file.
How often the file will rotate, in terms of bytes or time.

Example Code

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

Next, you create a foreign stream that references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData" and creates a foreign stream called "FileWriterStream". To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for those systems. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";

SET SCHEMA '"WebData"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("STR1" VARCHAR(32))

SERVER "FileWriterServer" OPTIONS

(directory 'path/to/myfile',

formatter 'CSV',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.csv',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

Formatting as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify.

If no value for DATA_ELEMENTS, <col_name>_ELEMENTS, or <col_name>_ATTRIBUTES is specified, then the column name is used as an element name (not an attribute name). So a column named foo would be written as an XML element named /batch/row/foo if no values were specified.
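Under those defaults, a batch of rows with a single column named foo would be rendered along these lines (an illustrative sketch of the element structure, not verbatim formatter output):

```xml
<batch>
  <row>
    <foo>value1</foo>
  </row>
  <row>
    <foo>value2</foo>
  </row>
</batch>
```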

Sample Foreign Stream Implementing ECD Adapter to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP NOT NULL,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER,)

SERVER "FileWriterServer"

OPTIONS

(directory '/path/to/myfile',

formatter 'XML',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.xml',

data_elements 'data',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

Sample Properties File to Use ECD Agent to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RecordType(INTEGER COL1, TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)

DIRECTORY=/path/to/myfile

FORMATTER=XML

FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss

FILENAME_PREFIX=test-

FILENAME_SUFFIX=.xml

DATA_ELEMENTS=data

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=false

MAX_BYTES_PER_FILE=1073741824

Formatting as JSON

The ECDA adapter writes batches of data to JSON tuples. To configure how the adapter writes such tuples, you use foreign stream options. These options are listed below.

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"

  ("id" DOUBLE,

  "reported_at" VARCHAR(4096),

  "shift_no" DOUBLE,

  "trip_no" DOUBLE,

  "route_variant_id" VARCHAR(4096),

  "waypoint_id" DOUBLE,

  "last_known_location_state" VARCHAR(4096)

   )

   SERVER "FileWriterServer"

    --note that this uses the server defined above

   OPTIONS

   (

    directory '/tmp/json_test/',

     --file directory where JSON file will be written. 

    formatter 'JSON',

    character_encoding 'UTF-8',

    filename_prefix 'test',

    filename_suffix '.json',

    formatter_include_rowtime 'false',

    MAX_TIME_DELTA_PER_FILE '60000'

    );

 

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "JSON_OutputStream" (

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

)

   SELECT STREAM

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

   from "buses_stream";

    --this assumes that a stream called "buses_stream" exists in the same schema

 

Output

 

[{"id":"5.0115809712E10",

"reported_at":"2014-07-23 20:52:04.527000000",

"shift_no":"NULL",

"trip_no":"653.0",

"route_variant_id":"L38 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"5.0115854098E10",

"reported_at":"2014-07-23 20:52:05.443000000",

"shift_no":"NULL",

"trip_no":"NULL",

"route_variant_id":"310 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"3.46866848031E11",

"reported_at":"2014-07-23 20:52:07.713000000",

"shift_no":"1016.0",

"trip_no":"NULL",

"route_variant_id":"806 160",

"waypoint_id":"1.5588646E7",

"last_known_location_state":"NULL"}]

 

Sample Properties File to Use ECD Agent to Write JSON Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RecordType(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

DIRECTORY=/tmp/json_test/

FORMATTER=JSON

FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss

FILENAME_PREFIX=test

FILENAME_SUFFIX=.json

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=false

MAX_TIME_DELTA_PER_FILE=60000

 

See the topic Writing to Files with the ECD Adapter and Agent for a general overview of how to write to files with the ECD Adapter or Agent.