Writing Files to Remote Locations

To write to files in remote locations, you need to configure and instantiate the Extensible Common Data agent. This agent is a standalone Java application that mediates data between SQLstream s-Server and external data sources.

The agent is supplied either as part of the distributed SQLstream product or as part of the ClientTools download from the SQLstream website (via SQLstream-5.1.0-clienttools-linux.run or SQLstream-client-tools-5.1.0-windows.exe).

The ECDA Agent uses the SQLstream JDBC driver to select from a stream in s-Server. You need to create a stream before implementing the agent.

The ECDA Agent reads rows from an existing stream in s-Server and writes them to a destination on a remote server.

The agent itself is a wrapper (or "hoster") for the ECDA (Extensible Common Data Adapter). See the topic Reading from Other Sources for more information on the ECD adapter. The agent is configured much like the adapter, except that you use command-line options or properties files instead of foreign stream options to configure the agent.

Requirements

The server hosting the ECDA Agent must have a Java Runtime Environment (or a JDK) installed.

Configuring and Using the ECDA Agent

To use the agent, you launch it from the command line, using a command along the following lines:

./commondataagent.sh --output --io file --props /var/sqlstream/etc/file_demo.properties --schemaName fileSource --tableName fileSourceStream

 

The agent can be launched from anywhere on the machine on which it is installed. The script requires the following minimum command line arguments:

--output Specifies that the agent acts as a writer: it selects rows from a stream in s-Server and writes them to the destination.
--io Determines the input/output format to be used by the agent. Options are file (which is the default), net (network), mail (mail server), amqp, mqseries, kafka, or hdfs.
--schemaName, --tableName These indicate the s-Server stream to be read from. You can configure these through the properties file. When specified on the command line, they override any setting in the properties file.

Note: For Kafka, you also need to specify a location for the Kafka plugin, using the option --plugin, such as --plugin /opt/sqlstream/5.1.0.14827/s-Server/plugin/kafka/kafka.jar.

Configuring the EcdaAgent

You configure the rest of the EcdaAgent either by specifying a properties file or by using command-line parameters. We recommend using a properties file, though it can be useful to keep base configuration information in the properties file and use the command line to override one or two options.

Using Command Line Options Versus Property File Options

You use Property File options to specify how the ECDA Agent communicates with a particular data source, such as a log file. The properties file configures the format type, directory, separator character, and so on. You can also configure stream information in the properties file.

However, you may wish to use the same agent to connect to another instance of s-Server. In this case, you can configure server connection and stream information from the command line, using the arguments described in the table below.
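For example, you might keep source and format settings in the properties file and override only the connection information, along these lines (the URI, user name, and password here are placeholders; adjust them for your installation):

./commondataagent.sh --output --io file --props /var/sqlstream/etc/file_demo.properties --uri "jdbc:sqlstream:sdp://otherserver:5570" --user myuser --pass mypassword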

Running the Agent

To invoke the ECDA Agent:

1. Declare or identify a destination stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide.
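For example, a stream matching the properties file shown in step 2 might be declared along the following lines (a minimal sketch; the column list here is illustrative and should match your data):

CREATE SCHEMA "fileSource";

CREATE STREAM "fileSource"."fileSourceStream" (
    "id" DOUBLE,
    "reported_at" TIMESTAMP,
    "lat" VARCHAR(4096),
    "lon" VARCHAR(4096)
);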

Note: When you install client tools as a standalone installation, the installer asks you for the name of s-Server. The installer adds this information to a file called client-tools/default.conn.properties. If you are connecting to this instance of s-Server, you do not need to configure connection information for s-Server when you run the ECD Agent. See Installing Client Tools in the Installation Guide for more information.

2. Create a properties file with information about the data source and destination stream. Property file options are described below. The following example is for an agent that reads from a stream called fileSourceStream in a schema called fileSource and writes to an XML file located in the directory /TMP/XML_TEST/:

# Location, name, and type of file

DIRECTORY=/TMP/XML_TEST/

FORMAT_TYPE=XML

FILENAME_PREFIX=TEST-

FILENAME_SUFFIX=.XML

# Configuration for formatter

DOC_ELEMENTS=/DOC/ELEMENTS/

ROW_ELEMENTS=/ROWELEMENTS/

DATA_ELEMENTS=DATA_ELEMENTS

ID_ELEMENTS=ID_ELEMENTS

ID_ATTRIBUTES=ID_ATTRIBUTES

CHARACTER_ENCODING=UTF-8

FORMATTER_INCLUDE_ROWTIME=FALSE

#Configuration for file rotation

MAX_TIME_DELTA_PER_FILE=60000

# Schema, name, and parameter signature of source stream

SCHEMA_NAME=fileSource

TABLE_NAME=fileSourceStream

#List of columns and types for source stream

#ROWTYPE=RECORDTYPE(DOUBLE id, TIMESTAMP reported_at, DOUBLE shift_no, VARCHAR(4096) trip_no , VARCHAR(4096) route_variant_id, VARCHAR(4096) waypoint_id, VARCHAR(4096) last_known_location_state, VARCHAR(4096) lat, VARCHAR(4096) lon, DOUBLE speed, VARCHAR(4096) bearing, DOUBLE driver_no , VARCHAR(4096) prescribed, DOUBLE highway, TIMESTAMP created_at, TIMESTAMP updated_at)

 

3. Run the commondataagent.sh script. The following example specifies that the agent runs as a writer (--output), with an I/O type of kafka, using a properties file at /var/sqlstream/etc/kafka_demo.properties.

./commondataagent.sh --output --io kafka --props /var/sqlstream/etc/kafka_demo.properties --plugin /opt/sqlstream/5.1.0.14827/s-Server/plugin/kafka/kafka.jar

 

Note: When you start the agent, you may see error messages related to logging. You can ignore these messages.

Command Line Arguments for ECDA Agent

Command Line Argument

Definition

--input, --output

--input specifies reader (inserting)

--output specifies writer (selecting)

One of these is required.

--io

Input/output source. Defaults to file. Options are

file--reads/writes over the file system

net--reads/writes over a network

amqp--reads/writes over amqp

kafka--reads from/writes to kafka

mqseries--reads from/writes to mqseries

mail--reads from/writes to mail server

hdfs--reads from/writes to Hadoop file system

--props

Indicates the properties file to be passed. Some properties, such as --rowType, --tableName, and --schemaName, can also be configured through the command line.

--schemaName

--tableName

These indicate the s-Server stream to be read from. They override any setting in the properties file.

--rowType

This is optional in many use cases, because the parser will automatically generate column names. Its value is always RECORDTYPE, which takes as options the column names for the stream. In writing column names, you enter the column type first, such as VARCHAR(2040) or INTEGER, then the column name exactly as it appears in the stream declaration. Column names have implicit quotation marks; that is, id in a ROWTYPE property is equivalent to "id" in a stream declaration.

--rowType=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)

--uri, --user, --pass

Connection information for the s-Server JDBC driver. You only need to specify these if you are connecting to an s-Server other than the one defined in client-tools/default.conn.properties.

--formatter

File format to be processed by the agent. This also overrides any setting in the properties file.

Possible values are CSV, XML, JSON, and Custom.

--plugin

This is the location of the jar file which contains the appropriate ECDA plugin. For example, for the Kafka Agent, this argument would be set to a value like this:

plugin/kafka/kafka.jar

 

The resulting boot command for the Kafka Agent would then be

clienttools/commondata/commondataagent.sh --output --io kafka --props $propertyFile --plugin plugin/kafka/kafka.jar

 

where $propertyFile is the name of the file containing the agent properties (as described below).

At minimum, you need to specify the agent as a reader or writer and a configuration option.

Properties File Configuration Options

The properties file determines most of the options for the agent (mostly the same options that are set as part of a foreign stream definition for the ECD adapter). Some options are general to all sources, and other options need to be specified for each source. See the tables below for more details.

To specify a properties file, enter the following:

--props example.properties

 

where example.properties is the name of your properties file. You create and edit properties files using a text editor. Property names are case-sensitive.

By default, the EcdaAgent uses the connection info shared with all the other tools and agents in client-tools/default.conn.properties, and uses a default data format of file.

Property Name

Description

WRITE_HEADER

True or False

DIRECTORY

Directory in which file resides or to which you are writing.

SCHEMA_NAME

Name of schema in s-Server for the stream to be read from. This needs to match the name of the s-Server schema exactly.

TABLE_NAME

Name of s-Server stream or table to be read from. This needs to match the name of the s-Server stream exactly.

ROWTYPE

This is always RECORDTYPE and takes as options the column names for the stream. In writing column names, you enter the column type first, such as VARCHAR(2040) or INTEGER, then the column name exactly as it appears in the stream declaration. Column names have implicit quotation marks; that is, id in a ROWTYPE property is equivalent to "id" in a stream declaration.

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)

CHARACTER_ENCODING

Default: UTF-8

Any Java-supported encoding. Use an encoding implemented by java.nio; see:

http://download.oracle.com/javase/6/docs/technotes/guides/intl/encoding.doc.html?toc=0

SEPARATOR

Defines the characters used to delimit the columns in your data. Any character in the list will be used as a delimiter, including a tab. For example, after specifying SEPARATOR ',;:', every comma, semicolon, and colon in a log record starts a new column.

SKIP_HEADER

True or false. Whether the formatter should skip writing a header.

FORMATTER_INCLUDE_ROWTIME

True or False

MAX_BYTES_PER_FILE

Maximum number of bytes to write to a file before rotation. Either MAX_BYTES_PER_FILE or MAX_TIME_DELTA_PER_FILE must be specified for an output file set.

MAX_TIME_DELTA_PER_FILE

Maximum difference in rowtime in a specific file in milliseconds before rotation. Examples: 60000 (1 minute), 30000 (30 seconds), 3600000 (1 hour), 43200000 (12 hours), 86400000 (1 day).

Either MAX_BYTES_PER_FILE or MAX_TIME_DELTA_PER_FILE must be specified for an output file set.

FILENAME_PREFIX

Output only. Prefix of the final output file name.

FILENAME_DATE_FORMAT

Output only. Date format to use in the final output file name.

FILENAME_SUFFIX

Output only. Suffix of the final output file name

Options Specific to the File System

Option

Description

DIRECTORY

Directory to which you are writing. The agent needs permission to write to this directory.

ORIGINAL_FILENAME

Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.

ESCAPE_<column name>

True or false; defaults to true. Causes strings to be escaped before being written.

FILENAME_PREFIX

Prefix of the final output file name, such as "test-".

FILENAME_DATE_FORMAT

Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss.

Uses Java SimpleDateFormat.

FILENAME_SUFFIX

Suffix of the final output file name. Include the period if desired, e.g. ".csv".

MAX_BYTES_PER_FILE

Maximum number of bytes to write to a file before rotation. Either MAX_BYTES_PER_FILE or MAX_TIME_DELTA_PER_FILE must be specified for an output file set.

MAX_TIME_DELTA_PER_FILE

Maximum difference in rowtime in a specific file, in milliseconds. Either MAX_BYTES_PER_FILE or MAX_TIME_DELTA_PER_FILE must be specified for an output file set.
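As an illustration, a file writer's properties file might combine these options as follows (paths and names are placeholders; FORMAT_TYPE=CSV is assumed by analogy with the XML example earlier in this topic):

# Destination directory and file naming
DIRECTORY=/var/sqlstream/output
ORIGINAL_FILENAME=current.csv
FILENAME_PREFIX=test-
FILENAME_DATE_FORMAT=yyyy-MM-dd_HH-mm-ss
FILENAME_SUFFIX=.csv
FORMAT_TYPE=CSV
# Rotate after roughly 1 MB
MAX_BYTES_PER_FILE=1048576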

Options Specific to Sockets

Options specific to sockets appear below. The ECD agent can act as a server or a client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.

Name

Description

IS_IPV6

Whether or not the socket uses IPV6. Default is false.

IS_TCP

Whether the socket uses TCP (True) or UDP (False). Default is false (UDP).

REMOTE_HOST

Hostname to send tuples to or receive tuples from when ECDA is acting as a client. This can be LOCALHOST for local connections or a specific IP address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.

REMOTE_PORT

Port to send tuples to or receive tuples from when ECDA is acting as a client. REMOTE_* and SERVER_* tells ECDA's socket code how to start the network connection (as a server or a client).

SERVER_HOST

The hostname or IP address to listen upon to send/receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.

SERVER_PORT

The port to listen upon to send/receive tuples when ECDA is acting as a server.
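For example, to have the agent act as a TCP client that sends rows to a remote listener, the properties file might include lines like these (the host and port are placeholders):

# Open the network connection as a TCP client
IS_TCP=true
REMOTE_HOST=192.0.2.10
REMOTE_PORT=5601

To act as a server instead, omit the REMOTE_* entries and set SERVER_PORT (and, if desired, SERVER_HOST).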

Options Specific to AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP works up to version 0.9.1 and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP 1.0 differently than for earlier versions.

Foreign Stream, Table, or Properties Options for AMQP

Name

Description

CONNECTION_URL

Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port, and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.

Depending on your AMQP implementation, your URL might look something like the following:

'amqp://guest:guest@localhost'

 

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation.

The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

DESTINATION

Required. AMQP 1.0 queue or topic identifier.

In s-Server 5.2, DESTINATION can be either an absolute destination or be in the form:

<destination prefix>{PARTITION}<destination suffix>

Example:

/new/ConsumerGroups/$Default/Partitions/{PARTITION}

 

 

ACKNOWLEDGE_MODE

Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack

Roughly, AUTO asks the container on the AMQP server to acknowledge once message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.

DELIVERY_MODE

Optional. Delivery mode for messages that ECDA communicates to the AMQP 1.0 server. Options are NON-PERSISTENT or PERSISTENT; defaults to NON-PERSISTENT. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-outbound-channel-adapter-xml-7a

 

Version 5.2 feature.

PARTITION_EXPRESSION

Use only if DESTINATION includes "{PARTITION}".

This should be a dk.brics regular expression, such as "<0-3>".
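Putting these options together, a minimal AMQP writer configuration might look like the following sketch (the URL and destination are placeholders; adjust them for your broker and AMQP version):

# Broker connection and destination
CONNECTION_URL=amqp://guest:guest@localhost
DESTINATION=myqueue
ACKNOWLEDGE_MODE=AUTO
DELIVERY_MODE=NON-PERSISTENT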

 

Options Specific to Kafka

Options specific to writing data to Kafka appear below:

 

Option name

Description

TOPIC

Kafka topic

metadata.broker.list

hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.

partitioner.class

Fully qualified Java classname of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.common.KafkaOutputSink.RoundRobinPartitioner.

serializer.class

Fully qualified Java classname of the Kafka serializer. Defaults to kafka.serializer.DefaultEncoder.

key.serializer.class

Names a serializer class for keys. If no class is given, Kafka uses serializer.class.

producer.type

Specifies whether messages are sent asynchronously in a background thread. Async mode lets requests be batched, which helps throughput but increases the possibility that a failed client machine results in unsent data. Defaults to async.

compression.codec

Specifies the compression codec for generated data: one of "none", "gzip", or "snappy".

compressed.topics

If you have specified a compression.codec (other than "none"), this option lets you limit compression to those topics listed in this option. Empty means apply compression to all topics.

message.send.max.retries

If enabled, the producer will automatically retry a failed send request for a set number of retries.

Note: using this option may, according to Kafka documentation "lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgment to be lost."

retry.backoff.ms

The producer refreshes metadata to see whether a new leader has been elected. This option specifies the amount of time to wait before refreshing.

request.required.acks

Sets how many acknowledgments the producer requires the leader to have received before considering a request complete: 0 (never wait for acknowledgment), 1 (wait for the leader), or -1 (wait for all in-sync replicas). Higher values trade throughput for durability.

request.timeout.ms

When request.required.acks is enabled, this lets you specify how long the broker should wait while attempting to meet the acknowledgment requirement before sending back an error to the client.

topic.metadata.refresh.interval.ms

By default, the producer refreshes topic metadata in two cases:

First, at regular intervals, which defaults to every 600000 ms, or 10 minutes.
Second, any time there is a failure, such as a partition missing or leader not being available.

This setting lets you change the regular polling interval by specifying a new interval. If you set this number to a negative value, the producer only refreshes on failure. If you set this number to zero, the producer refreshes every time a message is sent (this is not recommended).

Note: refreshes happen only AFTER the message is sent, so if the producer never sends a message, the metadata is never refreshed.

queue.buffering.max.ms

If async is enabled in producer.type, this sets a maximum time for buffering data, in milliseconds. For example, "100" means "try to batch together 100 ms of messages." Like most buffering, this improves throughput but adds message delivery latency.

queue.buffering.max.messages

If async is enabled in producer.type, this lets you specify a maximum number of unsent messages to buffer. Once this number is reached, either the producer must be blocked or data must be dropped.

 

queue.enqueue.timeout.ms

If async is enabled in producer.type, this lets you specify an amount of time to block before dropping messages when the buffer has reached the value specified in queue.buffering.max.messages. If you set this option to 0, events will be enqueued immediately or dropped if the queue is full. If you set this option to -1, the producer will block indefinitely.

batch.num.messages

If async is enabled in producer.type, this lets you specify the number of messages to send in one batch. With this option enabled, the producer will wait until either this number of messages is ready to send or queue.buffering.max.ms is reached.

send.buffer.bytes

Socket write buffer size.

client.id

Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
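Drawing on the table above, a Kafka writer's properties file might include entries like these (the broker list, topic, and client id are illustrative):

# Brokers and topic to write to
TOPIC=mytopic
metadata.broker.list=kafka1.example.com:9092,kafka2.example.com:9092
# Async batching trades delivery latency for throughput
producer.type=async
compression.codec=gzip
message.send.max.retries=3
client.id=sqlstream-ecda-writer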

Options Specific to IBM MQ

Name

Description

QUEUE_MANAGER_NAME

Required. Name of MQSeries Queue Manager.

HOSTNAME

Required. MQSeries hostname. Defaults to localhost.

PORT

Optional. Port for MQSeries server. Defaults to 1414.

USERID

User name for the MQSeries server.

PASSWORD

Password for the MQSeries server.

CHANNEL_NAME

Server connection channel for the MQSeries server.

TOPIC_NAME

Name of the MQSeries topic to write to or read from. You must specify either QUEUE_NAME or TOPIC_NAME, but not both.

TOPIC_OBJECT

Any MQSeries administrative topic objects for the topic. See http://pic.dhe.ibm.com/infocenter/wmqv7/v7r0m0/topic/com.ibm.mq.amqnar.doc/ps12490_.htm for more details.

QUEUE_NAME

Name of the MQSeries queue to write to or read from. You must specify either QUEUE_NAME or TOPIC_NAME, but not both.
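As a sketch, these options might combine in a properties file like the following (all values are placeholders for your MQ installation):

# Queue manager connection
QUEUE_MANAGER_NAME=QM1
HOSTNAME=mq.example.com
PORT=1414
USERID=mquser
PASSWORD=mqpassword
CHANNEL_NAME=SYSTEM.DEF.SVRCONN
# Write to a queue (use TOPIC_NAME instead to write to a topic)
QUEUE_NAME=MY.OUTPUT.QUEUE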