Reading from Amazon Kinesis

Because of the particular nature of Amazon Kinesis streams, s-Server uses an agent to read from them. Kinesis agents run in s-Server and connect to Amazon Kinesis using options that you pass to the agent. The Kinesis agent works best when s-Server itself runs within AWS. You can read from Kinesis from outside AWS, but the data will then travel over the Internet. See Configuring s-Server to Use SSL in the Administrator Guide for information on using SSL to send such rows securely over the Internet.

Destination for Rows

As with other agents, you need to create a native stream to serve as the destination for the rows the agent reads from Kinesis. As with other sources, this data must be parsed: you specify a parser and its options in the option string you pass to the agent. See Parser Types for Reading for a list of parser choices.

kinesis_agent

You give each agent a name, such as my_kinesis_agent. Each agent reads from exactly one Kinesis stream.

Agents are started using START_AGENT_HOSTER and stopped using STOP_AGENT_HOSTER. Unlike pumps, agents do not start automatically, so you need to restart them whenever s-Server is stopped and restarted. Best practice is to start pumps before agents, so that you do not lose any rows sent by agents.

This topic includes the following subtopics:

Simple Example of Starting Agent. A basic example of starting the agent with the CSV parser.

Agent Options. Lists the options for the Kinesis agent.

Stopping the Agent. A basic example of stopping the agent.

Extended Example with JSON Parsing. A more complex example using the JSON parser, including JSON paths.

Setting Up an AWS Profile Path. To connect the agent to Kinesis, you need an AWS profile.

Note: Agent names are internal to s-Server. This means that you can use agents with the same name on separate instances of s-Server, even if these are running on the same machine.

Simple Example of Starting Agent

The agent is started with code along the following lines. You can issue this code in SQLline or another JDBC client.

-- Keep comments outside the quoted option string: inside the quotes,
-- "--" is ordinary text and would become part of an option name.
-- CHARACTER_ENCODING and SKIP_HEADER are settings for the CSV parser.
-- AWS_REGION through KINESIS_IDLE_TIME_BETWEEN_READS are Kinesis-specific options.
CALL SYS_BOOT.MGMT.START_AGENT_HOSTER('my_kinesis_agent',
                                     'kinesis',
                                     'SCHEMA_NAME=myschema&
                                      TABLE_NAME=mystream&
                                      PARSER=CSV&
                                      CHARACTER_ENCODING=ISO-8859-1&
                                      SKIP_HEADER=false&
                                      AWS_REGION=us-west-1&
                                      AWS_PROFILE_NAME=my_profile&
                                      AWS_PROFILE_PATH=~/.aws/credentials&
                                      KINESIS_STREAM_NAME=test&
                                      KINESIS_APPLICATION_NAME=testrun3&
                                      STREAM_FANOUT=1&
                                      KINESIS_INITIAL_POSITION_IN_STREAM=TRIM_HORIZON&
                                      KINESIS_MAX_RECORDS_PER_GET=1500&
                                      KINESIS_SOCKET_TIMEOUT=-1&
                                      KINESIS_IDLE_TIME_BETWEEN_READS=-1');

The first parameter ("my_kinesis_agent" above) is the name you are assigning to the agent for use in subsequent calls.

The second parameter ("kinesis" above) is the agent type.

The third parameter is either the path to a properties file or a list of properties separated by ampersands (&).
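To make the format of the third parameter concrete, here is a hypothetical Python sketch. It is not s-Server's actual parser. It splits an option string on & and splits each pair on its first =. It assumes that whitespace and newlines around each pair are ignored, which the multi-line layout of the examples in this topic suggests but which is an assumption here. It also shows why SQL comments must stay outside the quoted string: inside the quotes, text such as --note is ordinary string content and ends up glued to the next option name.

```python
def parse_agent_options(option_string):
    """Hypothetical sketch: split an '&'-separated option string into a dict.

    Assumes each segment has the form KEY=VALUE and that whitespace
    (including newlines) around keys and values is ignored.
    """
    options = {}
    for segment in option_string.split("&"):
        segment = segment.strip()
        if not segment:
            continue  # skip empty segments, e.g. from a trailing '&'
        key, sep, value = segment.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"option without '=': {segment!r}")
        options[key.strip()] = value.strip()
    return options


good = parse_agent_options("""SCHEMA_NAME=myschema&
    TABLE_NAME=mystream&
    PARSER=CSV&
    AWS_REGION=us-west-1""")
print(good["TABLE_NAME"])  # mystream

# A SQL-style comment inside the quotes is not a comment: it is part of the
# string, so it becomes part of the following option name.
bad = parse_agent_options("""PARSER=CSV&
    --settings for the CSV parser
    SKIP_HEADER=false""")
print("SKIP_HEADER" in bad)  # False
```

An empty value such as AWS_PROFILE_PATH= parses to an empty string under this sketch, which matches the documented meaning of "use the default path".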

Agent Options

Option Name

Description

SCHEMA_NAME

The s-Server schema that contains the destination native stream. You need to create this schema before starting the agent.

TABLE_NAME

The s-Server native stream into which the agent writes rows. You need to create this stream before starting the agent.

KINESIS_STREAM_NAME

Required. Name of Kinesis stream to read from. No default.

AWS_REGION

Required. The ID of the AWS region in which the Kinesis stream resides, such as us-west-1. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.

AWS_PROFILE_PATH

Must point to a credentials file on the s-Server file system, in the following format:

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

Defaults to an empty string (''), which resolves to ~/.aws/credentials.

Note: To read from or write to Kinesis, you need an AWS profile set up and a credentials file stored on your system. See Setting Up an AWS Profile Path in this topic.

AWS_PROFILE_NAME

Optional. The name of the profile to use within the credentials file. AWS supports multiple named profiles in a single credentials file; if you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html for details.

KINESIS_INITIAL_POSITION_IN_STREAM

LATEST to start with the most recent records, or TRIM_HORIZON to start with the oldest records still available in the stream. Defaults to LATEST.

STREAM_FANOUT

If set to a value n greater than 1, the agent writes to n streams named TABLE_NAME1 through TABLE_NAMEn instead of TABLE_NAME, depending on the shard each row comes from.

Shard 1 goes to TABLE_NAME1, shard n goes to TABLE_NAMEn, shard n+1 goes back to TABLE_NAME1, and so on.

KINESIS_APPLICATION_NAME

Identifies the client in Amazon CloudWatch (defaults to sqlstream). It also identifies a group of consumer processes that collaborate to process output from the source Kinesis stream. If two agents read from the same Kinesis stream with the same KINESIS_APPLICATION_NAME, they share the Kinesis shards between them. If the two agents use different KINESIS_APPLICATION_NAME values, each independently reads all the data from the stream.

KINESIS_MAX_RECORDS_PER_GET

Defaults to -1. If greater than 0, the agent reads up to this many records per request. (A large value is usually better.)

KINESIS_SOCKET_TIMEOUT

Defaults to -1, which leaves the Kinesis setting unchanged. If set, overrides the Kinesis socket timeout, in milliseconds.

KINESIS_IDLE_TIME_BETWEEN_READS

Defaults to -1, which leaves the Kinesis setting unchanged. If set, overrides the Kinesis idle time between reads, in milliseconds.
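The STREAM_FANOUT routing rule described above is a simple round-robin over shards. The following hypothetical Python sketch expresses it as a function. It assumes shards are numbered from 1 in the order the description uses. How the agent actually assigns numbers to Kinesis shards is not specified in this topic. The function name destination_stream is illustrative only.

```python
def destination_stream(table_name, stream_fanout, shard_number):
    """Hypothetical sketch of the STREAM_FANOUT rule described above.

    shard_number is assumed to be 1-based (shard 1, shard 2, ...).
    """
    if stream_fanout <= 1:
        return table_name  # no fanout: every shard goes to TABLE_NAME
    # Round-robin: shard 1 -> TABLE_NAME1, ..., shard n -> TABLE_NAMEn,
    # shard n+1 -> TABLE_NAME1 again.
    return f"{table_name}{(shard_number - 1) % stream_fanout + 1}"


for shard in range(1, 6):
    print(shard, destination_stream("mystream", 3, shard))
```

With STREAM_FANOUT=3 and TABLE_NAME=mystream, this rule implies that native streams mystream1, mystream2 and mystream3 must exist in the schema before the agent starts.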

Stopping the Agent

To stop the agent, issue code along the following lines:

call sys_boot.mgmt.stop_agent_hoster('my_kinesis_agent');

 

Extended Example with JSON Parsing

The code below reads from a Kinesis stream called production-stream into an s-Server stream called kinesis_sink_read_json in the schema myschema. It passes options for the JSON parser, including a JSON path for each column. See Reading JSON for more details on these options.

-- Read data from the TARGET Kinesis stream; not normally needed, but helpful for debugging.
-- Read into myschema.kinesis_sink_read_json.
-- Expand JSON into the columns defined for the output stream.
---
-- agent_name = 'target_listen_json'
-- application_name = 'sqlstream_3'

call sys_boot.mgmt.start_agent_hoster('target_listen_json','kinesis','PARSER=JSON&
SCHEMA_NAME=myschema&
TABLE_NAME=kinesis_sink_read_json&
AWS_REGION=us-east-1&
AWS_PROFILE_NAME=default&
AWS_PROFILE_PATH=&
KINESIS_STREAM_NAME=production-stream&
KINESIS_APPLICATION_NAME=sqlstream_3&
STREAM_FANOUT=1&
KINESIS_INITIAL_POSITION_IN_STREAM=LATEST&
KINESIS_MAX_RECORDS_PER_GET_RECORDS=-1&
KINESIS_SOCKET_TIMEOUT=-1&
KINESIS_IDLE_TIME_BETWEEN_READS=-1&
ROW_PATH=$&
device_key_PATH=$.device_key&
model_code_PATH=$.model_code&
latitude_PATH=$.latitude&
longitude_PATH=$.longitude&
recorded_at_PATH=$.recorded_at&
channel_PATH=$.sensor_readings[0:].channel&
sensor_type_PATH=$.sensor_readings[0:].sensor_type&
metric_value_PATH=$.sensor_readings[0:].metric_value&
other_value_PATH=$.sensor_readings[0:].other_value&
value_PATH=$.sensor_readings[0:].value&
unit_PATH=$.sensor_readings[0:].unit');
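The paths in this example mix top-level fields ($.device_key) with fields inside an array ($.sensor_readings[0:].channel). The following hypothetical Python sketch shows the flattening this configuration appears to describe. It assumes that each element of sensor_readings becomes one output row and that the top-level values are repeated on every row. That assumption is not confirmed here, so check Reading JSON for the parser's exact behavior. The sample record and its values are made up for illustration.

```python
import json

# Made-up sample record; field names match the *_PATH options in the example.
record = json.loads("""{
  "device_key": "dev-001",
  "model_code": "M1",
  "latitude": 37.77,
  "longitude": -122.42,
  "recorded_at": "2016-01-01T00:00:00Z",
  "sensor_readings": [
    {"channel": 1, "sensor_type": "temp", "metric_value": 21.5,
     "other_value": null, "value": "21.5", "unit": "C"},
    {"channel": 2, "sensor_type": "humidity", "metric_value": 40.0,
     "other_value": null, "value": "40", "unit": "%"}
  ]
}""")

TOP_LEVEL = ["device_key", "model_code", "latitude", "longitude", "recorded_at"]
PER_READING = ["channel", "sensor_type", "metric_value",
               "other_value", "value", "unit"]


def flatten(rec):
    """Assumed behavior: one row per element of sensor_readings[0:],
    with the top-level $.field values repeated on each row."""
    base = {col: rec.get(col) for col in TOP_LEVEL}
    rows = []
    for reading in rec.get("sensor_readings", []):
        row = dict(base)  # copy the shared top-level columns
        row.update({col: reading.get(col) for col in PER_READING})
        rows.append(row)
    return rows


for row in flatten(record):
    print(row["device_key"], row["channel"], row["sensor_type"], row["unit"])
```

Under this assumption, the single Kinesis record above would produce two rows in kinesis_sink_read_json, one per sensor reading.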

 

 

Setting Up an AWS Profile Path

To read from Kinesis, you need an AWS credentials file. You can obtain the values for aws_access_key_id and aws_secret_access_key from the AWS Console as follows:

1. Open the AWS Console.

2. Click Identity & Access Management.

3. Click Users.

4. Click your user ID.

5. Create an access key.

6. When you create an access key, the AWS Console lets you download a file containing the values for aws_access_key_id and aws_secret_access_key. Copy these values into the credentials file that AWS_PROFILE_PATH points to, using the format shown under AWS_PROFILE_PATH in Agent Options.
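Before starting the agent, it can help to confirm that the credentials file is well formed and contains the profile named in AWS_PROFILE_NAME. The following Python sketch uses the standard-library configparser module to do that. It is a local sanity check only. It does not reflect how s-Server itself reads the file, and the key values shown are placeholders.

```python
import configparser

# Placeholder credentials file with the default profile and a named profile.
CREDENTIALS = """
[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

[my_profile]
aws_access_key_id = aaa
aws_secret_access_key = bbb
"""


def load_profile(text, profile_name="default"):
    """Return (access key id, secret key) for profile_name, mirroring
    how AWS_PROFILE_NAME selects a section in the credentials file."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    if not parser.has_section(profile_name):
        raise KeyError(f"profile {profile_name!r} not found")
    section = parser[profile_name]
    return section["aws_access_key_id"], section["aws_secret_access_key"]


print(load_profile(CREDENTIALS))                # ('xxx', 'yyy')
print(load_profile(CREDENTIALS, "my_profile"))  # ('aaa', 'bbb')
```

To check a real file, pass its contents instead, for example pathlib.Path(os.path.expanduser("~/.aws/credentials")).read_text(). Section names are case-sensitive, so the value of AWS_PROFILE_NAME must match the bracketed name exactly.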