Reading Files in Remote Locations

To read files in remote locations, you need to configure and instantiate the Extensible Common Data agent. This agent is a standalone Java application that mediates data between SQLstream s-Server and external data sources. You can install the agent on a remote server in order to process data at the site of collection. For example, if you have a server which generates log files, you may find it advantageous to have the ECDA Agent parse those files on the server itself, and pass rows to s-Server, as opposed to passing the entire file.

The agent is supplied either as part of the distributed SQLstream product or as part of the ClientTools download from the SQLstream website (via SQLstream-5.1.0-clienttools-linux.run or SQLstream-client-tools-5.1.0-windows.exe).

The ECDA Agent uses the SQLstream JDBC driver to communicate with a stream in s-Server. You need to create a stream before implementing the agent.

The ECDA Agent feeds rows to an existing stream in s-Server.

The agent itself is a wrapper (or "hoster") for the ECDA (Extensible Common Data Adapter). See the topic Reading from Other Sources for more information on the ECD adapter. The agent is configured much like the adapter, except that you use command-line options or properties files instead of foreign stream options to configure the agent.

Requirements

The server hosting the ECDA Agent must have a Java Runtime Environment (or a JDK) installed.

Configuring and Using the ECDA Agent

To use the agent, you launch it from the command line, using a command along the following lines:

./commondataagent.sh --input --io kafka --props /var/sqlstream/etc/kafka_demo.properties --schemaName kafkaSource --tableName kafkaSourceStream --plugin /opt/sqlstream/5.1.0.14827/s-Server/plugin/kafka/kafka.jar

 

The agent can be launched from anywhere on the machine on which it is installed. The script requires the following minimum command-line arguments:

--input Specifies that the agent reads from the server on which the agent resides.
--io Determines the input/output format to be used by the agent. Options are file (which is the default), net (network), mail (mail server), amqp, mqseries, kafka, or hdfs.
--schemaName (or SCHEMA_NAME) Indicates the s-Server schema containing the stream to be inserted into or read from (see below). Alternately, you can configure this through the properties file, but the option must be supplied in one place or the other. When specified on the command line, it overrides any setting in the properties file.
--streamName (or STREAM_NAME) Indicates the s-Server stream to be inserted into or read from. Alternately, you can configure this through the properties file, but the option must be supplied in one place or the other. When specified on the command line, it overrides any setting in the properties file.

Note: For Kafka, you also need to specify a location for the Kafka plugin, using the option --plugin, such as --plugin /opt/sqlstream/5.1.0.14827/s-Server/plugin/kafka/kafka.jar

Configuring the EcdaAgent

You configure the rest of the EcdaAgent either by specifying a properties file or by using command-line parameters. We recommend using a properties file, though it can be useful at times to keep base configuration in the properties file and use the command line to override one or two options.

Using Command Line Options Versus Property File Options

You use Property File options to specify how the ECDA Agent communicates with a particular data source, such as a log file. The properties file configures the format type, directory, separator character, and so on. You can also configure stream information in the properties file.

However, you may wish to use the same agent to connect with another instance of s-Server. In this case, you can configure server connection and stream information from the command line, using the arguments defined above.

Running the Agent

To invoke the ECDA Agent:

1.Declare or identify a destination stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide.

Note: When you install client tools as a standalone installation, the installer asks you for the name of s-Server. The installer adds this information to a file called client-tools/default.conn.properties. If you are connecting to this instance of s-Server, you do not need to configure connection information for s-Server when you run the ECD Agent. See Installing Client Tools in the Installation Guide for more information.
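
For example, a destination stream matching the properties file in the next step might be declared along the following lines. This is a minimal sketch: the column names and types here are hypothetical and must match the actual data arriving on the Kafka topic.

CREATE OR REPLACE SCHEMA "kafkaSource";

CREATE OR REPLACE STREAM "kafkaSource"."kafkaSourceStream" (
    "id" VARCHAR(2040),
    "reported_at" VARCHAR(2040)
);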

2.Create a properties file with information about the data source and destination stream. Property file options are described below. The following example is for an agent that reads from a Kafka topic on a server called localhost and inserts rows into a stream called kafkaSourceStream in a schema called kafkaSource. Property names must be entered in uppercase.

#Options specific to the data source

SEEDBROKERS=localhost

TOPIC=TutorialTopic

STARTING_TIME=latest

#Parser type

PARSER=CSV

CHARACTER_ENCODING=UTF-8

# Schema and name of destination stream

SCHEMA_NAME=kafkaSource

TABLE_NAME=kafkaSourceStream

 

3.Run the commondataagent.sh script. The following example runs the agent as a reader (--input), with an input/output format of kafka, using a properties file at /var/sqlstream/etc/kafka_demo.properties.

./commondataagent.sh --input --io kafka --props /var/sqlstream/etc/kafka_demo.properties --plugin /opt/sqlstream/5.1.0.14827/s-Server/plugin/kafka/kafka.jar

 

Note: When you start the agent, you may see error messages related to logging. You can ignore these messages.

Command Line Arguments for ECDA Agent

Command Line Argument

Definition

--input, --output

--input specifies reader (inserting)

--output specifies writer (selecting)

One of these is required.

--io

Input/output source. Defaults to file. Options are

file--reads/writes over the file system

net--reads/writes over a network

amqp--reads/writes over amqp

kafka--reads from/writes to kafka

mqseries--reads from/writes to mqseries

mail--reads from/writes to mail server

hdfs--reads from/writes to Hadoop file system

--props

Indicates the properties file to be passed. Some properties, such as --rowType, --tableName, and --schemaName, can also be configured through the command line.

--schemaName

--tableName

These indicate the s-Server stream to be inserted into or read from. They override any setting in the properties file.

--rowType

This is optional in many use cases, because the parser will automatically generate column names. The value is always RECORDTYPE, which takes the stream's columns as options. For each column, you enter the column type first, such as VARCHAR(2040) or INTEGER, then the column name exactly as it appears in the stream declaration. Column names have implicit quotation marks; that is, id in a ROWTYPE property is equivalent to "id" in a stream declaration. For example:

--ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)

--uri, --user, --pass

Connection information for the s-Server JDBC driver. You only need to specify these if you are connecting to an s-Server other than the one defined in client-tools/default.conn.properties (see the example following this table).

--parser

File format to be parsed by the agent. This overrides any setting in the properties file. Possible values are CSV, XML, JSON, FCLP, VCLP, W3C, FastRegex, MultiRow, and Custom.

--formatter

File format to be written by the agent. This overrides any setting in the properties file. Possible values are CSV, XML, JSON, and Custom.

--plugin

This is the location of the jar file that contains the appropriate ECDA plugin. For example, for the Kafka Agent, this argument would be set to a value like this:

plugin/kafka/kafka.jar

 

The resulting boot command for the Kafka Agent would then be

clienttools/commondata/commondataagent.sh --input --io kafka --props $propertyFile --plugin plugin/kafka/kafka.jar

 

where $propertyFile is the name of the file containing the agent properties (as described below).

At minimum, you need to specify whether the agent is a reader or a writer, plus a configuration option.
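
For example, to run the agent against a remote s-Server instance while keeping format options in a properties file, the boot command might look like the following sketch. The host name and credentials are placeholders, and the URL format and port shown here are assumptions based on SQLstream's JDBC defaults; check your own default.conn.properties for the correct values.

./commondataagent.sh --input --io file --props /var/sqlstream/etc/file_demo.properties --uri jdbc:sqlstream:sdp://remotehost:5570 --user sa --pass mypassword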

Properties File Configuration Options

The properties file determines most of the options for the agent (mostly the same options that are set as part of a foreign stream definition for the ECD adapter). Some options are general to all sources, and others need to be specified for each source. See the tables below for more details.

To specify a properties file, enter the following:

--props example.properties

 

where example.properties is the name of your properties file. You create properties files using a text editor. Property names are case-sensitive.

By default, the EcdaAgent uses the connection info shared with all the other tools and agents in client-tools/default.conn.properties, and uses a default data format of file.

You can read from a CSV file and insert into a stream using the following properties in example.properties:

# Location, name, and type of file

DIRECTORY=/tmp

FILENAME_PATTERN=buses\.log

SEPARATOR=,

FORMAT_TYPE=CSV

CHARACTER_ENCODING=UTF-8

# Schema, name, and parameter signature of destination stream

SCHEMA_NAME=fileSource

TABLE_NAME=fileSourceStream

#List of columns and types for destination stream

ROWTYPE=RECORDTYPE(DOUBLE id, TIMESTAMP reported_at, DOUBLE shift_no, VARCHAR(4096) trip_no , VARCHAR(4096) route_variant_id, VARCHAR(4096) waypoint_id, VARCHAR(4096) last_known_location_state, VARCHAR(4096) lat, VARCHAR(4096) lon, DOUBLE speed, VARCHAR(4096) bearing, DOUBLE driver_no , VARCHAR(4096) prescribed, DOUBLE highway, TIMESTAMP created_at, TIMESTAMP updated_at)
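
With example.properties in place, you might launch the agent along the following lines (a sketch; because file is the default input/output format, the --io file argument could be omitted):

./commondataagent.sh --input --io file --props example.properties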

 

Property Name

Description

PARSER

Format type for the file to be read from or written to by the Agent. Options are CSV, XML, FCLP, VCLP, W3C, FastRegex, MultiRow, and Custom.

WRITE_HEADER

True or False

DIRECTORY

Directory in which file resides or to which you are writing.

FILENAME_PATTERN

Input only. Regular expression defining which files to read from the location specified in DIRECTORY. If you do not specify a pattern, the agent tries all files in the directory. If the agent/adapter finds multiple files that match the pattern, it processes them one at a time, in alphabetical order by file name.

SCHEMA_NAME

Name of schema in s-Server for the stream to be inserted into or read from. This needs to match the name of the s-Server schema exactly.

TABLE_NAME

Name of s-Server stream or table to be inserted into or read from. This needs to match the name of the s-Server stream exactly.

ROWTYPE

This is always RECORDTYPE and takes as options the columns for the stream. For each column, you enter the column type first, such as VARCHAR(2040) or INTEGER, then the column name exactly as it appears in the stream declaration. Column names have implicit quotation marks; that is, id in a ROWTYPE property is equivalent to "id" in a stream declaration. For example:

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)

CHARACTER_ENCODING

Default: UTF-8

Any Java-supported encoding. Use an encoding implemented by java.nio; see:

http://download.oracle.com/javase/6/docs/technotes/guides/intl/encoding.doc.html?toc=0

SEPARATOR

Defines the characters used to delimit the columns in your data. Any character in the list will be used as a delimiter, including a tab. For example, after specifying SEPARATOR ',;:', every comma, semicolon, and colon in a log record starts a new column.

Options Specific to the File System

Option

Description

DIRECTORY

Directory in which file resides or to which you are writing.

FILENAME_PATTERN

Input only. Java regular expression defining which files to read. See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions.

STATIC_FILES

Defaults to false. Setting this to true indicates that no files will be added or changed after the file reader is started; the file reader exits after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite and handles files that are continually written to).
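
For example, to read a fixed batch of archived CSV files once and then exit, the file-system portion of a properties file might look like this sketch (the directory and pattern are hypothetical):

# Process all matching files once, then exit
DIRECTORY=/data/archive
FILENAME_PATTERN=trips-.*\.csv
STATIC_FILES=true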

Options Specific to Sockets

Options specific to sockets appear below. The ECD agent can act as a server or a client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST. A sketch of both configurations follows the table.

Name

Description

IS_IPV6

Whether or not the socket uses IPV6. Default is false.

IS_TCP

Whether the socket uses TCP (True) or UDP (False). Default is false (UDP).

REMOTE_HOST

Hostname to send tuples to or receive tuples from, when ECDA is acting as a client. You can set this to 'LOCALHOST' to restrict to local connections, or to a specific IP address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.

REMOTE_PORT

Port to send tuples to or receive tuples from when ECDA is acting as a client. The REMOTE_* and SERVER_* options tell ECDA's socket code how to start the network connection (as a client or as a server).

SERVER_HOST

The hostname or IP address to listen upon to send/receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.

SERVER_PORT

The port to listen upon to send/receive tuples when ECDA is acting as a server.
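
The following sketch shows both configurations as properties-file fragments; the host name and port are hypothetical:

# Agent as TCP server: listen on port 5601 for incoming connections
SERVER_HOST=0.0.0.0
SERVER_PORT=5601
IS_TCP=true

# Agent as TCP client: connect out to a remote host instead
# REMOTE_HOST=logs.example.com
# REMOTE_PORT=5601
# IS_TCP=true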

Options Specific to AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP works up to version 0.9.1 and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a wire-level network protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure the connection URL differently for AMQP 1.0 than for earlier versions.

Foreign Stream/Table Options or Agent Properties Settings for AMQP

You configure adapter options through foreign streams/tables. You configure agent options through the ECD agent property file.

Name

Description

CONNECTION_URL

Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port, and so on. The format differs depending on whether you are using AMQP 1.0 or a legacy version.

Depending on your AMQP implementation, your URL might look something like the following:

'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672'''

 

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

DESTINATION

Required. AMQP 0.9 queue or topic identifier.

PARSER_QUEUE_SIZE

Queue size for parser. Reading only. Defaults to 2. In most cases, you will not want to change this number.

ACKNOWLEDGE_MODE

Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack

Roughly, AUTO asks the container on the AMQP server to acknowledge once the message is completely delivered, MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
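
Putting these options together, a properties file for reading from a legacy (0.9.1) AMQP broker might look like the following sketch. The URL, queue name, and stream names are hypothetical; note also that the single-quote doubling described above applies to SQL foreign stream options, and the assumption here is that a properties file takes the URL with plain single quotes:

CONNECTION_URL=amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
DESTINATION=TutorialQueue
PARSER=CSV
SCHEMA_NAME=amqpSource
TABLE_NAME=amqpSourceStream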

Options Specific to Kafka

Options specific to reading data from Kafka in a remote location appear below:

Name

Description

TOPIC

Required. Kafka Topic

STARTING_TIME

The time to start reading from the specified topic. Options are LATEST, EARLIEST, or a long int representing a timestamp (milliseconds since epoch). Defaults to LATEST.

STARTING_OFFSET

Where to start reading from (default is -1), as a long int representing a timestamp (milliseconds since epoch).

SEED_BROKERS

Name of the Kafka host. Defaults to localhost.

PORT

Port for Kafka server. Defaults to 9092.

PARTITION

Partition number to read from.

If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions are read from.

You can specify a single partition with:

PARTITION <partition number>

or a range with:

PARTITION <first partition number>-<last partition number>

Note: Partition numbers are zero-based.

PARTITION_OFFSET_QUERY

A query that returns the starting offsets for all partitions. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start.

Should be something like "SELECT PARTITION, OFFSET FROM [TABLE NAME]".

PARTITION should be of type INTEGER.

OFFSET should be of type BIGINT.

BUFFER_SIZE

Buffer size in bytes. Defaults to 1048576.

FETCH_SIZE

Fetch size. Defaults to 1000000.

MAX_WAIT

Maximum number of milliseconds to wait on a read attempt. Defaults to 2000. Affects the time needed to cancel a query on the Kafka topic.

CLIENT_ID

Client key for Yammer metrics. CLIENT_ID defaults to client_{TOPIC} or CLIENT_{TOPIC}_{PARTITION} if PARTITION is specified.

CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true.

See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.

METRICS_PER_PARTITION

True or False.

If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer-grained metrics will be reported).
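
For example, to read partitions 0 through 3 of a topic from a specific broker, starting with the earliest available messages, a properties file might include the following sketch (the broker and topic names are hypothetical):

SEED_BROKERS=kafkahost
PORT=9092
TOPIC=TutorialTopic
PARTITION=0-3
STARTING_TIME=earliest
PARSER=CSV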

Options Specific to IBM MQ

Name

Description

QUEUE_MANAGER_NAME

Required. Name of MQSeries Queue Manager.

HOSTNAME

Required. MQSeries hostname. Defaults to localhost.

PORT

Optional. Port for MQSeries server. Defaults to 1414.

USERID

User name for MQseries server.

PASSWORD

Password for MQseries server.

CHANNEL_NAME

Server connection channel for the MQSeries server.

TOPIC_NAME

Name of the MQSeries topic from which you are receiving data. You must specify either QUEUE_NAME or TOPIC_NAME, but not both.

TOPIC_OBJECT

Any MQSeries administrative topic objects for the topic. See http://pic.dhe.ibm.com/infocenter/wmqv7/v7r0m0/topic/com.ibm.mq.amqnar.doc/ps12490_.htm for more details.

QUEUE_NAME

Name of the queue from which you are receiving data. You must specify either QUEUE_NAME or TOPIC_NAME, but not both.
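
Putting these options together, a properties file for reading from an IBM MQ queue might look like the following sketch (the queue manager, channel, queue, credentials, and stream names are all hypothetical):

QUEUE_MANAGER_NAME=QM1
HOSTNAME=mqhost
PORT=1414
CHANNEL_NAME=DEV.APP.SVRCONN
USERID=mquser
PASSWORD=mqpassword
QUEUE_NAME=TUTORIAL.QUEUE
PARSER=CSV
SCHEMA_NAME=mqSource
TABLE_NAME=mqSourceStream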