Implementing Federation for s-Server Instances in a Cluster of Compute Nodes

Using the SQLstream JDBC driver, you can federate (link together) multiple instances of s-Server with fine-grained topology. You do so using DDL similar to that used for a SQL/MED connection to other databases. See the topic Reading Data from RDBMS Sources in this guide for more details.

Once you federate multiple instances of s-Server, you can query from and insert into streams and tables in the federated instances of s-Server.

Once you implement federation, you can perform pipeline operations such as parsing, stream enrichment through stream-table joins, filtering, and aggregation over these federated instances of s-Server. These operations are described in the section Automatic Distribution of Streaming Workloads Across Federated s-Server Instances.

Federation allows for complex topologies declared entirely in SQL. See the section Scale-up & Scale-Out of Streaming Applications.

Setting Up Federation

Setting up federation requires three steps:

1. Creating a foreign data wrapper for the other instance of s-Server that references the JDBC driver.

2. Creating a server object that references this foreign data wrapper and includes connection information for the server.

3. Creating a foreign stream that references the server object.

Creating a Foreign Data Wrapper

To install the JDBC driver, you create a foreign data wrapper along the following lines:

CREATE OR REPLACE FOREIGN DATA WRAPPER CLIENT_JDBC
   LIBRARY 'class com.sqlstream.aspen.namespace.jdbc.AspenMedJdbcForeignDataWrapper'
   LANGUAGE JAVA
   OPTIONS (DRIVER_CLASS 'com.sqlstream.jdbc.Driver');

For more detail, see the topic CREATE FOREIGN DATA WRAPPER in the Streaming SQL Reference Guide.

Creating a Server Object

When you define the server object, you create a data server using the same options as a SQL/MED JDBC foreign data server. See the subtopic "JDBC Foreign Server Definition" in the topic Writing to RDBMS systems in this guide for details on the options defined below.

CREATE OR REPLACE SERVER "my-federation-server" FOREIGN DATA WRAPPER CLIENT_JDBC
OPTIONS (
   URL 'jdbc:vjdbc:sdp://remote-host:5570',
   USER_NAME 'sa',
   PASSWORD '',
   SCHEMA_NAME 'remote-schema-name'
);

For more detail, see the topic CREATE SERVER in the Streaming SQL Reference Guide.

Creating a Foreign Stream for the Federated Server

To access data from the federated server, you need to create a special kind of stream called a foreign stream. Options on the stream identify the schema and the stream or table on the remote s-Server that the foreign stream maps to.

Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "FederationData," and creates a foreign stream called "my-federation-stream."

As of version 5.2, you need to specify the name of the remote stream with the "STREAM_NAME" option.

In addition to a column list, this foreign stream sets two options:

SCHEMA_NAME, which is the schema name in the foreign s-Server.

Either TABLE_NAME, which is the name of the table in the foreign s-Server, or STREAM_NAME, which is the name of the stream in the foreign s-Server.

CREATE OR REPLACE SCHEMA "FederationData";
SET SCHEMA '"FederationData"';

CREATE OR REPLACE FOREIGN STREAM "my-federation-stream" (
   "recNo" INTEGER,
   "ts" TIMESTAMP NOT NULL,
   "accountNumber" INTEGER,
   "loginSuccessful" BOOLEAN,
   "sourceIP" VARCHAR(32),
   "destIP" VARCHAR(32),
   "customerId" INTEGER)
SERVER "my-federation-server"
OPTIONS ("SCHEMA_NAME" 'remote-schema-name',
         "STREAM_NAME" 'remote-stream');

Querying Streams

Once you have set up the foreign data wrapper, server, and foreign stream above, you can query streams and tables in the 'remote-schema-name' schema on the server at remote-host as if they were on the local server, using the local schema name as a qualifier, as in the following code:

SELECT STREAM * FROM "FederationData"."my-federation-stream";

 

The data will be pulled from "remote-schema-name"."remote-stream" on the remote s-Server by the foreign data wrapper.
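
Because a foreign stream behaves like a local stream once it is declared, you can also use it in downstream pipeline operations such as stream enrichment. The following is a minimal sketch of a stream-table join; the local table "FederationData"."customers", its columns, and the view name are hypothetical and not part of the example above.

CREATE OR REPLACE VIEW "FederationData"."enriched-logins" AS
SELECT STREAM
   s."ts", s."accountNumber", s."sourceIP", s."destIP",
   c."customerName"                             -- hypothetical column on the local table
FROM "FederationData"."my-federation-stream" AS s
JOIN "FederationData"."customers" AS c          -- hypothetical local lookup table
   ON s."customerId" = c."customerId";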

 

Note: You cannot insert into a stream on the remote s-Server. You also cannot create or drop streams and tables in the remote schema.

Automatic Distribution of Streaming Workloads Across Federated s-Server Instances

This section describes how streaming pipeline operations such as parsing, stream enrichment through stream-table joins, filtering, and, in particular, aggregation can be distributed across federated s-Server instances in a cluster of compute nodes.

You can use message queue middleware such as Apache Kafka or AWS Kinesis to distribute raw payloads across multiple s-Server instances. This allows you to perform analytics in a distributed manner. For more information on using Kafka and Kinesis with s-Server, see the topics Reading from Kafka, Writing to Kafka, Reading from Kinesis, and Writing to Kinesis.

CREATE OR REPLACE FOREIGN STREAM aPartitionedKafkaStream (
   -- column definitions omitted
)
SERVER KafkaServer
OPTIONS (
   TOPIC_NAME 'MyTopicWithPartitions',
   CONSUMER_GROUP 'aPartitionedKafkaStreamReader',
   BROKER_LIST 'localhost:9092',
   …
);

Streaming transformations such as parsing and enrichment tend to be stateless. Such transformations can be distributed across nodes easily.
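
For example, each node can apply the same stateless cleansing view to whatever partitions it happens to read. The following is a minimal sketch; the view name CleansedPartitionView is illustrative, and the columns key1, key2, v1, and v2 are the ones assumed by the aggregation examples later in this topic.

-- Stateless cleansing view; because it keeps no state across rows,
-- it can run independently on every node against that node's partitions.
CREATE OR REPLACE VIEW CleansedPartitionView AS
SELECT STREAM key1, key2, v1, v2
FROM aPartitionedKafkaStream
WHERE v1 IS NOT NULL AND v2 IS NOT NULL;   -- drop malformed rows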

Distributing stateful transformations such as aggregations is slightly more difficult, but still straightforward. The following sections describe techniques that you can use to distribute aggregations across federated s-Server instances.

An application may aggregate data in one of two ways:

1. Using the GROUP BY clause of the SELECT statement, known as tumbling windows.

2. Using the OVER clause of the SELECT statement, known as sliding windows.

You can easily distribute these aggregation workloads across multiple nodes in a cluster using the simple SQL scripts described in the following sections.
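
For reference, here is a minimal single-node sketch contrasting the two forms of aggregation. The view names and the one-minute windows are illustrative; the stream and column names follow the examples in this topic.

-- Tumbling window: GROUP BY emits one row per key per minute
CREATE OR REPLACE VIEW TumblingAggView AS
SELECT STREAM
   FLOOR(s.ROWTIME TO MINUTE) AS minute_start, key1, SUM(v1) AS sum_v1
FROM aPartitionedKafkaStream AS s
GROUP BY FLOOR(s.ROWTIME TO MINUTE), key1;

-- Sliding window: OVER emits one row per input row, aggregated over the trailing minute
CREATE OR REPLACE VIEW SlidingAggView AS
SELECT STREAM key1, SUM(v1) OVER w AS sum_v1_last_minute
FROM aPartitionedKafkaStream
WINDOW w AS (PARTITION BY key1 RANGE INTERVAL '1' MINUTE PRECEDING);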

Parallel Aggregations

Tumbling Windows/GROUP BY

For tumbling windows, you can use rollup aggregation, partitioned parallel aggregation, or both. In other words, you can parallelize a GROUP BY operation in two ways:

Rollup Aggregation

Partitioned Parallel Aggregation

Sliding Windows/OVER

For sliding windows, you need to use Partitioned Parallel Aggregation; a sliding-window sketch appears at the end of that section below.

Using Rollup Aggregation to Aggregate Tumbling Windows

The following diagram and code show how partial aggregates from four instances of s-Server are rolled up into a single RollupAggView on another instance of s-Server.

(Diagram: rollup aggregation across four s-Server instances)

 

-- Run on each s-Server instance
CREATE OR REPLACE SCHEMA federation;
SET SCHEMA 'federation';

CREATE OR REPLACE VIEW PartitionView AS
SELECT STREAM *
FROM aPartitionedKafkaStream AS s;

CREATE OR REPLACE VIEW DistributedAggView AS
SELECT STREAM
   FLOOR(s.ROWTIME TO MINUTE), key1, key2,
   SUM(v1) AS sum_v1, COUNT(v2) AS count_v2
FROM PartitionView AS s   -- runs on node N
GROUP BY FLOOR(s.ROWTIME TO MINUTE), key1, key2;

 

-- Run on the "connected" instance (assume Node1 for this example)
-- Declare federated servers on Node1
CREATE OR REPLACE SERVER Node2
FOREIGN DATA WRAPPER SYS_JDBC
OPTIONS (
   DRIVER_CLASS 'com.sqlstream.jdbc.Driver',
   URL 'jdbc:sqlstream:sdp://localhost:6570',
   USER_NAME 'sa',
   PASSWORD 'mumble'
);

CREATE OR REPLACE SERVER Node3
FOREIGN DATA WRAPPER SYS_JDBC
OPTIONS (
   DRIVER_CLASS 'com.sqlstream.jdbc.Driver',
   URL 'jdbc:sqlstream:sdp://localhost:7570',
   USER_NAME 'sa',
   PASSWORD 'mumble'
);

CREATE OR REPLACE SERVER Node4
FOREIGN DATA WRAPPER SYS_JDBC
OPTIONS (
   DRIVER_CLASS 'com.sqlstream.jdbc.Driver',
   URL 'jdbc:sqlstream:sdp://localhost:8570',
   USER_NAME 'sa',
   PASSWORD 'mumble'
);

 

CREATE OR REPLACE FOREIGN STREAM Node2_DistributedAggView
SERVER Node2
OPTIONS (
   STREAM_NAME 'DistributedAggView',
   SCHEMA_NAME 'federation'
);

CREATE OR REPLACE FOREIGN STREAM Node3_DistributedAggView
SERVER Node3
OPTIONS (
   STREAM_NAME 'DistributedAggView',
   SCHEMA_NAME 'federation'
);

CREATE OR REPLACE FOREIGN STREAM Node4_DistributedAggView
SERVER Node4
OPTIONS (
   STREAM_NAME 'DistributedAggView',
   SCHEMA_NAME 'federation'
);

 

CREATE OR REPLACE VIEW AllPartitionsAggView AS
SELECT STREAM * FROM DistributedAggView        -- local
UNION ALL
SELECT STREAM * FROM Node2_DistributedAggView  -- federation
UNION ALL
SELECT STREAM * FROM Node3_DistributedAggView  -- federation
UNION ALL
SELECT STREAM * FROM Node4_DistributedAggView; -- federation

 

-- Run on the "connected" instance.
-- Rollup for final result
CREATE OR REPLACE VIEW RollupAggView AS
SELECT STREAM
   FLOOR(s.ROWTIME TO MINUTE), key1, key2,
   SUM(sum_v1) AS sum_v1,      -- sum of per-node partial sums
   SUM(count_v2) AS count_v2   -- summing per-node partial counts gives the global count
FROM AllPartitionsAggView AS s
GROUP BY FLOOR(s.ROWTIME TO MINUTE), key1, key2;

Using Partitioned Parallel Aggregation to Aggregate Tumbling or Sliding Windows

In partitioned parallel aggregation, you aggregate data from the same hash partition across all four nodes. The diagram below illustrates this: each node splits its input into partition views (Partition1View through Partition4View); for each partition, the corresponding views from all nodes are combined into a union view such as PartitionUnion1View and then aggregated. In the last step, the per-partition aggregations are combined using a UNION ALL statement into AllPartitionsView.

(Diagram: partitioned parallel aggregation across four s-Server instances)

 

-- Run on each s-Server instance
CREATE OR REPLACE SCHEMA federation;
SET SCHEMA 'federation';

-- n = num partitions
-- create n views, one on each node
CREATE OR REPLACE VIEW PartitionView AS
SELECT STREAM *
FROM aPartitionedKafkaStream AS s;

CREATE OR REPLACE VIEW Partition1View AS
SELECT STREAM key1, key2, v1, v2 FROM PartitionView
WHERE HASH(key1, key2) = 1;

CREATE OR REPLACE VIEW Partition2View AS
SELECT STREAM key1, key2, v1, v2 FROM PartitionView
WHERE HASH(key1, key2) = 2;

CREATE OR REPLACE VIEW Partition3View AS
SELECT STREAM key1, key2, v1, v2 FROM PartitionView
WHERE HASH(key1, key2) = 3;

CREATE OR REPLACE VIEW Partition4View AS
SELECT STREAM key1, key2, v1, v2 FROM PartitionView
WHERE HASH(key1, key2) = 4;

 

-- Union data partitioned by HASH(key1, key2) = 1
CREATE OR REPLACE VIEW PartitionUnion1View AS
SELECT STREAM * FROM Partition1View        -- local
UNION ALL
SELECT STREAM * FROM Node2_Partition1View  -- federation
UNION ALL
SELECT STREAM * FROM Node3_Partition1View  -- federation
UNION ALL
SELECT STREAM * FROM Node4_Partition1View; -- federation

CREATE OR REPLACE VIEW PartitionedAgg1View AS
SELECT STREAM
   FLOOR(s.ROWTIME TO MINUTE), key1, key2,
   SUM(v1) AS sum_v1, COUNT(v2) AS count_v2
FROM PartitionUnion1View AS s
GROUP BY FLOOR(s.ROWTIME TO MINUTE), key1, key2;

 

-- Run on the "connected" instance
-- Union Final result
CREATE OR REPLACE VIEW AllPartitionsView AS
SELECT STREAM * FROM PartitionedAgg1View       -- local
UNION ALL
SELECT STREAM * FROM Node2_PartitionedAggView  -- federation
UNION ALL
SELECT STREAM * FROM Node3_PartitionedAggView  -- federation
UNION ALL
SELECT STREAM * FROM Node4_PartitionedAggView; -- federation
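
The same partition-and-union pattern also covers sliding windows: on each partition's union view, replace the GROUP BY aggregation with an OVER aggregation. The following is a minimal sketch, assuming the PartitionUnion1View defined above; the view name PartitionedSliding1View and the one-minute window are illustrative.

-- Sliding-window (OVER) aggregation for hash partition 1
CREATE OR REPLACE VIEW PartitionedSliding1View AS
SELECT STREAM
   key1, key2,
   SUM(v1) OVER w AS sum_v1_last_minute,
   COUNT(v2) OVER w AS count_v2_last_minute
FROM PartitionUnion1View
WINDOW w AS (PARTITION BY key1, key2 RANGE INTERVAL '1' MINUTE PRECEDING);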

 

Scale-up & Scale-Out of Streaming Applications

The diagram below shows how a streaming application can be distributed across multiple nodes in a cluster. The distribution of queries across nodes achieves the following objectives:

Each node in a cluster subscribes to a subset of partitions of a partitioned Kafka topic.

Each node creates a dedicated pipeline of parsing/data cleansing operations for each Kafka partition (see the sketch after the diagram below).

All operators in a given pipeline execute concurrently, taking advantage of multiple CPU cores on the node. This is referred to as "pipeline parallelism," "vertical scaling," or "scale-up."

The results of these pipelines are routed to "aggregator operators" instantiated on each node in the cluster. In short, the aggregation operator is distributed "horizontally" across all nodes in the cluster. This is also known as "horizontal scaling" or "scale-out."

The entire topology is described in declarative SQL, taking advantage of s-Server's federation capabilities.

The application is "pinned" to node 1 from the consumer's perspective.

(Diagram: scale-up and scale-out of a streaming application across a cluster)
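
One common way to instantiate a per-partition pipeline in s-Server is with a pump, which continuously inserts the results of a streaming query into a stream. The following is a minimal sketch only; the stream CleansedStream, its column types, and the pump name are assumptions and are not part of the examples above.

-- A native stream to receive the output of one per-partition pipeline (assumed name and types)
CREATE OR REPLACE STREAM federation.CleansedStream (
   key1 VARCHAR(32),
   key2 VARCHAR(32),
   v1 DOUBLE,
   v2 DOUBLE
);

-- One pump per Kafka partition; starting the pump starts the pipeline
CREATE OR REPLACE PUMP federation.Partition1Pump STOPPED AS
INSERT INTO federation.CleansedStream
SELECT STREAM key1, key2, v1, v2 FROM federation.Partition1View;

ALTER PUMP federation.Partition1Pump START;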