Sink Output Types


External data stream sinks require you to select an output type. The output type determines where the data will be written: the file system, a network socket, AMQP, Kafka, or Amazon Kinesis.

Using a File System Location as a Sink

Setting up a file system location as a sink is reasonably straightforward:

1. Enter a location where the file will be written. This location must be accessible by StreamLab.

2. If desired, enter a prefix and suffix for the file.

3. If desired, change the Filename Date Format for the file. (Files are renamed as they are rotated.)

4. Indicate how the file should be rotated, by entering either:

Maximum Bytes per File. Files will be rotated once they reach this size.

Maximum Time per File. Files will be rotated once this time limit is reached.

[Screenshot: File sink options]
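For example, a downstream process can pick up rotated files once StreamLab closes them. The following Python sketch is illustrative only; the directory, prefix, and suffix are assumptions that you would match to your own sink settings.

import glob
import os

# Assumed sink settings: location /var/streamlab/out, prefix "orders-",
# suffix ".csv"; rotated files therefore match orders-*.csv.
PATTERN = os.path.join("/var/streamlab/out", "orders-*.csv")

for path in sorted(glob.glob(PATTERN)):
    with open(path) as f:
        for line in f:
            print(line.rstrip("\n"))  # one row per line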

Using a Socket as a Sink

Network sockets initiate a connection over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To have StreamLab connect to a remote host, use the Remote Host and Remote Port fields. To have StreamLab act as a server that other clients connect to, use the Server Host and Server Port fields.

Connections can optionally be made using IPv6.

[Screenshot: Socket options]

Remote Host: Hostname to send rows to or receive rows from. You can set this to 'localhost' to allow only local connections, or to a specific IP address, such as 168.212.226.204.

Remote Port: Port to send rows to or receive rows from.

Server Host: The hostname or IP address to listen upon to send/receive rows. Defaults to 0.0.0.0.

Server Port: The port to listen upon to send/receive rows.

Socket uses TCP?: Whether the socket uses TCP (true) or UDP (false). Defaults to false (UDP).

Socket uses IPv6?: Whether or not the socket uses IPv6. Defaults to false.

Skip Header: True or false; defaults to false. Specifies whether the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers.
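As a sketch of the Remote Host / Remote Port case, the following Python listener accepts the TCP connection StreamLab initiates and prints each incoming row; the port number is an assumption.

import socket

HOST, PORT = "0.0.0.0", 5555  # assumed values matching the sink's Remote Host / Remote Port

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((HOST, PORT))
    srv.listen(1)
    conn, _addr = srv.accept()          # StreamLab connects out to us
    with conn, conn.makefile() as rows:
        for row in rows:                # one CSV/XML/JSON row per line
            print(row.rstrip("\n"))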

Using AMQP as a Sink

There are distinct differences between the way AMQP works up to version 0.9.1 and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a wire-level network protocol for the exchange of messages, while versions up to 0.9.1 employ a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure AMQP sinks differently for 1.0 than for versions up to 0.9.1.

Connection Options for AMQP 0.9.1

The AMQP option lets you define a sink with an AMQP 0.9.1 message bus. AMQP stands for Advanced Message Queuing Protocol, an open standard for messaging middleware. For more information, see http://www.amqp.org/confluence/display/AMQP/About+AMQP.

As with other output formats, the AMQP sink simply writes rows as strings in CSV, XML, or JSON format.

To set up an AMQP 0.9.1 sink, you need the following pieces of information:

The hostname or IP address of the AMQP server (defaults to localhost)

The port of the AMQP server (defaults to 5672)

The name of the AMQP exchange (the place where AMQP messages are routed). See https://www.rabbitmq.com/tutorials/amqp-concepts.html for more details.

The type of AMQP exchange: fanout, topic, or direct. See https://www.rabbitmq.com/tutorials/amqp-concepts.html for more details.

[Screenshot: AMQP 0.9.1 options]

Name: Hostname or IP address of the AMQP server. Defaults to localhost.

Port: Port of the AMQP server. Defaults to 5672.

EXCHANGE_TYPE: The type of exchange to connect to. Defaults to fanout.

EXCHANGE_NAME: Required. The name of the exchange to send/receive tuples to/from.
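To verify what an AMQP 0.9.1 sink is publishing, you can bind a consumer to the same exchange. Below is a minimal sketch using the pika client library; the exchange name is an assumption, and the host, port, and exchange type match the defaults above.

import pika

# Host and port are the defaults listed above; "myexchange" is an assumption.
conn = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = conn.channel()
channel.exchange_declare(exchange="myexchange", exchange_type="fanout")

# Bind a temporary queue to the exchange the sink publishes to.
queue = channel.queue_declare(queue="", exclusive=True).method.queue
channel.queue_bind(exchange="myexchange", queue=queue)

def on_message(ch, method, properties, body):
    print(body.decode())  # each message is one row in CSV, XML, or JSON

channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
channel.start_consuming()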

To set up an AMQP 1.0 sink, you need the following pieces of information:

The hostname or IP address of the AMQP server (defaults to localhost)

The port of the AMQP server (defaults to 5672)

The queue or topic name. This is a string value that identifies a target object to read from or to write to. See https://activemq.apache.org/apollo/documentation/amqp-manual.html for more details.

User name and password for the AMQP server.

As with other output formats, the AMQP 1.0 sink simply writes rows as strings in CSV, XML, or JSON format.

[Screenshot: AMQP 1.0 options]

Name: The hostname or IP address of the AMQP server. Defaults to localhost.

Port: The port of the AMQP server. Defaults to 5672.

Queue or Topic: String value that identifies a target object to read from or to write to.

User ID: Required. The user ID to use to connect to the AMQP 1.0 server.

Password: Required. The password to use to connect to the AMQP 1.0 server.
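Below is a minimal AMQP 1.0 consumer sketch using the python-qpid-proton library; the broker URL and the address "mytopic" are assumptions (credentials, which the sink requires, can be embedded in the URL as user:password@host).

from proton.handlers import MessagingHandler
from proton.reactor import Container

class Recv(MessagingHandler):
    def __init__(self, url):
        super(Recv, self).__init__()
        self.url = url

    def on_start(self, event):
        # Open a receiver on the queue or topic the sink writes to.
        event.container.create_receiver(self.url)

    def on_message(self, event):
        print(event.message.body)  # one row in CSV, XML, or JSON

# Broker location and address are assumptions.
Container(Recv("amqp://localhost:5672/mytopic")).run()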

Using Kafka as a Sink

When you point a sink at Kafka, you can configure a wide range of options for delivering rows to Kafka. The following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation.html.

TOPIC: Kafka topic.

Client ID: A string you can specify to help identify the application making calls to the Kafka server.

Partitioner: Fully qualified Java classname of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.common.KafkaOutputSink.RoundRobinPartitioner.

Serializer: Fully qualified Java classname of the Kafka serializer. Defaults to kafka.serializer.DefaultEncoder.

Key Serializer: Names a serializer class for keys. If no class is given, Kafka uses the value of serializer.class.

Producer: Specifies whether messages are sent asynchronously in a background thread. Async mode lets requests be batched, which helps throughput but increases the possibility that a failed client machine results in unsent data. Defaults to async.

Compression: Specifies the compression codec for generated data: "none", "gzip", or "snappy".

Compressed Topics: If you have specified a compression codec (other than "none"), this option lets you limit compression to the topics listed here. Empty means apply compression to all topics.

Message Send Retries: If enabled, the producer will automatically retry a failed send request a set number of times. Note: using this option may, according to the Kafka documentation, "lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgment to be lost."

Request Required Acks: Specifies when a produce request is considered complete: 0 means the producer does not wait for an acknowledgment, 1 means it waits for the leader to acknowledge the write, and -1 means it waits for all in-sync replicas to acknowledge it.

Request Timeout Ms: When request.required.acks is enabled, this lets you specify how long the broker should wait trying to meet the acknowledgment requirement before sending back an error to the client.

Topic Metadata Refresh: By default, the producer refreshes topic metadata in two cases: first, at a regular interval, which defaults to every 600000 ms (10 minutes); second, on any failure, such as a missing partition or an unavailable leader. This setting lets you change the regular polling interval. If you set this number to a negative value, the producer refreshes only on failure. If you set it to zero, the producer refreshes every time a message is sent (this is not recommended). Note: the refresh happens only after a message is sent, so if the producer never sends a message, the metadata is never refreshed.


Max Messages: If async is enabled in producer.mode, this lets you specify the maximum number of unsent messages to queue. Once this number is reached, either the producer must be blocked or data must be dropped.

queue.buffering.max.messages: If async is enabled in producer.mode, this lets you specify the maximum number of messages to buffer.

Queue Enqueue Timeout: If async is enabled in producer.mode, this lets you specify how long to block before dropping messages once the buffer has reached the value specified in queue.buffering.max.messages. If you set this option to 0, events will be enqueued immediately or dropped if the queue is full. If you set it to -1, the producer will block indefinitely.

Batch Messages: If async is enabled in producer.mode, this lets you specify the number of messages to send in one batch. With this option enabled, the producer will wait until either this number of messages is ready to send or queue.buffering.max.ms is reached.

Send Buffer Size: Socket write buffer size.
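To check what the sink delivers, you can read the topic back with any Kafka client. Below is a sketch using the kafka-python package; the topic name and broker address are assumptions.

from kafka import KafkaConsumer

# Topic and bootstrap server are assumptions; match them to your sink settings.
consumer = KafkaConsumer(
    "mytopic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)

for message in consumer:
    # The sink writes each row as a string in CSV, XML, or JSON format.
    print(message.value.decode("utf-8"))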

Using AWS Kinesis as a Sink

To use AWS Kinesis as a sink, you need to specify, at minimum, the Kinesis stream name and region.

Kinesis Stream Name: Required. Name of the Kinesis stream to write to. No default.

Kinesis Region: Required. Region ID of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.

Partition ID: Optional. Partition ID of the shard to write to. Defaults to ''. Can be overwritten by a stream column named PARTITION_ID.

Buffer Size: Maximum number of bytes per update request.

Max Retries: Optional. If an insert fails due to throttling, how many retries should be made. The backoff is doubled for each retry, up to Max Backoff; with the defaults, the backoff sequence runs 20, 40, 80, ... ms. Default is 10.

Initial Backoff: Optional. If an insert fails due to throttling, how many milliseconds to back off initially. Defaults to 20.

Max Backoff: Optional. If an insert fails due to throttling, the maximum number of milliseconds to back off. Defaults to 20480.

Max Records Per Request: Optional. Maximum number of records per update request. Defaults to 500.

AWS Profile Path: See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credential file on the s-Server file system with the following format:

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

Defaults to '', which resolves to ~/.aws/credentials. Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis.

AWS Profile Name: Optional. Profile name to use within the credentials file. Defaults to default.

Report Frequency: Optional. How often (in milliseconds) to log statistics. Defaults to 0, which means never.
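To confirm what the sink writes, you can read the stream back with boto3, which picks up the same ~/.aws/credentials profile described above. The stream name, region, and single-shard assumption below are illustrative only.

import time
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed region

# Read from the first shard of the stream; "mystream" is an assumption.
shard_id = kinesis.describe_stream(StreamName="mystream")["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="mystream", ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]

while True:
    resp = kinesis.get_records(ShardIterator=iterator, Limit=500)
    for record in resp["Records"]:
        print(record["Data"].decode("utf-8"))  # one row per record
    iterator = resp["NextShardIterator"]
    time.sleep(1)  # respect per-shard read throughput limits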