Writing to the File System

<< Click to Display Table of Contents >>

Navigation:  Integrating Blaze with Other Systems > Writing Data Out of s-Server > Writing to Other Destinations >

Writing to the File System

Previous pageReturn to chapter overviewNext page

Show/Hide Hidden Text

To write to files over the file system, you use the Extensible Common Data Adapter (ECDA) or ECD Agent.

To write data, you first define a server object with connection information, including the directory and information on file rotation. Once you define this server object, you can write to the file system by referencing it. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Note on Writing Output Files

If ORIGINAL_FILENAME is set then, then this is the name of the file being actively written to. When this file is rotated out (based on either MAX_BYTES_PER_FILE or MAX_TIME_DELTA_PER_FILE), it is given a filename of the form <prefix><date><suffix> using the last rowtime in the file.

If ORIGINAL_FILENAME isn't set, then the the file being actively written to is given the form <prefix><date><suffix> but using the first rowtime in the file set.

For example, the following options:

filename_prefix 'test-',

filename_date_format 'yyyy-MM-dd_HH:mm:ss',

filename_suffix '.csv',

 

produce rotating file names like the following:

test-2020-05-23_19:44:00.csv

 

Writing Over the File System

To write over the file system, you need to create a server object which references the data wrapper with type 'FILE'.

CREATE OR REPLACE SERVER "FileReaderServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You do so with the syntax FOREIGN DATA WRAPPER ECDA.

You then just need to designate a directory for the file in stream options:

CREATE OR REPLACE SCHEMA "FileWriterSchema"

SET SCHEMA 'FileWriterSchema';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER)

--Columns for the new stream

SERVER "FileWriterServer"

OPTIONS

(directory 'myDirectory',

--directory for the file

formatter 'CSV',

filename_pattern 'myRecord.csv',

--regex for filename pattern to look for

character_encoding 'UTF-8',

skip_header 'true');

 

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "FileWriterSchema"."FileWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

 

Using the ECD Agent to Write Over the File System

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Location, date format, prefix, suffix

DIRECTORY=/var/tmp

FORMATTER=CSV

FILENAME_PATTERN=myRecord.csv

CHARACTER_ENCODING=UTF-8

FORMATTER_INCLUDE_ROWTIME=FALSE

# Schema, name, and parameter signature of origin stream

SCHEMA_NAME=MY_SCHEMA

TABLE_NAME=FILEREADER_STREAM

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

 

A properties file configured to write files might look like the following:

# Location, date format, prefix, suffix

DIRECTORY=/var/tmp

FORMATTER=CSV

FILENAME_DATE_FORMAT=YYYY_MM_DD_HH_MM_SS

FILENAME_PREFIX=MYPREFIX-

FILENAME_SUFFIX=.TXT

CHARACTER_ENCODING=UTF-8

FORMATTER_INCLUDE_ROWTIME=FALSE

  # Rotation settings = 1 hour

MAX_TIME_DELTA_PER_FILE=3600000

# Schema, name, and parameter signature of origin stream

SCHEMA_NAME=SALES

TABLE_NAME=BIDS

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

 

Format Type Options

Other options are specific to format type.

hmtoggle_plus1Formatting Files as CSV

When writing CSV files, the Extensible Common Data Adapter converts rows into character-separated output based on options supplied through the options sections of the CREATE FOREIGN STREAM statement. It converts streaming tuples into a character-separated file.

Note: For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

A destination directory for the file.
A formatter of CSV
A character encoding type for the file.
How often the file will rotate in terms of bytes or time.

Example Code

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

Finally, you create a foreign stream which references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData," and creates a foreign stream called "FileWriterStream." To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";

SET SCHEMA '"WebData"';

 

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("STR1" VARCHAR(32))

SERVER "FileWriterServer" OPTIONS

(directory 'path/to/myfile',

formatter 'CSV',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.csv',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824')

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM "MyStream";

--where "MyStream" is a currently existing stream

hmtoggle_plus1Formatting as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify.

If no value for DATA_ELEMENTS or <col_name>_ELEMENTS or <col_name>_ATTRIBUTES is specified, then the column name is used as an element name (not an <attribute> name).  So a column named foo would be in an XML element named /batch/row/foo if no values were specified.

Sample Foreign Stream Implementing ECD Adapter to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"

("recNo" INTEGER,

"ts" TIMESTAMP NOT NULL,

"accountNumber" INTEGER,

"loginSuccessful" BOOLEAN,

"sourceIP" VARCHAR(32),

"destIP" VARCHAR(32),

"customerId" INTEGER,)

SERVER "FileWriterServer"

OPTIONS

(directory

'/path/to/myfile',

formatter 'XML',

filename_date_format 'yyyy-MM-dd-HH:mm:ss',

filename_prefix 'test-',

filename_suffix '.xml',

data_elements 'data',

character_encoding 'US-ASCII',

formatter_include_rowtime 'false',

max_bytes_per_file '1073741824');

 

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "FileWriterStream"

SELECT STREAM "MyStream";

--where "MyStream" is a currently existing stream

 

Sample Properties File to Use ECD Agent to Write XML Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)

DIRECTORY=/PATH/TO/MYFILE

FORMATTER=XML

FILENAME_DATE_FORMAT=YYYY-MM-DD-HH:MM:SS

FILENAME_PREFIX=TEST-

FILENAME_SUFFIX=.XML

DATA_ELEMENTS=DATA

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_BYTES_PER_FILE=1073741824

hmtoggle_plus1Formatting as JSON

The ECDA adapter writes batches of data to JSON tuples. To configure how the adapter writes such tuples, you use foreign stream options. These options are listed below.

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'

FOREIGN DATA WRAPPER ECDA;

 

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"

  ("id" DOUBLE,

  "reported_at" VARCHAR(4096),

  "shift_no" DOUBLE,

  "trip_no" DOUBLE,

  "route_variant_id" VARCHAR(4096),

  "waypoint_id" DOUBLE,

  "last_known_location_state" VARCHAR(4096)

   )

   SERVER "FileWriterServer"

    --note that this uses the server defined above

   OPTIONS

   (

    directory '/tmp/json_test/',

     --file directory where JSON file will be written. 

    formatter 'JSON',

    character_encoding 'UTF-8',

    filename_prefix 'test',

    filename_suffix '.json',

    formatter_include_rowtime 'false',

    MAX_TIME_DELTA_PER_FILE '60000'

    );

 

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS

INSERT INTO "JSON_OutputStream" (

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

)

   SELECT STREAM

  "id",

  "reported_at",

  "shift_no",

  "trip_no",

  "route_variant_id",

  "waypoint_id",

  "last_known_location_state"

   from "buses_stream";

    --this assumes that a stream called "buses_stream" exists in the same schema

 

Output

 

[{"id":"5.0115809712E10",

"reported_at":"2014-07-23 20:52:04.527000000",

"shift_no":"NULL",

"trip_no":"653.0",

"route_variant_id":"L38 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"5.0115854098E10",

"reported_at":"2014-07-23 20:52:05.443000000",

"shift_no":"NULL",

"trip_no":"NULL",

"route_variant_id":"310 7",

"waypoint_id":"NULL",

"last_known_location_state":"NULL"},

{"id":"3.46866848031E11",

"reported_at":"2014-07-23 20:52:07.713000000",

"shift_no":"1016.0",

"trip_no":"NULL",

"route_variant_id":"806 160",

"waypoint_id":"1.5588646E7",

"last_known_location_state":"NULL"}]

 

Sample Properties File to Use ECD Agent to Write JSON Files

The following code uses the file system for output. To write data over other input/output systems, such as IBM MQ or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

#Column types for the source stream

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

DIRECTORY=/tmp/json_test/

FORMATTER=JSON

FILENAME_DATE_FORMAT=YYYY-MM-DD-HH:MM:SS

FILENAME_PREFIX=TEST

FILENAME_SUFFIX=.JSON

CHARACTER_ENCODING=US-ASCII

FORMATTER_INCLUDE_ROWTIME=FALSE

MAX_TIME_DELTA_PER_FILE=60000

 

See the topic Writing to Files with the ECD Adapter and Agent for a general overview of how to write to files with the ECD Adapter or Agent.