Writing to Amazon Kinesis


The Kinesis ECDA adapter writes batches of data to a Kinesis stream. You can specify CSV, XML, JSON, or BSON as the format for the data. To write to Kinesis, you must first define a server object for the Kinesis server. This topic describes setting up a foreign stream and performing an INSERT into it in order to write data to a Kinesis stream. At minimum, you need the stream name and region to write to a Kinesis stream.

This adapter works best when run from within AWS. If you write to Kinesis from outside AWS, all rows are sent over the Internet, and you will also need to install credentials for an AWS account. See Configuring s-Server to Use SSL in the Administrator Guide for information on how to use SSL to send such rows securely over the Internet.

This topic contains the following subtopics:

Sample Code for Kinesis Foreign Stream

Foreign Stream Options for Kinesis

Setting Up an AWS Profile

Using Partition IDs to Write to a Specific Shard

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Sample Code for Kinesis Foreign Stream

Like all streams (but unlike server objects or data wrappers), foreign streams must be defined within a schema. The following code first creates a schema called "KinesisWriterSchema," then creates a foreign stream called "KinesisWriterStream" that references the predefined server "KinesisWriterServer" as a server option. To transfer data into Kinesis from this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kinesis-specific options. (These options are discussed below.)

Here is an example of the SQL used to define a foreign stream for the Kinesis adapter. When you INSERT into this stream (using a pump), s-Server writes data to the defined Kinesis location.

CREATE SERVER "KinesisWriterServer" TYPE 'KINESIS'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE SCHEMA "KinesisWriterSchema";

SET SCHEMA 'KinesisWriterSchema';

 

CREATE OR REPLACE FOREIGN STREAM "KinesisWriterStream"

("ts" TIMESTAMP NOT NULL,

"transactionCount" INT NOT NULL)

SERVER "KinesisWriterServer"

OPTIONS

(KINESIS_STREAM_NAME 'AggregatedData',

AWS_REGION 'us-west-1',

formatter 'CSV',

row_separator '',

character_encoding 'UTF-8');

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "KinesisWriterSchema"."KinesisWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

If one of the foreign stream columns is named PARTITION_ID, the value in that column overrides the KINESIS_DEFAULT_PARTITION_ID option.

Foreign Stream Options for Kinesis

Option Name

Description

KINESIS_STREAM_NAME

Required. Name of Kinesis stream to write to. No default.

AWS_REGION

Required. Region ID of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.

AWS_PROFILE_PATH

See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credentials file on the s-Server file system with the following format:

[default]

aws_access_key_id = xxx

aws_secret_access_key = yyy

 

Defaults to '', which resolves to ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.

AWS_PROFILE_NAME

Optional. Profile name to use within the credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html

INITIAL_BACKOFF

Optional. If an insert fails due to throttling, how many milliseconds to back off initially. Defaults to 20.

MAX_BACKOFF

Optional. If an insert fails due to throttling, the maximum number of milliseconds to back off. Defaults to 20480.

MAX_RETRIES

Optional. If an insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to the maximum. Defaults to 10.

BUFFER_SIZE

Optional. Maximum number of bytes per update request. Defaults to 4194304.

MAX_RECORDS_PER_REQUEST

Optional. Maximum number of records per update request. Defaults to 500.

REPORT_FREQUENCY

Optional. How often to log (in milliseconds) statistics. Defaults to 0, which means "never."

KINESIS_DEFAULT_PARTITION_ID

Optional. Partition ID of the shard to write to. Defaults to ''.
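Taken together, INITIAL_BACKOFF, MAX_BACKOFF, and MAX_RETRIES define a doubling backoff schedule for throttled inserts. The following Python sketch is illustrative only (it is not part of s-Server); it shows the waits implied by the default values:

```python
def backoff_schedule(initial_ms=20, max_ms=20480, max_retries=10):
    """Wait (in milliseconds) before each retry of a throttled insert.

    The backoff starts at INITIAL_BACKOFF and doubles on every retry,
    capped at MAX_BACKOFF, for at most MAX_RETRIES attempts.
    """
    waits = []
    backoff = initial_ms
    for _ in range(max_retries):
        waits.append(backoff)
        backoff = min(backoff * 2, max_ms)
    return waits

# With the defaults, the ten waits are 20, 40, 80, ... 10240 ms.
print(backoff_schedule())
```

With the default settings, the cap of 20480 ms is never reached, because the tenth (final) retry waits only 10240 ms; lowering MAX_BACKOFF or raising MAX_RETRIES changes that.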

Setting Up an AWS Profile Path

The aws_access_key_id and aws_secret_access_key can be set up in the AWS Console as follows:

1. Open the AWS Console.

2. Click Identity & Access Management.

3. Click Users.

4. Click your User ID.

5. Create an Access Key.

6. When you create an access key, the AWS console will allow you to download a credentials file containing the values for aws_access_key_id and aws_secret_access_key.

Format Type Options

Other options are specific to format type. See Output Formats for Writing.

Note: Parameters that require an ampersand as a value, such as QUOTE_CHARACTER '&' for CSV parsing, cannot be passed. (Parameters are currently passed as a string of key=value pairs delimited by ampersands, such as "key1=value1&key2=value2". There is currently no way of escaping or quoting the ampersand character.)
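The ambiguity can be seen with a short Python sketch. The serialize and parse functions here are hypothetical, not the actual s-Server implementation; they just illustrate why an ampersand inside a value cannot be recovered:

```python
def serialize(options):
    """Join options into ampersand-delimited key=value pairs."""
    return "&".join(f"{key}={value}" for key, value in options.items())

def parse(serialized):
    """Naively split the string back into (key, value) pairs."""
    return [pair.partition("=")[::2] for pair in serialized.split("&")]

opts = {"QUOTE_CHARACTER": "&", "SEPARATOR": ","}
s = serialize(opts)
print(s)         # QUOTE_CHARACTER=&&SEPARATOR=,
print(parse(s))  # the '&' value is lost: splitting produces an empty pair
```

Because the value '&' is indistinguishable from a pair delimiter, round-tripping the two original options yields three fragments instead of two.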

Performance, Partitioning, Scaling, High Availability

Amazon Kinesis limits write rates to 1,000 records per second per shard or 1 MB per second per shard, whichever is lower, so high-throughput applications need to split data across many shards. You can do so by setting a partition key, using a column in the foreign stream named "PARTITION_ID". The value in this column is not delivered to the target stream, but it is used as the basis for choosing which shard the record is sent to (the value gets hashed). For example, you might shard on a device_key, so that readings from a given device always go to the same shard. Of course, it is important to consider how downstream applications are going to consume the shards.

If you do not use a PARTITION_ID column, all data is written to the shard defined in the KINESIS_DEFAULT_PARTITION_ID parameter.
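Kinesis chooses a shard by taking the MD5 hash of the partition key as a 128-bit integer and finding the shard whose hash-key range contains it. The following Python sketch assumes a stream whose shards split the hash space evenly; shard_for_key and the device keys are illustrative, not s-Server or AWS APIs:

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    """Map a partition key to a shard index, assuming evenly split shards."""
    # Kinesis hashes the partition key with MD5 into a 128-bit integer.
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    range_size = 2 ** 128 // num_shards
    return min(hashed // range_size, num_shards - 1)

# A given device_key always maps to the same shard, so readings from
# one device stay ordered within that shard.
print(shard_for_key("device-42", 4) == shard_for_key("device-42", 4))  # True
```

Because the mapping is deterministic, all records carrying the same partition key land on the same shard, which is what preserves per-device ordering.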

More Complex Example with Partitioning

The following code uses a device identifier to partition output on Kinesis. In this way, data is "sharded" by device ID. See http://docs.aws.amazon.com/streams/latest/dev/key-concepts.html for more information about shards and partition IDs.

CREATE OR REPLACE SCHEMA "Kinesis_Schema";

 

SET SCHEMA '"Kinesis_Schema"';

 

CREATE OR REPLACE SERVER "KinesisWriterServer" TYPE 'KINESIS'

FOREIGN DATA WRAPPER ECDA;

 

--  Define the stream

 

 CREATE FOREIGN STREAM "kinesis_output_sink" (

      --populated by pump from device_key to shard the output

     "PARTITION_ID" varchar(32),                        

     "Wind_Chill" DOUBLE,

     "Barometric_Pressure" DOUBLE,

     "Humidity" DOUBLE,

     "Leak_Detection" DOUBLE,

     "Rainfall" DOUBLE,

     "Rainrate" DOUBLE,

     "Remote_Humid" DOUBLE,

     "Remote_Temp" DOUBLE,

     "Wind_Direction" DOUBLE,

     "runningCount" BIGINT,

     "readings_uuid" VARCHAR(32),

     "device_key" VARCHAR(32),

     "model_code" VARCHAR(16),

     "latitude" DOUBLE,

     "longitude" DOUBLE,

     "recorded_at" VARCHAR(32)

     )

     SERVER "KinesisWriterServer"

     OPTIONS (

         "FORMATTER" 'JSON',

         "KINESIS_STREAM_NAME" 'production',

         "AWS_REGION" 'us-east-1',

         --"KINESIS_DEFAULT_PARTITION_ID" '',

         "BUFFER_SIZE" '4194304',

         "MAX_RETRIES" '10',

         "INITIAL_BACKOFF" '20',

         "MAX_BACKOFF" '20480',

         "MAX_RECORDS_PER_REQUEST" '500',

         "AWS_PROFILE_NAME" 'default',

         "AWS_PROFILE_PATH" '',

         "REPORT_FREQUENCY" '0'

     )

  ;

 

CREATE FOREIGN STREAM "Kinesis_Output_Sink"

( "PARTITION_ID" VARCHAR(32) -- populated by pump from device_key to shard the output

, "json" VARCHAR(3000)

)

SERVER "KinesisWriterServer"

OPTIONS

  ( "FORMATTER" 'CONCATENATE'

  , "KINESIS_STREAM_NAME" 'production-a_production_mar_sensor_readings_post_sql_stream'

  , "AWS_REGION" 'us-east-1'

  , "BUFFER_SIZE" '4194304'

  , "MAX_RETRIES" '10'

  , "INITIAL_BACKOFF" '20'

  , "MAX_BACKOFF" '20480'

  , "MAX_RECORDS_PER_REQUEST" '500'

  , "AWS_PROFILE_NAME" 'user1'

  , "AWS_PROFILE_PATH" ''

  , "REPORT_FREQUENCY" '0'

)

;

 

CREATE PUMP "kinesis_output_sink-Pump" STOPPED AS

INSERT INTO "kinesis_output_sink"
(    "PARTITION_ID",
     "Wind_Chill",
     "Barometric_Pressure",
     "Humidity",
     "Leak_Detection",
     "Rainfall",
     "Rainrate",
     "Remote_Humid",
     "Remote_Temp",
     "Wind_Direction",
     "runningCount",
     "readings_uuid",
     "device_key",
     "model_code",
     "latitude",
     "longitude",
     "recorded_at"
)
SELECT STREAM
     "device_key" AS "PARTITION_ID", -- use device_key as the partition id
     "Wind_Chill",
     "Barometric_Pressure",
     "Humidity",
     "Leak_Detection",
     "Rainfall",
     "Rainrate",
     "Remote_Humid",
     "Remote_Temp",
     "Wind_Direction",
     "runningCount",
     "readings_uuid",
     "device_key",
     "model_code",
     "latitude",
     "longitude",
     "recorded_at"
FROM "Output_Sink"
;