Writing to IBM MQ

<< Click to Display Table of Contents >>

Navigation:  Integrating SQLstream Blaze with Other Systems > Writing Data Out of s-Server > Writing to Other Destinations >

Writing to IBM MQ

Previous pageReturn to chapter overviewNext page

To write data to IBM MQ queue managers, use the Extensible Common Data Framework. IBM MQ is a publish-subscribe messaging framework that supports stream processing applications created in s-Server.

To write data, you first define a server object with connection information, including the directory and information on file rotation. Once you define this server object, you can write to the file system by referencing it. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

int_MQseries_read_write

Installing com.ibm.mq.jar and com.ibm.mq.jmqi.jar

Before you can use the ECDA MQSeries adapter, you need to install the MQSeries client in your s-Server environment. These are installed with MQSeries. See WebSphere MQ classes for Java at the IBM support site for more details.

To install these classes into your s-Server environment, copy both into the $SQLSTREAM_HOME/lib directory.

Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server.

Implementation follows the general framework for writing to files with the ECD Adapter or Agent.

Implementing the Websphere MQSeries ECD Adapter

As with other ECDA adapters, implementing the MQSeries ECDA involves two steps.

1.A server defining section. In the defining section, you specify the classname com.sqlstream.aspen.namespace.common.MQSeriesColumnSet
2.A stream defining section. In this section, you initiate the data transfer using a CREATE FOREIGN STREAM command. Once you create this stream, you can SELECT from it or INSERT into it. When you SELECT, the Extensible Common Data Adapter reads data from MQSeries. When you INSERT, the Extensible Common Data Adapter writes data to MQSeries.

To define the server, you use code along the following lines:

CREATE OR REPLACE SERVER "IBM_MQ_SERVER" TYPE 'mqseries'

FOREIGN DATA WRAPPER ECDA;

 

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

Next, you create a foreign stream object. This object contains connection information on the MQSeries queue manager. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "MQSeriesSourceStream." To read from the MQSeries topic, you SELECT from this stream.

CREATE OR REPLACE SCHEMA "IBM_MQ_WriterSchema"

SET SCHEMA 'IBM_MQ_Writer_Schema';

 

CREATE OR REPLACE FOREIGN STREAM "IBM_MQ_Stream"

(

SERVER IBM_MQ_SERVER

OPTIONS

(queue_name 'SYSTEM.DEFAULT.LOCAL.QUEUE',

userid 'MQSERIES_USER',

hostname '127.0.0.1',

queue_manager_name 'TEST2',

CHANNEL_NAME 'SYSTEM.DEF.SVRCONN');

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "FileWriterSchema"."FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

Implementing the ECD Agent for MQSeries

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

Note: Before using the ECD agent, you need to create a source stream for it. In the below example, you would need to create the foreign stream "IBM_MQ_WRITERSTREAM." (Agent properties are not case sensitive.)

SCHEMA=MQSERIESSOURCE

TABLE_NAME=IBM_MQ_WRITERSTREAM

QUEUE_NAME=SYSTEM.DEFAULT.LOCAL.QUEUE

USERID=MQSERIES_USER

HOSTNAME=127.0.0.1

QUEUE_MANAGER_NAME=TEST2

CHANNEL_NAME=SYSTEM.DEF.SVRCONN

 

Special Columns Generated by ECDA for MQSeries (Input Only)

The Extensible Common Data Adapter generates several special columns when parsing data from MQSeries. You can declare these columns to make them part of a foreign stream or table.

Special Column

Type

Meaning

APPLICATION_ID_DATA

VARCHAR(256)

The application ID of the application that delivered this row in a message.

PUT_APPLICATION_TYPE

INTEGER

The value set in the application type field of the incoming message containing this row.

PUT_APPLICATION_NAME

VARCHAR(256)

The value set in the application name field of the incoming message containing this row

APPLICATION_ORIGIN_DATA

VARCHAR(256)

The locater of the application where this row's data originated.

PUT_DATA_TIME

TIMESTAMP

The data time on the message containing this row.

USER_ID

VARCHAR(256)

The user id of the sender of the message containing this row.

GROUP_ID

VARCHAR(256)

The group id to which the message containing this row was sent.

MESSAGE_SEQUENCE_NUMBER

INTEGER

The MQSeries sequence number of the message containing this tuple.

The ECDA Websphere MQSeries adapter option lets you write messages from s-Server to MQSeries queue managers. To do so, you first create a foreign stream with a predefined MQSeries server object as the SERVER option.

Sample Code

Like all streams (but unlike server objects or data wrappers), all streams must be defined within a schema. The following code first creates a schema called "MQSeriesSource," then creates a foreign stream called "MQSeriesSourceStream" with the predefined server "MQSeriesServer" as a server option. To read data from this stream, you will need to SELECT against it. This step simply sets up the stream, with named columns and MQSeries-specific options. (These options are discussed below.)

CREATE OR REPLACE SCHEMA "MQSeriesOutput";

SET SCHEMA '"MQSeriesSource"';

 

CREATE OR REPLACE FOREIGN STREAM "MQSeriesSourceStream"

SERVER MQSERIESSERVER

OPTIONS

(queue_name 'SYSTEM.DEFAULT.LOCAL.QUEUE',

userid 'Jack',

hostname '127.0.0.1',

queue_manager_name 'TEST2',

CHANNEL_NAME 'SYSTEM.DEF.SVRCONN');

 

Output

As with all ECDA adapters, data is only moved from s-Server to MQSeries once you execute an INSERT statement as part of a PUMP on the foreign stream that you have defined for the adapter. For example, the following code block inserts the values of the columns partitionNo, zipcode, transactionTotal, transactionCount into Kafka using the foreign stream defined above, KafkaOutputStream.

CREATE OR REPLACE PUMP MQSERIES_PUMP STOPPED AS

INSERT INTO MQSeriesOutput(LINE)

SELECT LINE FROM TABLE(METEREDSEMIRANDOMSTRINGS(100000000, 2048, 100, 1));