Reading from AMQP

<< Click to Display Table of Contents >>

Navigation:  Integrating SQLstream Blaze with Other Systems > Reading Data into s-Server > Reading from Other Sources  >

Reading from AMQP

Previous pageReturn to chapter overviewNext page

The AMQP adapter lets you define foreign streams with an AMQP message bus. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see see https://cwiki.apache.org/confluence/display/CAMEL/AMQP.

All adapter or agent implementations involve configuring options. For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

In these streams each row represents a single message, and has two columns: the message headers and the message body. The message body is one varbinary column. The message headers are concatenated to form one varchar column, with the format TAG=VALUE TAG=VALUE ....

You configure the Extensible Common Data Agent using the same options in a property file. See the topic Extensible Common Data Agent Overview for more details.

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

You reference the library com.sqlstream.aspen.namespace.common.AmqpColumnSet (AMQP .9) or com.sqlstream.aspen.namespace.common.Amqp10ColumnSet (AMQP 1.0) as in the following code:

For AMQP 0.9

CREATE OR REPLACE SERVER "AMQPSERVER" TYPE 'amqp_legacy'

FOREIGN DATA WRAPPER ECDA;

 

For AMQP 1.0

CREATE OR REPLACE SERVER "AMQPSERVER" TYPE 'amqp10'

FOREIGN DATA WRAPPER ECDA;

 

Next, you create a foreign stream object. This object contains connection information for the AMQP server, such as format type, destination topic, connection url, and formatter. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "amqp_stream."  Note that the options are slightly different for AMQP 0.9 vs. AMQP 1.0; you need to configure the CONNECTION_URL option differently for these. See CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide.

For AMQP 0.9

CREATE OR REPLACE SCHEMA "Sample"

SET SCHEMA 'SAMPLE';

 

CREATE OR REPLACE FOREIGN STREAM amqp_stream (

line VARCHAR(4096))

SERVER AMQPSERVER

OPTIONS (DESTINATION 'amq.topic',

CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672'',

PARSER 'CSV');

 

For AMQP 1.0

CREATE OR REPLACE SCHEMA "Sample"

SET SCHEMA 'SAMPLE';

 

CREATE OR REPLACE FOREIGN STREAM amqp_stream (

line VARCHAR(4096))

SERVER AMQPSERVER

OPTIONS (DESTINATION 'amq.topic',

CONNECTION_URL 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',

PARSER 'CSV');

 

Implementing the ECD Agent for AMQP

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above. The following options are for AMQP 1.0.

SCHEMA=AMQPWRITER

TABLE_NAME=AmqpStream

DESTINATION=amq.topic

CONNECTION_URL=amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

PARSER=CSV

 

 

Special Columns Generated by for AMQP 1.0 (Input Only)

The Extensible Common Data Adapter generates one special row column when parsing AMQP 1.0. You can declare this column to make it part of a foreign stream or table.

Special Column

Type

Meaning

CREATION_TIME

TIMESTAMP

The time the containing message was created.