Configuring Streaming Data Sources

Streaming data sources make use of s-Server's Extensible Common Data framework. This framework allows you to read and write rows of data in a range of formats over a range of input/output media, including the file system, network sockets, AMQP, Amazon Kinesis, and Kafka. All data is sent as a string in CSV, XML, or JSON format.

Using the File System as a Streaming Data Source

To read streaming data over the file system, you need two pieces of information:

The directory in which the file resides.

A pattern for the file's name. Here, you enter part of the file's name, such as output, csv, or log. No quotation marks are necessary.

DIRECTORY
Directory in which the file resides.

FILENAME_PATTERN
Regular expression defining which files to read.
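
To test a file-system source, you can feed it from a small script. The sketch below is hypothetical: the directory, file name, and column layout are placeholders, not part of StreamLab. It appends CSV rows to a file that a source configured with DIRECTORY '/tmp/streamlab' and FILENAME_PATTERN 'output' could pick up:

# Hypothetical test producer. Appends CSV rows to a file whose name
# matches a FILENAME_PATTERN of 'output'. Directory, file name, and
# columns are illustrative only.
import csv
import time

with open("/tmp/streamlab/output.csv", "a", newline="") as f:
    writer = csv.writer(f)
    for i in range(10):
        writer.writerow([i, time.time(), "sensor-A"])  # one stream row
        f.flush()  # make the row visible to the reader promptly
        time.sleep(1)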

Using a Network Socket as a Streaming Data Source

To read streaming data in line, CSV, XML, or JSON format over a network socket, you need to configure the socket connection. You may want to consult with whoever set up the application with which StreamLab will connect over the socket.

Network sockets initiate a connection over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To tell StreamLab to listen to a remote host, use the Remote Host and Remote Port fields.

Connections can optionally be made using IPV6.

Remote Host
Hostname to send rows to or receive rows from. You can set this to 'LOCALHOST' to listen to local connections only, or to a specific IP address, such as 168.212.226.204.

Remote Port
Port to send rows to or receive rows from.

Socket uses TCP?
Whether the socket uses TCP (true) or UDP (false). Default is false (UDP).

Socket uses IPV6?
Whether or not the socket uses IPV6. Default is false.

Skip Header
True or false; defaults to false. Specifies whether the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers.
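
As a quick test, the sketch below sends CSV lines to StreamLab over UDP, the default transport. It is hypothetical: the host, port, and row layout are placeholders standing in for your Remote Host and Remote Port settings.

# Hypothetical test feed: sends CSV lines over UDP to the host and port
# that StreamLab is listening on. Values below are placeholders.
import socket
import time

STREAMLAB_HOST = "localhost"  # matches the Remote Host field
STREAMLAB_PORT = 5100         # matches the Remote Port field

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP, the default
for i in range(10):
    line = f"{i},2024-01-01 00:00:00,sensor-A\n"
    sock.sendto(line.encode("utf-8"), (STREAMLAB_HOST, STREAMLAB_PORT))
    time.sleep(1)
sock.close()

For TCP (Socket uses TCP? set to true), you would instead create the socket with socket.SOCK_STREAM, call connect(), and write lines with sendall().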

Using AMQP as a Streaming Data Source

To read from a streaming data source over AMQP, you need to configure the AMQP connection. AMQP stands for Advanced Message Queuing Protocol, an open standard for messaging middleware. For more information, see http://www.amqp.org/confluence/display/AMQP/About+AMQP. You may want to consult with whoever has set up AMQP in your environment.

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP works up to version 0.9.1 and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employ a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure StreamLab differently for AMQP 1.0 than for versions up to 0.9.1.

AMQP URL
Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port, and so on. The format differs depending on whether you are using AMQP 1.0 or a legacy version.

AMQP_LEGACY (AMQP protocol version 0.9, e.g., RabbitMQ)
Format: amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[<optionsList>]
Example: amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol version 1.0)
Format: amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>
Example: amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

Destination
Required. AMQP 0.9 queue or topic identifier. This can be in the form <destination prefix>{PARTITION}<destination suffix>.

Partition Expression
Use this only if Destination includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>.

Queue Size
Queue size for the parser. Applies to reading only. Defaults to 2. In most cases, you will not want to change this number.

Acknowledge Mode
Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
Roughly, AUTO asks the container on the AMQP server to acknowledge once a message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
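
For an AMQP 0.9.1 broker such as RabbitMQ, a test publisher might look like the sketch below. It is hypothetical: it uses the pika library, and the queue name and broker settings are placeholders corresponding to the Destination and AMQP URL options.

# Hypothetical AMQP 0.9.1 publisher (e.g., RabbitMQ) using the pika library.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="localhost", port=5672,
        credentials=pika.PlainCredentials("guest", "guest")))
channel = connection.channel()
channel.queue_declare(queue="streamlab.input")  # would match Destination

# The message body is one CSV-formatted row for StreamLab to parse.
channel.basic_publish(exchange="",
                      routing_key="streamlab.input",
                      body="1,2024-01-01 00:00:00,sensor-A")
connection.close()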

Using Amazon Kinesis as a Streaming Data Source

To read from a streaming data source over Amazon Kinesis, you need to configure the Amazon Kinesis connection.

Kinesis Stream Name
Required. Name of the Kinesis stream to read from. No default.

AWS Region
Required. Region ID of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.

Kinesis Application Name
Identifies the client in CloudWatch. Defaults to sqlstream.

AWS Profile Path
See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credentials file on the s-Server file system with the following format:

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

Defaults to blank, in which case s-Server uses ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.

AWS Profile Name
Optional. Profile name to use within the credentials file. Defaults to default.

Stream Fanout
Number of s-Server streams to create for this Kinesis stream.

Initial Position
LATEST for latest or TRIM_HORIZON for earliest. Defaults to LATEST.

Max Records/Get
Defaults to -1. If greater than 0, reads multiple records per request. Usually a large number is better.

Socket Timeout
Defaults to -1. If set, overrides the Kinesis socket timeout.

Agent Name
The name used in s-Server to refer to this Kinesis connection. Any unique name is fine.
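
To verify the connection from the producing side, a test writer might use boto3, as in the hypothetical sketch below. The stream name, region, and profile name are placeholders mirroring the options above.

# Hypothetical Kinesis test producer using boto3. Stream name, region,
# and profile name are placeholders.
import boto3

session = boto3.Session(profile_name="default", region_name="us-east-1")
kinesis = session.client("kinesis")

# Each record's Data payload is one CSV row for StreamLab to parse.
kinesis.put_record(
    StreamName="streamlab-input",
    Data=b"1,2024-01-01 00:00:00,sensor-A",
    PartitionKey="sensor-A")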

Using Kafka as a Streaming Data Source

To read data in line, CSV, XML, or JSON format over Kafka, you need to configure the connection to Kafka. Kafka is an open-source, real-time publish-subscribe messaging framework. See http://kafka.apache.org/ for more details. You may want to consult with whoever has set up the Kafka messaging system in your environment.

To connect with Kafka, you need two pieces of information:

The name and port of the Kafka broker (this defaults to localhost:9092, but the source will not work unless a Kafka broker is running at this location).

The Kafka topic name from which you are reading.

The other configuration details below help manage the starting point for reading Kafka topics as well as the amount of data fed to StreamLab.

Name/Port
hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions, and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.

Topic
Required. Kafka topic to read from.

Starting Time
The time at which to start reading from the specified topic. Options are LATEST, EARLIEST, or a long int representing a timestamp (milliseconds since epoch). Defaults to LATEST.

Starting Offset
Offset at which to start reading, as a long int representing a timestamp (milliseconds since epoch). Defaults to -1.

Partition
Number of the Kafka partition to read from. Defaults to 0.

Buffer Size
Buffer size in bytes. Defaults to 1048576.

Fetch Size
Fetch size. Defaults to 100000.
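
A test producer for this source might use the kafka-python library, as in the hypothetical sketch below. The broker address and topic are placeholders mirroring the Name/Port and Topic options.

# Hypothetical Kafka test producer using the kafka-python library.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Each message value is one CSV row; StreamLab reads it from the topic.
producer.send("streamlab-input", value=b"1,2024-01-01 00:00:00,sensor-A")
producer.flush()
producer.close()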