Writing to AMQP

To write data to an AMQP message bus, use the Extensible Common Data (ECD) framework. AMQP stands for Advanced Message Queuing Protocol, an open standard for messaging middleware. For more information, see https://cwiki.apache.org/confluence/display/CAMEL/AMQP.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details. For agents, you configure these options in a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you need to set up a pump to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Configuring the ECD Adapter for AMQP

In these streams, each row represents a single message and has two columns: the message headers and the message body. The message body is a single VARBINARY column. The message headers are concatenated into a single VARCHAR column, in the format TAG=VALUE TAG=VALUE ....

You configure the Extensible Common Data Adapter (ECDA) to read or write over AMQP using server options and foreign table/stream options. You configure the Extensible Common Data Agent using the same options in a properties file. See the topic Extensible Common Data Agent Overview for more details.

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

You reference the server type 'amqp_legacy' (AMQP 0.9 or earlier) or 'amqp10' (AMQP 1.0 or later) as in the following code. See the topic CREATE SERVER in the Streaming SQL Reference Guide for more details.

For AMQP 0.9

CREATE OR REPLACE SERVER "AMQPSERVER" TYPE 'amqp_legacy'

FOREIGN DATA WRAPPER ECDA;

 

For AMQP 1.0

CREATE OR REPLACE SERVER "AMQPSERVER" TYPE 'amqp10'

FOREIGN DATA WRAPPER ECDA;

 

Next, you create a foreign stream object. This object contains connection information for the AMQP server, such as the destination topic, connection URL, and formatter. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream (named amqp_stream in the AMQP 0.9 example and "AmqpWriterStream" in the AMQP 1.0 example). Note that the options are slightly different for AMQP 0.9 vs. AMQP 1.0; you need to configure the CONNECTION_URL option differently for these. See the CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide for more details.

For AMQP 0.9

CREATE OR REPLACE SCHEMA "Sample";

SET SCHEMA '"Sample"';

 

CREATE OR REPLACE FOREIGN STREAM amqp_stream (

line VARCHAR(4096))

SERVER AMQPSERVER

OPTIONS (DESTINATION 'amq.topic',

CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',

FORMATTER 'CSV');

 

For AMQP 1.0

CREATE OR REPLACE SCHEMA "AmqpWriterSchema";

SET SCHEMA '"AmqpWriterSchema"';

 

CREATE OR REPLACE FOREIGN STREAM "AmqpWriterStream" (

line VARCHAR(4096))

SERVER AMQPSERVER

OPTIONS (DESTINATION 'amq.topic',

CONNECTION_URL 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',

FORMATTER 'CSV');

 

Again, to get data moving, you need to create and start a pump. You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "AmqpWriterSchema"."AmqpWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream
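
The pump above selects from "MyStream", which must already exist in the current schema ("Pumps") when you create the pump. If you do not yet have a source stream, the following is a minimal sketch of one for testing. The single VARCHAR column is an assumption chosen to match the foreign stream defined earlier; your own source stream will likely have different columns.

```sql
-- Hypothetical source stream for testing; run this before CREATE PUMP.
-- Its single column mirrors the "line" column of the AMQP foreign stream,
-- so SELECT STREAM * lines up with the INSERT target.
CREATE OR REPLACE STREAM "Pumps"."MyStream" (
    line VARCHAR(4096)
);
```

Because the pump uses SELECT STREAM *, the column list of the source stream needs to be compatible with the column list of the foreign stream it writes to.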

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;
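
To stop writing data, you can stop the same pump. The statement below mirrors the START statement and assumes the pump name used in this topic.

```sql
-- Stops the pump; rows no longer flow to the AMQP foreign stream
-- until the pump is started again.
ALTER PUMP "Pumps"."writerPump" STOP;
```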

 

Implementing the ECD Agent for AMQP

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above. The following options are for AMQP 1.0.

SCHEMA=AMQPWRITER

TABLE_NAME=AmqpStream

DESTINATION=amq.topic

CONNECTION_URL=amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

FORMATTER=CSV