Table Reader Adapter

Note: This topic describes a legacy adapter. See the topics Reading from Other Sources and Writing to Other Destinations for current solutions.

The Table Reader adapter plugin "tails" an external database table to create a named data stream whose column types are compatible with the source table's schema. Using the adapter requires setting up a table in a database external to SQLstream and configuring the SQLstream pipeline to communicate with that table. Supported databases are PostgreSQL, MySQL, SQL Server, Oracle, ParAccel, and Ingres. A state table, named "SQLS_TableReader_State" by default, stores a high-water mark for each table being read.

A server is active while any foreign stream belonging to it is in use by a query. While a server is active, the plugin makes JDBC connections to the foreign database. If a connection is lost, the Table Reader adapter tries to reconnect and continue. If reconnecting fails, it waits and tries again, repeating according to its configuration options: after each failed attempt the wait time is multiplied by a factor, up to a maximum wait time, and attempts stop once a maximum number of attempts (if one is set) is reached. The reconnection options are described in the server options table below.
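Reading the reconnection description above as a simple geometric backoff, the wait before the k-th reconnect attempt can be sketched as follows. Whether the multiplier is first applied before or after the first wait is an assumption; the option names and defaults come from the server options table.

```latex
w_k = \min\bigl(\mathrm{RECONNECTMILLIS} \cdot \mathrm{RECONNECTMULT}^{\,k-1},\ \mathrm{RECONNECTMILLISMAX}\bigr), \qquad k = 1, 2, \ldots

% Defaults: RECONNECTMILLIS = 10 s, RECONNECTMULT = \sqrt{2}, RECONNECTMILLISMAX = 5 min
w_1 = 10\,\mathrm{s},\; w_2 \approx 14.1\,\mathrm{s},\; w_3 = 20\,\mathrm{s},\; w_4 \approx 28.3\,\mathrm{s},\; w_5 = 40\,\mathrm{s},\; \ldots,\; w_9 = 160\,\mathrm{s},\; w_{10} \approx 226.3\,\mathrm{s},\; w_{11} = \min(320, 300) = 300\,\mathrm{s}
```

Because RECONNECTTRIESMAX defaults to 0 (no limit), every attempt after the tenth would wait the 300-second maximum, indefinitely.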

To specify which columns to read, list their names and types in the foreign stream definition, as described in Defining the Foreign Stream.

The steps for setting up and using this plugin are described in the sections that follow:

1. Registering the plugin
2. Defining the server
3. Defining the foreign stream
4. Creating a control stream (optional)
5. Indexing the database you intend to tail and creating a state table

Installing the Plugin (Foreign Data Wrapper)

The Table Reader adapter is registered by default during the SQLstream installation process; you can verify that TableReader.jar is present in the plugin folder. To install the adapter explicitly, use CREATE OR REPLACE FOREIGN DATA WRAPPER. This code is the same regardless of which database you are reading.

-- Install TableReader
CREATE OR REPLACE FOREIGN DATA WRAPPER "TableReader"
  LIBRARY 'class com.sqlstream.plugin.tablereader.TableReaderStreamControlPlugin'
  LANGUAGE java
  DESCRIPTION 'adapter for reading "events" from an external database';

Defining the Server

Define one server object for each set of tables that share the same options, using CREATE OR REPLACE SERVER. Each table being read has its own foreign stream, but separate foreign servers are needed only when the server options must differ.

There are three ways to specify connection parameters and server or stream options:

In the OPTIONS clause of CREATE SERVER or CREATE FOREIGN STREAM. Both statements accept connection parameters in OPTIONS. Some options can be set on either the server or the stream; others must be set specifically on one or the other.
In a JNDI parameters (jndiparams) file. See Defining Server with JNDI Params.
In a connection parameters file. See Defining Server with a Connection Parameter File.
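As a sketch of the file-based approach, the server below points the connParams option at a properties file instead of listing credentials in OPTIONS. The server name is hypothetical, and the file's contents (and whether URI and DRIVER may also live in it) are assumptions; see Defining Server with a Connection Parameter File for the actual file format.

```sql
-- Sketch: read connection parameters from
-- $SQLSTREAM_HOME/plugin/jndi/dir/foo.properties (path resolved from connParams).
-- "Postgres_TableReader_FileParams" is a hypothetical server name.
CREATE OR REPLACE SERVER "Postgres_TableReader_FileParams"
    FOREIGN DATA WRAPPER "TableReader"
    OPTIONS (
        URI 'jdbc:postgresql://localhost/mydatabase',
        DRIVER 'org.postgresql.Driver',
        connParams 'dir/foo',               -- ".properties" is appended to this path
        sqlDialect 'Postgres 8.x',
        stateTable 'SQLS_TableReader_State')
    DESCRIPTION 'TableReader server whose credentials come from a file';
```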

The following example shows defining connection parameters and options in the server definition itself.

CREATE OR REPLACE SERVER "Postgres_TableReader"
    FOREIGN DATA WRAPPER "TableReader"
    OPTIONS (
        URI 'jdbc:postgresql://localhost/mydatabase',
        DRIVER 'org.postgresql.Driver',
        connParamPrefix 'dbConn_',
        "dbConn_user" 'SQLstream',
        "dbConn_password" 'mypassword',
        "dbConn_applicationName" 'SQLstream TableReader Adapter',
        sqlDialect 'Postgres 8.x',
        stateTable 'SQLS_TableReader_State',
        pollingMillis '10000')
    DESCRIPTION 'Foreign server to monitor tables in "mydatabase"';

 

SQL/MED server ("foreign server") options

The table below lists the options that can be specified in CREATE SERVER. You can also set these options in a jndiparams file or a connParams file.
 

URI: JDBC connect string. Possible values: see "Accepted values" below.

DRIVER: JDBC driver class. Possible values: see "Accepted values" below.

connParamPrefix: Prefix for connection parameter options (case-insensitive). Default: CONN_

connParams: Location of a properties file (for example, foo.properties) that lists connection parameters, as an alternative to specifying them in OPTIONS. The path is relative to $SQLSTREAM_HOME/plugin/jndi and the .properties extension is added, so specifying connParams dir/foo causes the server to read $SQLSTREAM_HOME/plugin/jndi/dir/foo.properties. Value: string.

Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server.

user: Login user for the source database. No default.

password: Login password for the source database. No default.

databasename: Name of the source database. No default.

[connParamPrefix] + applicationName: Connection property. SQL Server management tools display this value so that a DBA can see which program is connected to the database; if it is not set, the field is blank. Optional; no default.

[connParamPrefix] + clientProgramName: Connection property. Optional; no default.

sqlDialect: Database vendor and version. Possible values: see the supported values listed below.

stateTable: Name of the TableReader state table, usually "SQLS_TableReader_State". No default; must be supplied.

pollingMillis: Interval, in milliseconds, at which TableReader checks its queue for new rows. If no data is received within that interval, the interval is doubled, but never beyond pollingMillisMax. Default: 10000 (10 s).

pollingMillisMax: Maximum interval, in milliseconds, at which TableReader checks its queue for new rows. Default: 300000 (5 min).

RECONNECTMILLIS: Wait time, in milliseconds, before the first reconnect attempt. Default: 10000 (10 s).

RECONNECTMULT: Factor by which the reconnect wait time increases after each failed attempt. Default: square root of 2 (about 1.414).

RECONNECTMILLISMAX: Upper limit on the reconnect wait time. Default: 5 min.

RECONNECTTRIESMAX: Maximum number of reconnect attempts. Default: 0 (no limit).
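The polling and reconnect options can be combined in one server definition. The sketch below uses illustrative values and a hypothetical server name; following the pollingMillis example earlier, values are given as quoted strings, and the *MILLIS options are assumed to be in milliseconds as the table states for RECONNECTMILLIS.

```sql
-- Sketch: poll more often when busy and stop reconnecting after 20 attempts.
-- "Postgres_TableReader_Tuned" is hypothetical; all values are illustrative.
CREATE OR REPLACE SERVER "Postgres_TableReader_Tuned"
    FOREIGN DATA WRAPPER "TableReader"
    OPTIONS (
        URI 'jdbc:postgresql://localhost/mydatabase',
        DRIVER 'org.postgresql.Driver',
        connParamPrefix 'dbConn_',
        "dbConn_user" 'SQLstream',
        "dbConn_password" 'mypassword',
        sqlDialect 'Postgres 8.x',
        stateTable 'SQLS_TableReader_State',
        pollingMillis '2000',           -- start by polling every 2 s
        pollingMillisMax '60000',       -- idle polling doubles up to at most 1 min
        RECONNECTMILLIS '5000',         -- first reconnect wait: 5 s
        RECONNECTMULT '2',              -- double the wait after each failure
        RECONNECTMILLISMAX '120000',    -- never wait more than 2 min
        RECONNECTTRIESMAX '20')         -- give up after 20 attempts (0 = no limit)
    DESCRIPTION 'TableReader server with tuned polling and reconnect settings';
```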

Defining the Foreign Stream

There are two parts to defining the foreign stream:

Defining the foreign stream columns. These must be in the same order, and have the same case-sensitive names, as the columns in the source table. Column types need not exactly match the column types in the source table, but must be assignable. For example, assigning a value from a BIGINT column to an INTEGER column is valid as long as you are certain that at run time the value will never exceed the INTEGER range. Mismatched foreign stream and source table rowtypes generate validation errors.
Defining the foreign stream options. These options can add to the options already defined on the server, or override server options that would otherwise be inherited.
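For illustration, here is a hypothetical PostgreSQL source table that the sample master foreign stream below could match (ignoring its elided columns). It runs on the external database, not in s-Server. PostgreSQL folds unquoted identifiers to lower case, so the foreign stream must use the quoted lower-case names ("id", "col1", and so on).

```sql
-- PostgreSQL, on the external database. Hypothetical source table.
-- Unquoted names fold to lower case: id, col1, col2, col3.
CREATE TABLE public.sample_data (
    id   bigint PRIMARY KEY,    -- unique ascending key, usable as queryCol
    col1 varchar(10),           -- read as VARCHAR(10)
    col2 double precision,      -- read as DOUBLE
    col3 bigint                 -- read as INTEGER only if values always fit
);
```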

Setting a Master Foreign Stream

For each set of tables defined by a server object, set the MASTER option to 'true' on exactly one foreign stream, which then becomes the master. Nothing flows until a SELECT is run on the master. Do not set MASTER to 'true' on more than one of the foreign streams using the same server object.
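A sketch of a second, non-master foreign stream on the same server, followed by the query that starts data flowing. The table "audit_data", its columns, and the stream name "AuditStream" are hypothetical; the schema and master stream are the ones created in Sample Code for Master Foreign Stream.

```sql
SET SCHEMA '"TableReaderData"';

-- Non-master stream: MASTER stays 'false'.
CREATE OR REPLACE FOREIGN STREAM "AuditStream" (
    "id" BIGINT NOT NULL,
    "note" VARCHAR(100))
    SERVER "Postgres_TableReader"
    OPTIONS (
        TYPE 'tableEvents',
        MASTER 'false',
        eventsTable 'audit_data',
        queryCol 'id');

-- No stream on this server delivers rows until the master is queried.
SELECT STREAM * FROM "TableReaderMasterStream";
```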

Setting the Database Table

The DDL for each TableReader foreign stream must specify (in the eventsTable option) the external database table to which that stream corresponds.

Foreign Stream Options

TYPE: Declares whether the stream is a tableEvents stream or a control stream. The remaining options in this table apply to tableEvents streams. Required. Values: "tableEvents" or "Control".

MASTER: Must be set to true on exactly one foreign stream per server object; that stream becomes the master. Nothing flows until a SELECT is run on the master. Required for the master foreign stream. Values: true or false.

eventsTable: Name of the source table being read. Required; no default. Values: "name", "schema.name", or "db.schema.name".

queryCol: Name of a column with a unique ascending value, or of a timestamp column. It must be an integer or date-time type. If queryCol values can be non-unique, a row is selected only if its queryCol value is less than the current maximum queryCol value. Required. Value: column name.

DEFERSTART: Start in suspended mode; no data is read until the stream is resumed. Values: true or false.

queryColGroupRange: Size of the range of queryCol values to query for during each poll. For a timestamp key column, units are seconds; for an integer key column, units are absolute values. Each table reader query covers a range that starts at the previous high-water mark. The end point of the range is the previous high-water mark plus QueryColGroupRange multiplied by one more than the number of queries that returned no results. Default: 0. Value: integer.

querytSort: Allows rows to be inserted out of order by up to X, where X is the querytSort value. X defines the inclusive interval from "high-water mark minus X" to the high-water mark. Rows whose queryCol values fall in that interval are not streamed until the high-water mark rises by X, and are then streamed in sorted order. Rows that are out of order by more than X are not streamed.
For integer queryCol values, a row whose queryCol value differs from the current high-water mark by X or less is held back.
For timestamp queryCol values, X is in seconds: a row whose timestamp is within X seconds of the current high-water mark is held back.
For example, if id were a timestamp column, setting querytSort '10' would hold rows until the high-water mark is 10 seconds (in queryCol values) past them, and then stream them in sorted order.
Default: "" (empty string). Value: string containing an integer; if no integer is specified, no querytSort interval is used.

SKIPVALIDATION: Debugging aid for development; do not use in production. true skips column type validation: the foreign stream and source table rowtypes are not compared. false (or omitted) requires a connection to the source database so that the table's rowtype can be compared with the foreign stream's. Default: false. Values: true or false.

customPredicate: SQL expression used to filter event query results. The expression is ANDed with the WHERE clause of the event query and must be valid for the specified sqlDialect.
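The queryColGroupRange description can be written as a formula for the end point of each poll's range. Here H is the previous high-water mark, R is queryColGroupRange, and n is the number of queries that returned no results; the sample numbers are illustrative, and whether n resets after a non-empty query is not stated above.

```latex
\mathrm{end} = H + (n + 1) \cdot R

% Integer key, H = 38018908, R = 1000:
n = 0:\quad \mathrm{end} = 38018908 + 1 \cdot 1000 = 38019908
n = 2:\quad \mathrm{end} = 38018908 + 3 \cdot 1000 = 38021908

% Timestamp key, R = 60 (seconds), n = 1:
\mathrm{end} = H + 2 \cdot 60\,\mathrm{s} = H + 120\,\mathrm{s}
```

Each empty poll therefore widens the next query window by another R, so a sparse table is still scanned forward instead of stalling on an empty range.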

 

Sample Code for Master Foreign Stream

Like all streams, foreign streams must be created within a schema. The example below creates a schema called "TableReaderData", sets it as the current schema, and creates a foreign stream called "TableReaderMasterStream".

CREATE OR REPLACE SCHEMA "TableReaderData";
SET SCHEMA '"TableReaderData"';

CREATE OR REPLACE FOREIGN STREAM "TableReaderMasterStream" (
    "id" BIGINT NOT NULL,
    "col1" VARCHAR(10),
    "col2" DOUBLE,
    ...
    "col3" INTEGER)
    SERVER "Postgres_TableReader"
    OPTIONS (
        TYPE 'tableEvents',          -- Required.
        MASTER 'true',               -- Required on exactly one foreign stream for this set of tables;
                                     -- data does not flow unless one stream is the master.
        eventsTable 'sample_data',   -- Required. Table to be tailed.
        queryCol 'id',               -- Required. Column with ascending values, or a timestamp column.
        queryColGroupRange '1000')
    DESCRIPTION 'Foreign stream to deliver rows appended to mydatabase.public.sample_data';
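A sketch of a non-master stream that tails a table by timestamp and combines querytSort with customPredicate. The table "orders" and its columns are hypothetical; the predicate uses PostgreSQL syntax because the server's sqlDialect is 'Postgres 8.x', and embedded single quotes are doubled inside the option string.

```sql
-- Hypothetical table public.orders with a timestamp column used as queryCol.
CREATE OR REPLACE FOREIGN STREAM "TableReaderData"."OrdersStream" (
    "order_id" BIGINT NOT NULL,
    "status" VARCHAR(20),
    "created_at" TIMESTAMP NOT NULL)
    SERVER "Postgres_TableReader"
    OPTIONS (
        TYPE 'tableEvents',
        eventsTable 'public.orders',     -- "schema.name" form
        queryCol 'created_at',           -- high-water mark tracked by time
        queryColGroupRange '60',         -- timestamp key: 60 seconds per range step
        querytSort '10',                 -- hold rows within 10 s of the high-water mark
        customPredicate '"status" <> ''CANCELLED''')  -- ANDed into the event query
    DESCRIPTION 'Foreign stream of non-cancelled orders';
```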

queryCol and the Foreign Stream's High Water Mark

For each target table being tailed, there is one row in the state table. This row maintains a high-water mark for the table's queryCol, either as a numeric record ID (BIGINT) or as a record time (DATETIME or TIMESTAMP, depending on the database). TableReader uses the type of the target table's queryCol to determine which kind of high-water mark to track.

Because rows from multiple insertion sources can arrive without being in strict order, a newly arriving row can have a queryCol value lower than that of a row already in the table. If rows with higher queryCol values have already been streamed, the new row falls below the high-water mark, is treated as "out of order", and is discarded.

You can minimize this data loss by specifying a querytSort value. This parameter defines an interval within which rows with out-of-order queryCol values can still arrive: only rows whose queryCol values are lower than "high-water mark minus querytSort" are streamed. As incoming rows establish a new high-water mark, earlier rows that now fall below the new interval are streamed.

Note: If the high-water mark is tracked by timestamp, the last row in the table is not streamed until another row arrives. This is because s-Server cannot advance the high-water mark until it knows it has all the data for that timestamp.
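The querytSort rule can be stated as a pair of conditions on a row's queryCol value v, given the current high-water mark H and querytSort value X; the sample values are illustrative.

```latex
\text{held back if } H - X \le v \le H, \qquad \text{streamable if } v < H - X

% Integer queryCol, H = 1000, X = 10:
v \le 989 \;\Rightarrow\; \text{streamed}, \qquad 990 \le v \le 1000 \;\Rightarrow\; \text{held}

% Timestamp queryCol, H = 12{:}00{:}30, X = 10 s:
v < 12{:}00{:}20 \;\Rightarrow\; \text{streamed}, \qquad 12{:}00{:}20 \le v \le 12{:}00{:}30 \;\Rightarrow\; \text{held}
```

A late row is lost only if it arrives after H has already advanced more than X past its value.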

SKIPVALIDATION and run-time errors

SKIPVALIDATION controls column type validation, in which the foreign stream and target table rowtypes are compared and validated. The default is 'false', meaning validation is done: when SKIPVALIDATION is omitted or explicitly 'false', a connection to the source database is required so that the table and stream rowtypes can be compared. When SKIPVALIDATION is 'true', the rowtypes are not compared or validated.

During development, setting the SKIPVALIDATION option to 'true' (to skip column type validation) is useful and saves time, but it is not recommended in production.

Run-time errors arise in the following cases:

Regardless of the value of SKIPVALIDATION, run-time errors arise whenever the actual data received does not fit in the receiving column type. One example is a source field defined as BIGINT delivering a value that is too large for its corresponding receiving field defined as INTEGER.
With SKIPVALIDATION 'true', rowtype validation is suppressed, so mismatched rowtypes are not caught when the stream is defined; whenever source data is not assignable to the target field's type, run-time errors arise.

Run-time errors are logged to the trace log. (See SKIPVALIDATION in the foreign stream options table.)
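A development-only sketch: the stream below skips validation, so a narrowing from BIGINT to INTEGER would surface as a run-time error in the trace log rather than a validation error. The stream name "DevStream" is hypothetical, and the column subset assumes the source table shown for sample_data.

```sql
-- Development only: rowtypes are not compared when the stream is defined.
CREATE OR REPLACE FOREIGN STREAM "TableReaderData"."DevStream" (
    "id" BIGINT NOT NULL,
    "col3" INTEGER)               -- run-time error if a source value exceeds INTEGER range
    SERVER "Postgres_TableReader"
    OPTIONS (
        TYPE 'tableEvents',
        eventsTable 'sample_data',
        queryCol 'id',
        SKIPVALIDATION 'true');   -- set back to 'false' (or omit) for production
```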

Creating a State Table

The state table is a database table that tracks the last row read from each source table. Its default name is SQLS_TableReader_State. For each source table being tailed, the state table holds one row that records the last read made from that table (the high-water mark).

TableReader maintains its high-water mark for a source table's queryCol by either numeric record ID (BIGINT) or by record time (DATETIME or TIMESTAMP, depending on the database). TableReader automatically uses the source table's queryCol type to determine which type to use for the high-water mark tracking.

Sample Code for Setting up a State Table

The following code sets up a state table in a PostgreSQL database; similar code can be used in other databases. In this example, SQLstream is the owner of the state table and is the login user. (For state table examples for the other supported databases, whose data types and SQL syntax vary, see Additional State Table Examples.) The state table uses the same credentials and database as the table being tailed by TableReader.

CREATE TABLE "SQLS_TableReader_State" (
   "SQLS_tableName" varchar(30) PRIMARY KEY NOT NULL,
   "SQLS_highId" bigint,
   "SQLS_highTime" timestamp without time zone
);

COMMENT ON TABLE "SQLS_TableReader_State" IS 'high-water marks for TableReader adapter';

ALTER TABLE "SQLS_TableReader_State" OWNER TO SQLstream;

 

Setting a Starting Row for the State Table

To set a starting row for the state table, use an INSERT statement that names the source table. The state table uses either a numeric record ID or a timestamp as the starting point. For numeric IDs, the reader starts with the first row whose ID is greater than the high-water mark; for timestamps, it starts with the first row whose timestamp is more recent than the high-water mark. The following code sets the high-water mark to the numeric ID 38018908.

INSERT
 INTO "SQLS_TableReader_State"
     ("SQLS_tableName", "SQLS_highId", "SQLS_highTime")
 VALUES ('reports_table', 38018908, NULL);

 

Code that uses a timestamp would look like the following:

INSERT
 INTO "SQLS_TableReader_State"
     ("SQLS_tableName", "SQLS_highId", "SQLS_highTime")
 VALUES ('reports_table', NULL, TIMESTAMP '2017-01-01 00:00:00');  -- example starting time

 

Example Code to Reset the State Table

The following code is an example of how to reset the State Table:

-- reset TableReader state table
DELETE
 FROM "SQLS_TableReader_State"
 WHERE "SQLS_tableName" = 'reports_sqlstream_clv';

INSERT
 INTO "SQLS_TableReader_State"
     ("SQLS_tableName", "SQLS_highId", "SQLS_highTime")
 VALUES ('reports_sqlstream_clv', 38018908, NULL);
-- End reset.postgres.sql
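Assuming the state table lives in PostgreSQL, as in the examples above, you can inspect the current high-water marks before changing them and run the reset as a single transaction. Under PostgreSQL's default isolation, other sessions (including TableReader's connection) then see either the old row or the new one, never a moment where the row is missing.

```sql
-- PostgreSQL: inspect current high-water marks.
SELECT "SQLS_tableName", "SQLS_highId", "SQLS_highTime"
  FROM "SQLS_TableReader_State"
 ORDER BY "SQLS_tableName";

-- Reset one table's high-water mark atomically.
BEGIN;
DELETE FROM "SQLS_TableReader_State"
 WHERE "SQLS_tableName" = 'reports_sqlstream_clv';
INSERT INTO "SQLS_TableReader_State"
     ("SQLS_tableName", "SQLS_highId", "SQLS_highTime")
VALUES ('reports_sqlstream_clv', 38018908, NULL);
COMMIT;
```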

Troubleshooting

To enable tracing for TableReader, add the following lines to /var/log/sqlstream/Trace.properties, setting each property to one of the listed levels:

com.sqlstream.plugin.tablereader.level = [INFO,WARNING,SEVERE,FINE,FINER,FINEST]
com.sqlstream.plugin.tablereader.data = [INFO,WARNING,SEVERE,FINE,FINER,FINEST]