Writing to WebSockets

<< Click to Display Table of Contents >>

Navigation:  Integrating SQLstream Blaze with Other Systems > Writing Data Out of s-Server > Writing to Other Destinations >

Writing to WebSockets

Previous pageReturn to chapter overviewNext page

To write data over Web Sockets, use the Extensible Common Data Adapter (ECDA) or ECD Agent. This feature is new in s-Server 5.2.

5_2_indicator Version 5.2 Feature

To write data, you first define a server object with connection information, including the directory and information on file rotation. Once you define this server object, you can write to the file system by referencing it. See the topic CREATE SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Implementing the ECD Adapter and Agent for Web Sockets

To write data to a web socket, you need to create a server object which references the data wrapper ECDA and is of type 'websocket'.

CREATE OR REPLACE SERVER "WebsocketWriterServer" TYPE 'websocket'

FOREIGN DATA WRAPPER ECDA;

 

Note: ECD Adapter server definitions need to reference the ECD foreign data wrapper. You can do so with the syntax FOREIGN DATA WRAPPER ECDA.

Unlike server objects, all foreign streams need to be created in a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "WebSocketStream."

CREATE OR REPLACE SCHEMA "WebSocketSchema";

SET SCHEMA '"WebSocketSchema"';

 

CREATE FOREIGN STREAM "WebSocketStream"

("id" VARCHAR(2040),

"reported_at" VARCHAR(2040),

"shift_no" VARCHAR(2040),

"trip_no" VARCHAR(2040)'

"route_variant_id" DOUBLE)

SERVER "WebSocketWriterServer"

OPTIONS (

       FORMATTER 'JSON',

       --This needs to be a web socket accessible from s-Server

       "URL" 'wss://listener-streamer-services-poc.labs.myserver.com/84be767d-bc36-4d24-82c2-fa6c88e00c1b',

       "HEADER_Connection" 'Upgrade',

       "HEADER_Authorization" 'token d90621ca-9601-4a49-96f3-160f61082c34',

       "HEADER_Upgrade" 'websocket',

       "HEADER_Host" 'listener-myserver.com',

       "HEADER_Origin" 'listener-myserver.com'

   );

 

Again, to get data moving, you need to create and start a pump. You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";

SET SCHEMA '"Pumps"';

 

CREATE OR REPLACE PUMP "writerPump" STOPPED AS

--We recommend creating pumps as stopped

--then using ALTER PUMP "Pumps"."writerPump" START to start it

INSERT INTO "WebSocketWriterSchema"."WebSocketWriterStream"

SELECT STREAM * FROM "MyStream";

--where "MyStream" is a currently existing stream

 

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

Implementing the ECD Agent for Web Sockets

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Port, format, host

FORMATTER=CSV

URL=wss://listener-streamer-services-poc.labs.myserver.com/84be767d-bc36-4d24-82c2-fa6c88e00c1b

HEADER_Connection=Upgrade

HEADER_Authorization=token d90621ca-9601-4a49-96f3-160f61082c34

HEADER_Upgrade=websocket

HEADER_Host=listener-myserver.com

HEADER_Origin=listener-myserver.com

SCHEMA_NAME='SOCKETWRITER'

TABLE_NAME=WebSocketStream

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, DOUBLE route_variant_id)