AMQP Adapter

Note: This topic describes a legacy adapter. See the topics Reading from Other Sources and Writing to Other Destinations for current solutions.

The AMQP adapter lets you define foreign streams with an AMQP message bus. AMQP stands for Advanced Message Queuing Protocol and is an open standard for messaging middleware. For more information, see http://www.amqp.org/confluence/display/AMQP/About+AMQP.

In these streams each row represents a single message, and has two columns: the message headers and the message body. The message body is one varbinary column. The message headers are concatenated to form one varchar column, with the format TAG=VALUE TAG=VALUE ....

Note: The Adapter does not provide support for constructing or deconstructing the body of an AMQP message.

Streams defined with the AMQP adapter are either write only (for publishing messages) or read only (for fetching messages or subscribing to a topic). Each stream maps to a specified AMQP exchange, and can be mapped to a specific queue or to a specific topic.

Some AMQP options are supported as stream options, server options, or both. When an option applies to both, the server value serves as the default for new streams on that server.

AMQP 0.9.1 vs 1.0

AMQP 1.0 works quite differently from AMQP versions up to 0.9.1. Roughly, AMQP 1.0 provides only a network wire-level protocol for the exchange of messages, whereas versions up to 0.9.1 employ a system of exchanges, queues, and bindings. As a result, you configure the AMQP adapter very differently for 1.0 than for versions up to 0.9.1: in 1.0, all connection information goes through the ADDRESS option of the foreign stream.

SQL Declarations

Using the AMQP Adapter to receive or publish messages requires loading it using CREATE FOREIGN DATA WRAPPER, defining a server object, and creating a foreign stream using the adapter.

The options you specify through the server object differ depending on whether you are using AMQP up to 0.9.1 or AMQP 1.0.

The following SQL declarations load the AMQP Adapter, declare the AMQP data server, and define the foreign stream for the messages. The SQL is designed to be run as one continuous block.

Registering the Plugin (Foreign Data Wrapper)

Use CREATE OR REPLACE FOREIGN DATA WRAPPER to install the AMQP adapter. The following code first creates a schema for the AMQP example and then creates a foreign data wrapper for the AMQP adapter.

-- Make sure the schema exists, drop it with its contents, then re-create it empty.
CREATE OR REPLACE SCHEMA "AMQPTest";
DROP SCHEMA "AMQPTest" CASCADE;
CREATE SCHEMA "AMQPTest" DESCRIPTION 'test web-based data feeds';
SET SCHEMA '"AMQPTest"';
SET PATH '"AMQPTest"';

-- Register the AMQP adapter plugin as a foreign data wrapper.
CREATE OR REPLACE FOREIGN DATA WRAPPER "AmqpWrapper"
   LIBRARY 'class com.sqlstream.plugin.amqp.AmqpAdapter'
   LANGUAGE JAVA;

Creating the Server

As with other adapters, the server object for an AMQP adapter contains server connection information.

For AMQP, this information includes the host name, port, and username/password for the server on which the AMQP message broker resides, as well as how frequently to retry connecting to the server.

For AMQP up to 0.9.1

CREATE OR REPLACE SERVER "AMQPServer"
   FOREIGN DATA WRAPPER "AmqpWrapper"
   OPTIONS (
       HOST 'franklin', VHOST '/',
       PROTOCOL '0.9',
       USERNAME 'guest', PASSWORD 'guest'
   );

For AMQP 1.0

CREATE OR REPLACE SERVER "TestServer"
   FOREIGN DATA WRAPPER "AmqpWrapper"
   OPTIONS (
       PROTOCOL '1.0'
   );

Server-level options

| Option | Description | Req'd | Default |
|---|---|---|---|
| HOST | Host on which the message broker resides. Not valid with version 1.0. For version 1.0, all connection information goes through the ADDRESS option of the stream. You configure that option based on the normal configuration options for your version of AMQP. | Y for 0.9 | |
| PORT | Port at which the message broker accepts connections. Not valid with version 1.0. | Y | 5672 |
| PROTOCOL | Identifies the version of the foreign AMQP server. Values: '0.9' or '1.0'. | Y for 0.9 | |
| VHOST | Virtual host within this message broker. Not valid with version 1.0. | | |
| USERNAME | Host credentials. Not valid with version 1.0. | Y for 0.9 | |
| PASSWORD | Host credentials. Not valid with version 1.0. | Y for 0.9 | |
| CONN_RETRY | Milliseconds between connection attempts. Not valid with version 1.0. | | 30000 |
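As a sketch of how the options in this table fit together, the following server definition sets PORT and CONN_RETRY explicitly instead of relying on their defaults. The server name "AMQPServerExplicit" is a placeholder, and quoting the numeric values as strings is an assumption; check the value syntax accepted by your release.

```sql
-- Hypothetical 0.9-style server with an explicit port and retry interval.
CREATE OR REPLACE SERVER "AMQPServerExplicit"
   FOREIGN DATA WRAPPER "AmqpWrapper"
   OPTIONS (
       PROTOCOL '0.9',
       HOST 'franklin', PORT '5672', VHOST '/',
       USERNAME 'guest', PASSWORD 'guest',
       CONN_RETRY '10000'   -- assumed syntax: retry the connection every 10 seconds
   );
```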

Creating the Foreign Stream

The foreign stream is the SQL object where data from the AMQP adapter will be accessed. Once you create a foreign stream for the AMQP adapter, you can query it as you would any other SQL object.

For AMQP up to 0.9.1

This code creates a foreign stream for a message exchange named magnet, which is a topic exchange, and requests all messages with the topic (also known as the routing key) data. All messages with a matching topic are delivered as rows to the streaming query SELECT STREAM * FROM "RawSignalMessages". The stream option TYPE is 'READER' for a message consumer or 'WRITER' for a publisher.

CREATE FOREIGN STREAM "RawSignalMessages" (
      header varchar(256),
      body   varbinary(2048))
   SERVER "AMQPServer"
   OPTIONS (
     TYPE 'READER',
     EXCHANGE 'magnet', EXCHANGE_TYPE 'topic', TOPIC 'data');
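Because the message headers arrive as one varchar column in TAG=VALUE form, an ordinary SQL predicate can filter messages by header. The sketch below assumes a header tag named content_type, which is purely illustrative; substitute a tag that your broker actually sends.

```sql
-- Keep only messages whose concatenated headers contain the assumed tag/value pair.
SELECT STREAM header, body
FROM "RawSignalMessages"
WHERE header LIKE '%content_type=text/plain%';
```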

 

For a writer, the topic value is a fixed string: all messages published have this same topic. For a reader, the topic value is a match-pattern, and can be a fixed string or a regular expression.
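A writer stream might be declared as in the following sketch. The stream name "PublishedSignalMessages" and the topic 'data.copy' are hypothetical. This topic does not say how the adapter treats the header column on publish, so treat the INSERT as illustrative only.

```sql
-- Hypothetical publisher on the same exchange, using a different fixed topic.
CREATE FOREIGN STREAM "PublishedSignalMessages" (
      header varchar(256),
      body   varbinary(2048))
   SERVER "AMQPServer"
   OPTIONS (
     TYPE 'WRITER',
     EXCHANGE 'magnet', EXCHANGE_TYPE 'topic',
     TOPIC 'data.copy');   -- fixed topic applied to every published message

-- Republish every row read from the reader stream.
INSERT INTO "PublishedSignalMessages" (header, body)
SELECT STREAM header, body FROM "RawSignalMessages";
```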

For AMQP 1.0

For 1.0, all connection information goes through the ADDRESS option of the foreign stream. Whatever address format you use to connect with your message server, the adapter passes it through.

CREATE FOREIGN STREAM "RawSignalMessages" (
      header varchar(256),
      body   varchar(2048))
   SERVER "TestServer"
   OPTIONS (
       TYPE 'READER',
       ADDRESS 'topic://localhost/amq.topic/test'
   );

Stream-level options

| Option | Description | Req'd | Default |
|---|---|---|---|
| TYPE | Foreign stream type, either 'READER' or 'WRITER'. Valid for all versions. | | 'READER' |
| ADDRESS | String value that identifies a target object to read from or to write to. Valid for version 1.0 only. | Y for 1.0 | |
| EXCHANGE | Named exchange on broker. This is mandatory for version 0.9. | Y for 0.9 | |
| QUEUE | Named queue on exchange. The default, null, means create an anonymous queue. Applies only to the reader. Not valid for 1.0. | | Null |
| TOPIC | Named topic on exchange. Not valid for 1.0. | | Any topic (reading); null topic (writing). |
| EXCHANGE_TYPE | One of fanout, topic, or direct. Not valid with version 1.0. fanout: A fanout exchange is the simplest exchange type, representing a 1:N message delivery pattern. No routing keys are involved; you simply bind a queue to the exchange, and messages sent to that exchange are delivered to all the bound queues. See http://www.rabbitmq.com/faq.html#fanout-exchange. topic: In a topic exchange, the broker matches the routing key against a pattern to determine how to deliver the message. See "What is a topic exchange?" at http://www.rabbitmq.com/faq.html?#topic-exchange. direct: A direct exchange is a 1:1 form of communication where a routing key directs how a broker routes the message from the producer to the consumer. See "What is a direct exchange?" at http://www.rabbitmq.com/faq.html?#direct-exchange. | | fanout |
| EXCHANGE_DURABLE | true or false. Not valid with version 1.0. | | false |
| EXCHANGE_AUTO_DELETE | true or false. Not valid with version 1.0. | | false |
| EXCHANGE_PASSIVE | true means the exchange must already exist. Not valid with version 1.0. | | false |
| QUEUE_PASSIVE | true means the named queue must already exist. Not valid with version 1.0. | | false |
| QUEUE_DURABLE | true or false. Not valid with version 1.0. | | false |
| QUEUE_EXCLUSIVE | true or false. Not valid with version 1.0. | | true |
| QUEUE_AUTO_DELETE | true or false. Not valid with version 1.0. | | true |
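To illustrate the queue options, the sketch below declares a 0.9-style reader bound to a named queue that is durable, non-exclusive, and not deleted automatically. The stream name "DurableSignalMessages", the queue name 'signals', and the quoting of boolean values as strings are all assumptions.

```sql
-- Hypothetical reader on a named queue, overriding the queue defaults.
CREATE FOREIGN STREAM "DurableSignalMessages" (
      header varchar(256),
      body   varbinary(2048))
   SERVER "AMQPServer"
   OPTIONS (
     TYPE 'READER',
     EXCHANGE 'magnet', EXCHANGE_TYPE 'topic', TOPIC 'data',
     QUEUE 'signals',
     QUEUE_DURABLE 'true',       -- default is false
     QUEUE_EXCLUSIVE 'false',    -- default is true
     QUEUE_AUTO_DELETE 'false'); -- default is true
```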

Troubleshooting

The adapter's tracer name is defined as follows:

   public static final String TRACER_NAME = "com.sqlstream.plugin.amqp";

For more complete information on messaging and AMQP in general, see the RabbitMQ website.