Introduction to Pipelines


Most s-Server applications are built as pipelines. Pipelines move data through s-Server; their exact composition depends on your business requirements, but they generally consist of a series of s-Server streams, pumps, inputs, and outputs.

Pipelines consist of the following elements:

Source Streams

These provide streaming read access to third-party platforms, such as the file system, databases, sockets, Kafka, AMQP, IBM MQ, or Amazon Kinesis. You set these up by defining a server object and a foreign stream object. Once these are established, you can use pumps to move data from them into named streams.

Named Streams

These are intermediate "tanks" in the pipeline that you fill by starting a pump. You can read data from them by running a SELECT query. The key thing about these intermediate tanks is that the data in them evaporates if no readers are hooked up to them. You will often use named streams to move subsets of data around, so that such data is available for analytic views. (A minimal sketch of a named stream and its pump appears after this list.)

Views

Views are reusable queries. These are often used to cleanse or analyze data.

Pumps

Pumps (INSERT macros) are used in s-Server to pass data from one point in a streaming pipeline to another. You can think of a pipeline as similar to a plumbing network. In this model, pumps are like faucets installed on top of a tank (a named stream). They take water from water-processing equipment and push it into the tank. The processing equipment is assembled on demand based on the description of the processing (the view definition).

Sink Streams or Tables

These are "tanks" that feed other systems, such as visualization tools or databases.

It is generally best practice to create all of these objects within the same schema. Doing so lets you do things like drop all streams at once or start all pumps at once. If you are using sqlline to communicate with s-Server, you can create all of these objects in a single SQL script and then use the !run command to run the script.
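For example, a minimal sketch of such a script (the file path is hypothetical):

-- pipeline.sql: define every pipeline object inside one schema
CREATE OR REPLACE SCHEMA "buses";
SET SCHEMA '"buses"';
-- ...server, stream, view, and pump definitions go here...

-- At the sqlline prompt, execute the script:
-- !run /home/sqlstream/pipeline.sql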

(Diagram: an example pipeline.)

Keeping Pipeline Objects Compatible

For data to move through a pipeline, the columns of each stage must be compatible with those of the next stage. Otherwise, errors might result when a pump tries to insert rows.
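For example, in the minimal sketch below (the stream names are hypothetical), a source reports "speed" as VARCHAR while the target stream declares it as DOUBLE, so the pump uses CAST to conform the types:

-- Hypothetical pump: CAST conforms the VARCHAR source column
-- to the DOUBLE column declared in the target stream.
CREATE PUMP "buses"."conform-pump" AS
INSERT INTO "buses"."target_stream" ("id", "speed")
SELECT STREAM "id", CAST("speed" AS DOUBLE)
    FROM "buses"."raw_stream";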


SQL for Source Streams

A source stream needs information to connect to its source. The code below sets up a server object for a file accessible through the file system.

CREATE OR REPLACE SERVER "BusFileReaderServer"
FOREIGN DATA WRAPPER ECDAWRAPPER
OPTIONS (classname 'com.sqlstream.aspen.namespace.common.FileSetColumnSet',
        parser 'CSV',
        character_encoding 'UTF-8',
        separator ',',
        skip_header 'false',
        directory '/tmp',
        filename_pattern 'buses\.log'
);

 

 

Next, we create a foreign stream that uses this server object to pull data out of the file. The result is a stream with columns that you can query directly or pump to a named stream.

CREATE OR REPLACE SCHEMA "buses";
SET SCHEMA '"buses"';

CREATE OR REPLACE FOREIGN STREAM "buses_stream"
(
    "id" DOUBLE,                      --Identification number for the bus.
    "reported_at" TIMESTAMP,          --Time location was reported.
    "shift_no" DOUBLE,                --Shift number for the bus's driver.
    "trip_no" VARCHAR(4096),          --Trip number for the bus.
    "route_variant_id" VARCHAR(4096), --ID number for bus route.
    "waypoint_id" VARCHAR(4096),      --ID number for bus waypoint.
    "lat" VARCHAR(4096),              --Latitude of location.
    "lon" VARCHAR(4096),              --Longitude of location.
    "speed" DOUBLE,                   --Reported speed of bus.
    "bearing" VARCHAR(4096),          --Navigational bearing for bus.
    "driver_no" DOUBLE                --Driver identification number.
)
SERVER "BusFileReaderServer"
--Server created in the previous step.
--Provides connection information for the log file.
;
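Once the foreign stream exists, you can confirm that data is flowing by querying it directly; rows appear as new lines are written to the source file:

SELECT STREAM * FROM "buses"."buses_stream";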

 

SQL for Enriching Data

To enrich data means to take streaming data and add information to it drawn from historical data. The streaming data from our buses demonstration includes each bus's longitude and latitude. Using information from a mapping database, we can identify the road segment for a particular latitude/longitude pair.

CREATE OR REPLACE VIEW "ConditionedPositionsWithRoadInfo"
DESCRIPTION 'Vehicle positions and road information' AS
   SELECT STREAM *
       FROM STREAM("roadInfo"(CURSOR(SELECT STREAM VID, "DateTime",
                                                   CAST(NULL AS SMALLINT) AS "segmentId",
                                                   "vLat", "vLon",
                                                   "Bearing" AS "vBearing",
                                                   "Speed" AS "vSpeed"
                                         FROM "Stage1" WHERE MISSING = 0 AND
                                                             MISMATCH = 0 AND
                                                             BOUNDS = 0),
                              CURSOR(SELECT STREAM * FROM "RoadInfoControl"),
                              'road_segments', 'vLat', 'vLon',
                              10000, -- cache size
                              false, -- no preload
                              false  -- no fuzzy matching
                   ));
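You can then read the enriched stream as you would any other view:

SELECT STREAM * FROM "ConditionedPositionsWithRoadInfo";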

 

Analytic View on Data

Here, you use SQL or a UDX (user-defined transform) to perform some calculation on your data. This might be calculating a running average, sorting data into groups, calculating a frequency distribution, calculating deviation, and so on. This stage sits in the middle of the pipeline, after data has been pulled into s-Server and before it is visualized or written out. Often, you will use views to perform such analysis, since views are reusable "macros" that you can query or pump from repeatedly.

For example, the SQL below creates a view that identifies speeding buses by testing if speed is over 75 mph.

CREATE OR REPLACE VIEW "buses"."speeders" AS
SELECT STREAM * FROM "buses"."buses_with_rowtime"
WHERE "speed" > 75;
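Querying this view returns only the rows that pass the filter:

SELECT STREAM "id", "speed" FROM "buses"."speeders";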

 

 

SQL for External Sink

Here, we create a server object that connects to an external database.

CREATE OR REPLACE SERVER "PostgreSQL_DB_1"
   FOREIGN DATA WRAPPER "SYS_JDBC"
   OPTIONS (
       "URL" 'jdbc:postgresql://localhost/demo',
       "USER_NAME" 'demo',
       "PASSWORD" 'demo',
       "SCHEMA_NAME" 'public',
       "DIALECT" 'PostgreSQL',
       "pollingInterval" '1000',
       "txInterval" '1000',
       "DRIVER_CLASS" 'org.postgresql.Driver'
   );

 

 

We then create a foreign table that uses this server object. As you move data into this table, the data automatically moves to the corresponding table in the foreign database.

SET SCHEMA '"buses"';

CREATE FOREIGN TABLE "postgres_archive"
--These are column names for the foreign table as it exists in s-Server.
   ("id" DOUBLE,
    "reported_at" TIMESTAMP,
    "shift_no" DOUBLE,
    "trip_no" DOUBLE)
   SERVER "PostgreSQL_DB_1"
   OPTIONS (
   "SCHEMA_NAME" 'public',
   --This is the table name in the PostgreSQL database.
   "TABLE_NAME" 'buses_archive',
   --Amount of data to wait for before committing.
   "TRANSACTION_ROW_LIMIT" '0',
   --Amount of time to wait between commits.
   "TRANSACTION_ROWTIME_LIMIT" '1000'
);

 

Pumps

For all of the above stages, we need to create pumps to move data from stage to stage. Like streams, views, and tables, you create pumps in schemas. By default, pumps are created as stopped.

CREATE PUMP "buses"."postgres-pump" AS
INSERT INTO "buses"."postgres_archive"
   ("id", "reported_at", "shift_no", "trip_no")
SELECT STREAM "id", "reported_at", "shift_no", "trip_no"
   FROM "buses"."buses";

 

Starting Pumps

You can start all pumps in a single schema by using ALTER PUMP yourschema.* START. This starts the pumps in what we call topological order, which ensures that data is available to all streams in the pipeline.

ALTER PUMP "buses".* START;
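Similarly, you can stop all pumps in the schema when you need to pause the pipeline:

ALTER PUMP "buses".* STOP;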

 

The following topics offer tips for building and troubleshooting application pipelines:

Stream Computing Architecture (SCA) Approach to Organizing SQL

Troubleshooting Streaming SQL