Writing to AMQP


To write data to an AMQP message bus, use the Extensible Common Data Framework. AMQP stands for Advanced Message Queuing Protocol and is an open standard for messaging middleware. For more information, see https://cwiki.apache.org/confluence/display/CAMEL/AMQP.

To write data, you first define a server object that references the ECD foreign data wrapper and the AMQP column set class. You then define a foreign stream that references this server object and supplies connection information such as the host name, user ID, password, and destination. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Configuring the ECD Adapter for AMQP

In these streams, each row represents a single message and has two columns: the message headers and the message body. The message body is a single VARBINARY column. The message headers are concatenated into a single VARCHAR column, in the format TAG=VALUE TAG=VALUE ...
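The header column format can be illustrated with a short Python sketch that joins a dictionary of headers into one TAG=VALUE TAG=VALUE string and splits it back apart. This is not the adapter's code: it assumes tags and values contain no spaces, and the header names used are hypothetical. How the adapter escapes spaces or equals signs inside values is not covered here.

```python
def join_headers(headers):
    """Concatenate message headers into one string: TAG=VALUE TAG=VALUE ..."""
    return " ".join(f"{tag}={value}" for tag, value in headers.items())


def split_headers(header_column):
    """Split a TAG=VALUE TAG=VALUE ... string back into a dict.

    Assumes (hypothetically) that tags and values contain no spaces.
    """
    headers = {}
    for pair in header_column.split():
        tag, _, value = pair.partition("=")  # split on the first '=' only
        headers[tag] = value
    return headers


# Hypothetical header names, for illustration only.
row_headers = {"priority": "4", "content_type": "text/plain"}
header_column = join_headers(row_headers)
print(header_column)  # priority=4 content_type=text/plain
assert split_headers(header_column) == row_headers
```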

You configure the Extensible Common Data Adapter (ECDA) to read or write over AMQP using server options and foreign table/stream options. You configure the Extensible Common Data Agent using the same options in a properties file. See the topic Extensible Common Data Agent Overview for more details.

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

In the server definition, you reference the class com.sqlstream.aspen.namespace.common.AmqpColumnSet (AMQP 0.9) or com.sqlstream.aspen.namespace.common.Amqp10ColumnSet (AMQP 1.0), as in the following code:

For AMQP 0.9

CREATE OR REPLACE SERVER "AmqpServer"

FOREIGN DATA WRAPPER ECDA

OPTIONS (classname 'com.sqlstream.aspen.namespace.common.AmqpColumnSet');

 

For AMQP 1.0

CREATE OR REPLACE SERVER "AmqpServer"

FOREIGN DATA WRAPPER ECDA

OPTIONS (classname 'com.sqlstream.aspen.namespace.common.Amqp10ColumnSet');

 

Next, you create a foreign stream object. This object contains connection information for the AMQP broker, such as the host name, user ID, password, and destination, along with the format type and character encoding. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "FileWriterStream."

CREATE OR REPLACE SCHEMA "FileWriterSchema";

SET SCHEMA '"FileWriterSchema"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("MESSAGE" VARCHAR(8192))

SERVER "AmqpServer"

OPTIONS

(

HOSTNAME 'ahost.domainname.com',

USERID 'auser',

PASSWORD 'demo2014',

DESTINATION 'DemoStream.sample',

FORMATTER 'CSV',

CHARACTER_ENCODING 'UTF-8'

);

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "FileWriterSchema"."FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

Implementing the ECD Agent for AMQP

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

SCHEMA=AMQPWRITER

TABLE_NAME=AmqpStream

HOSTNAME=ahost.domainname.com

USERID=auser

PASSWORD=demo2014

DESTINATION=DemoStream.sample

CHARACTER_ENCODING=UTF-8

ROWTYPE=RecordType(VARCHAR(8192) COL1)
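Because the agent properties correspond to the adapter options, the mapping can be sketched in Python: parse the KEY=VALUE lines, set aside the keys that only make sense to the agent, and render the rest as an OPTIONS clause. Treating SCHEMA, TABLE_NAME, and ROWTYPE as agent-only is an assumption based on the example above, and the parser is simplified (real Java .properties files also allow ':' separators and backslash escapes).

```python
# Simplified reader for KEY=VALUE properties; real Java .properties files
# also allow ':' separators, '!' comments, and backslash escapes.
AGENT_ONLY_KEYS = {"SCHEMA", "TABLE_NAME", "ROWTYPE"}  # assumption, see lead-in


def parse_properties(text):
    """Return a dict of KEY -> VALUE, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        props[key.strip()] = value.strip()
    return props


def to_options_clause(props):
    """Render the non-agent-only properties as a SQL OPTIONS clause."""
    parts = []
    for key, value in props.items():
        if key in AGENT_ONLY_KEYS:
            continue
        escaped = value.replace("'", "''")  # double single quotes for SQL literals
        parts.append(f"{key} '{escaped}'")
    return "OPTIONS (" + ", ".join(parts) + ")"


sample = """\
SCHEMA=AMQPWRITER
TABLE_NAME=AmqpStream
HOSTNAME=ahost.domainname.com
DESTINATION=DemoStream.sample
ROWTYPE=RecordType(VARCHAR(8192) COL1)
"""
print(to_options_clause(parse_properties(sample)))
# OPTIONS (HOSTNAME 'ahost.domainname.com', DESTINATION 'DemoStream.sample')
```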

 

Special Columns Generated for AMQP 1.0 (Input Only)

The Extensible Common Data Adapter generates special row columns when parsing AMQP 1.0 messages. You can declare these columns to make them part of a foreign stream or table. Special columns must be declared in double quotes, as in the following code snippet:

CREATE OR REPLACE FOREIGN STREAM MyStream (

   "CREATION_TIME" TIMESTAMP,

   POSITION_ID VARCHAR(42),

   VEHICLE_ID VARCHAR(42));

 

Special Column    Type         Meaning
CREATION_TIME     TIMESTAMP    The time the containing message was created.
PARTITION         VARCHAR      The partition from which the containing message was generated. (5.2 feature)

Format Type Options

Other options are specific to format type.

Formatting Files as CSV

When writing CSV files, the Extensible Common Data Adapter converts streaming tuples into character-separated output, based on options supplied in the OPTIONS clause of the CREATE FOREIGN STREAM statement.
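Conceptually, this conversion resembles what Python's standard csv module does; the sketch below formats a batch of rows as delimited lines. Quoting and NULL handling here follow Python's defaults (None becomes an empty field, and fields containing the delimiter are quoted), which may differ from the adapter's behavior.

```python
import csv
import io


def rows_to_csv(rows, delimiter=","):
    """Format each row (a tuple of column values) as one delimited line.

    None is written as an empty field (Python's csv default); the adapter's
    own NULL handling may differ.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


print(rows_to_csv([("a", 1), ("b,c", None)]), end="")
# a,1
# "b,c",
```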

Note: For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

A destination directory for the file.
A formatter of CSV.
A character encoding type for the file.
How often the file rotates, in terms of bytes or time.
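The rotation rule in the last item can be sketched as follows. RotatingWriter is a hypothetical helper, not part of s-Server: max_bytes and max_millis play roles similar to the max_bytes_per_file and MAX_TIME_DELTA_PER_FILE options used in the examples in this topic, and each "file" is just an in-memory list of lines.

```python
import time


class RotatingWriter:
    """Hypothetical helper: collects lines into 'files', starting a new one
    when a byte limit or an age limit (in milliseconds) would be exceeded."""

    def __init__(self, max_bytes=None, max_millis=None, clock=time.monotonic):
        self.max_bytes = max_bytes      # similar role to max_bytes_per_file
        self.max_millis = max_millis    # similar role to MAX_TIME_DELTA_PER_FILE
        self.clock = clock              # returns seconds; injectable for testing
        self.files = [[]]               # each inner list stands in for one file
        self.bytes_in_file = 0
        self.file_started = clock()

    def _should_rotate(self, incoming):
        if self.bytes_in_file == 0:
            return False                # never leave an empty file behind
        if self.max_bytes is not None and self.bytes_in_file + incoming > self.max_bytes:
            return True
        if self.max_millis is not None:
            elapsed_ms = (self.clock() - self.file_started) * 1000
            return elapsed_ms >= self.max_millis
        return False

    def write(self, line):
        size = len(line.encode("utf-8"))
        if self._should_rotate(size):
            self.files.append([])
            self.bytes_in_file = 0
            self.file_started = self.clock()
        self.files[-1].append(line)
        self.bytes_in_file += size


writer = RotatingWriter(max_bytes=10)
for line in ["aaaa", "bbbb", "cccc"]:
    writer.write(line)
print(writer.files)  # [['aaaa', 'bbbb'], ['cccc']]
```

Starting a new file only when the next line would cross the byte limit keeps every file at or under max_bytes (unless a single line is larger than the limit), which is one plausible reading of a per-file byte cap.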

Example Code

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

Next, you create a foreign stream which references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData," and creates a foreign stream called "FileWriterStream." To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";

SET SCHEMA '"WebData"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("STR1" VARCHAR(32))

SERVER "FileWriterServer" OPTIONS

(directory 'path/to/myfile',

formatter 'CSV',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.csv',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');
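The filename options in this example suggest that each output file is named from filename_prefix, a timestamp formatted with filename_date_format, and filename_suffix; that naming order is an assumption here. The date pattern uses Java SimpleDateFormat letters, which are case-sensitive (MM is month, mm is minute). The sketch below translates only the letters used in this topic into Python strftime codes; build_filename is a hypothetical helper.

```python
from datetime import datetime

# Only the Java date letters used in this topic; order matters so that
# already-translated codes are not rewritten by later replacements.
JAVA_TO_STRFTIME = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d"),
                    ("HH", "%H"), ("mm", "%M"), ("ss", "%S")]


def java_pattern_to_strftime(pattern):
    """Translate a limited Java SimpleDateFormat pattern to strftime codes."""
    for java_token, py_code in JAVA_TO_STRFTIME:
        pattern = pattern.replace(java_token, py_code)
    return pattern


def build_filename(prefix, date_format, suffix, when):
    """Compose prefix + formatted timestamp + suffix (assumed naming scheme)."""
    return prefix + when.strftime(java_pattern_to_strftime(date_format)) + suffix


print(build_filename("test-", "yyyy-MM-dd-HH:mm:ss", ".csv",
                     datetime(2014, 7, 23, 20, 52, 4)))
# test-2014-07-23-20:52:04.csv
```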

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

Formatting as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify.

If no value is specified for DATA_ELEMENTS, <col_name>_ELEMENTS, or <col_name>_ATTRIBUTES, the column name is used as an element name (not an attribute name). For example, a column named foo would appear in an XML element at /batch/row/foo.
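The default mapping can be sketched with Python's xml.etree.ElementTree: one row element per row inside a batch element, and one child element per column, named after the column. This covers only the default case described above; skipping NULL (None) values is an assumption of the sketch, not documented adapter behavior.

```python
import xml.etree.ElementTree as ET


def rows_to_xml(column_names, rows):
    """Map a batch to <batch><row><col>value</col>...</row></batch>.

    Default case only (no DATA_ELEMENTS, <col_name>_ELEMENTS, or
    <col_name>_ATTRIBUTES). Skipping None values is an assumption.
    """
    batch = ET.Element("batch")
    for row in rows:
        row_el = ET.SubElement(batch, "row")
        for name, value in zip(column_names, row):
            if value is None:
                continue
            ET.SubElement(row_el, name).text = str(value)
    return ET.tostring(batch, encoding="unicode")


xml_text = rows_to_xml(["foo", "bar"], [(1, "x")])
print(xml_text)  # <batch><row><foo>1</foo><bar>x</bar></row></batch>
assert ET.fromstring(xml_text).find("row/foo").text == "1"  # i.e. /batch/row/foo
```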

Sample Foreign Stream Implementing ECD Adapter to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP NOT NULL,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER)

SERVER "FileWriterServer"

OPTIONS

(directory '/path/to/myfile',

formatter 'XML',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.xml',

data_elements 'data',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');

 

To actually write to a file in /path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

Sample Properties File to Use ECD Agent to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)

DIRECTORY=/path/to/myfile

FORMATTER=XML

FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss

FILENAME_PREFIX=test-

FILENAME_SUFFIX=.xml

DATA_ELEMENTS=data

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_BYTES_PER_FILE=1073741824

Formatting as JSON

The ECD adapter writes batches of data as JSON tuples. You configure how the adapter writes these tuples using foreign stream options.
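Judging from the sample output later in this section, each batch appears as a JSON array with one object per row, every value rendered as a string, and SQL NULL rendered as the string "NULL". The Python sketch below reproduces that shape; it does not reproduce the Java-style number formatting seen in the sample (for example "5.0115809712E10"), and rows_to_json is a hypothetical helper.

```python
import json


def rows_to_json(column_names, rows):
    """Render a batch as a JSON array with one object per row.

    Values become strings and None (SQL NULL) becomes "NULL", matching the
    shape of the sample output; number formatting differs from the sample.
    """
    batch = [
        {name: "NULL" if value is None else str(value)
         for name, value in zip(column_names, row)}
        for row in rows
    ]
    return json.dumps(batch)


print(rows_to_json(["trip_no", "route_variant_id"],
                   [(653.0, "L38 7"), (None, "310 7")]))
# [{"trip_no": "653.0", "route_variant_id": "L38 7"}, {"trip_no": "NULL", "route_variant_id": "310 7"}]
```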

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"

  ("id" DOUBLE,

  "reported_at" VARCHAR(4096),

  "shift_no" DOUBLE,

  "trip_no" DOUBLE,

  "route_variant_id" VARCHAR(4096),

  "waypoint_id" DOUBLE,

  "last_known_location_state" VARCHAR(4096)

   )

   SERVER "FileWriterServer"

    --note that this uses the server defined above

   OPTIONS

   (

    directory '/tmp/json_test/',

     --file directory where JSON file will be written. 

    formatter 'JSON',

    character_encoding 'UTF-8',

    filename_prefix 'test',

    filename_suffix '.json',

    formatter_include_rowtime 'false',

    MAX_TIME_DELTA_PER_FILE '60000'

    );

 

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "JSON_OutputStream" (

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

)

   SELECT STREAM

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

   from "buses_stream";

    --this assumes that a stream called "buses_stream" exists in the same schema

 

Output

 

[{"id":"5.0115809712E10",

"reported_at":"2014-07-23 20:52:04.527000000",

"shift_no":"NULL",

"trip_no":"653.0",

"route_variant_id":"L38 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"5.0115854098E10",

"reported_at":"2014-07-23 20:52:05.443000000",

"shift_no":"NULL",

"trip_no":"NULL",

"route_variant_id":"310 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"3.46866848031E11",

"reported_at":"2014-07-23 20:52:07.713000000",

"shift_no":"1016.0",

"trip_no":"NULL",

"route_variant_id":"806 160",

"waypoint_id":"1.5588646E7",

"last_known_location_state":"NULL"}]

 

Sample Properties File to Use ECD Agent to Write JSON Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

DIRECTORY=/tmp/json_test/

FORMATTER=JSON

FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss

FILENAME_PREFIX=test

FILENAME_SUFFIX=.json

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_TIME_DELTA_PER_FILE=60000

 

See the topic Writing to Files with the ECD Adapter and Agent for a general overview of how to write to files with the ECD Adapter or Agent.