Writing to Sockets

<< Click to Display Table of Contents >>

Navigation:  Integrating Blaze with Other Systems > Writing Data Out of s-Server > Writing to Other Destinations >

Writing to Sockets

Previous pageReturn to chapter overviewNext page

Show/Hide Hidden Text

To write to files to a socket, you use the Extensible Common Data Adapter (ECDA) or ECD Agent.

To write data, you first define a server object with connection information, including the directory and information on file rotation. Once you define this server object, you can write to the file system by referencing it. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Implementing the ECD Adapter and Agent for Sockets

You configure the ECD Adapter and Agent to read or write over sockets using server options and foreign table/stream options.

CREATE OR REPLACE SERVER "NetWriterServer" TYPE 'NET'

FOREIGN DATA WRAPPER ECDA;

 

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

Next, you create a foreign stream object. This object contains connection information for the socket, such as format type, port, character encoding, whether or not to write a header, and the host name. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "NetWriterStream."

CREATE OR REPLACE SCHEMA "FileWriterSchema"

SET SCHEMA 'FileWriterSchema';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER)

--Columns for the new stream

SERVER "NetWriterServer"

OPTIONS

(formatter 'CSV',

server_port '5601',

character_encoding 'UTF-8',

write_header 'FALSE', );

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "FileWriterSchema"."FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

Implementing the ECD Agent for Sockets

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Port, format, host

FORMATTER=CSV

SERVER_PORT=5601

CHARACTER_ENCODING=UTF-8

SKIP_HEADER=FALSE

# Schema, name, and parameter signature of origin stream

SCHEMA_NAME='SOCKETWRITER'

TABLE_NAME=FileReaderStream

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

 

Additional Columns Generated for Sockets

The following columns may be auto generated by the file adapter if you are doing a select, that is, reading from the file.

Column

Meaning

SOURCE_HOST

Input only. Remote host from which this row was read.

SOURCE_PORT

Input only. Remote port from which this row was read.

Format Type Options

Other options are specific to format type.

hmtoggle_plus1Formatting Files as CSV

When writing CSV files, the Extensible Common Data Adapter converts rows into character-separated output based on options supplied through the options sections of the CREATE FOREIGN STREAM statement. It converts streaming tuples into a character-separated file.

Note: For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

A destination directory for the file.
A formatter of CSV
A character encoding type for the file.
How often the file will rotate in terms of bytes or time.

Example Code

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

Finally, you create a foreign stream which references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData," and creates a foreign stream called "FileWriterStream." To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";

SET SCHEMA '"WebData"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("STR1" VARCHAR(32))

SERVER "FileWriterServer" OPTIONS

(directory 'path/to/myfile',

formatter 'CSV',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.csv',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824')

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM "MyStream";

--where "MyStream" is a currently existing stream

hmtoggle_plus1Formatting as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify.

If no value for DATA_ELEMENTS or <col_name>_ELEMENTS or <col_name>_ATTRIBUTES is specified, then the column name is used as an element name (not an <attribute> name).  So a column named foo would be in an XML element named /batch/row/foo if no values were specified.

Sample Foreign Stream Implementing ECD Adapter to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP NOT NULL,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER,)

SERVER "FileWriterServer"

OPTIONS

(directory

'/path/to/myfile',

formatter 'XML',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.xml',

data_elements 'data',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM "MyStream";

--where "MyStream" is a currently existing stream

 

Sample Properties File to Use ECD Agent to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)

DIRECTORY=/PATH/TO/MYFILE

FORMATTER=XML

FILENAME_DATE_FORMAT=YYYY-MM-DD-HH:MM:SS

FILENAME_PREFIX=TEST-

FILENAME_SUFFIX=.XML

DATA_ELEMENTS=DATA

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_BYTES_PER_FILE=1073741824

hmtoggle_plus1Formatting as JSON

The ECDA adapter writes batches of data to JSON tuples. To configure how the adapter writes such tuples, you use foreign stream options. These options are listed below.

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"

  ("id" DOUBLE,

  "reported_at" VARCHAR(4096),

  "shift_no" DOUBLE,

  "trip_no" DOUBLE,

  "route_variant_id" VARCHAR(4096),

  "waypoint_id" DOUBLE,

  "last_known_location_state" VARCHAR(4096)

   )

   SERVER "FileWriterServer"

    --note that this uses the server defined above

   OPTIONS

   (

    directory '/tmp/json_test/',

     --file directory where JSON file will be written. 

    formatter 'JSON',

    character_encoding 'UTF-8',

    filename_prefix 'test',

    filename_suffix '.json',

    formatter_include_rowtime 'false',

    MAX_TIME_DELTA_PER_FILE '60000'

    );

 

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "JSON_OutputStream" (

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

)

   SELECT STREAM

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

   from "buses_stream";

    --this assumes that a stream called "buses_stream" exists in the same schema

 

Output

 

[{"id":"5.0115809712E10",

"reported_at":"2014-07-23 20:52:04.527000000",

"shift_no":"NULL",

"trip_no":"653.0",

"route_variant_id":"L38 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"5.0115854098E10",

"reported_at":"2014-07-23 20:52:05.443000000",

"shift_no":"NULL",

"trip_no":"NULL",

"route_variant_id":"310 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"3.46866848031E11",

"reported_at":"2014-07-23 20:52:07.713000000",

"shift_no":"1016.0",

"trip_no":"NULL",

"route_variant_id":"806 160",

"waypoint_id":"1.5588646E7",

"last_known_location_state":"NULL"}]

 

Sample Properties File to Use ECD Agent to Write JSON Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

DIRECTORY=/tmp/json_test/

FORMATTER=JSON

FILENAME_DATE_FORMAT=YYYY-MM-DD-HH:MM:SS

FILENAME_PREFIX=TEST

FILENAME_SUFFIX=.JSON

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_TIME_DELTA_PER_FILE=60000

 

See the topic Writing to Files with the ECD Adapter and Agent for a general overview of how to write to files with the ECD Adapter or Agent.