Table Update Adapter


Note: This topic describes a legacy adapter. See the topics Reading from Other Sources and Writing to Other Destinations for current solutions.

The Table Update adapter uses rows from a stream to update an external database table by means of SQL inserts, updates, merges (upserts), and deletes. As with most adapters, you use it by registering the adapter, defining a server object, and defining one or more foreign streams that use this server object. You can use it to push data from s-Server to another database.

To specify which columns to update, list their names and types in the foreign stream definition, as described in Defining the Foreign Stream.

To specify whether such updates should change column values incrementally or by replacement, use the Foreign Stream management columns and column options.

Note: For Vectorwise, you can use the Table Update adapter for bulk loading by setting batchCount to a large number, such as 50000 rows. See the Vectorwise connection parameters below for details.

The sequence of actions for setting up and using this plugin is described in the sections that follow:

Registering the plugin
Defining the server
Defining the foreign stream

The adapter is fully defined in the com.sqlstream.plugin.tableupdate JavaDoc, and its classes implement the interfaces in com.sqlstream.plugin.impl, with further dependency on com.sqlstream.plugin.

See the topic Table Reader Adapter in this guide.

Registering the Plugin (Foreign Data Wrapper)

The TableUpdate adapter is included by default during the SQLstream installation process; you can verify that it is present in the Plugins folder as TableUpdate.jar. Use CREATE OR REPLACE FOREIGN DATA WRAPPER to register the adapter and give it a name in your catalog, so that you can reference it in later steps.

-- Install the TableUpdate adapter
CREATE OR REPLACE FOREIGN DATA WRAPPER "TableUpdate"
   LIBRARY 'class com.sqlstream.plugin.tableupdate.TableUpdateStreamControlPlugin'
   LANGUAGE JAVA
   DESCRIPTION 'adapter for applying external database changes';

 


Defining the Server

You define one server object for each set of tables that use the same options, using the CREATE OR REPLACE SERVER command. Server objects contain connection information for an external server. While each distinct table that needs to be updated will have its own distinct foreign stream, you only need to define one server for each external database.

In the TableUpdate server, you provide vendor-specific database connection information. It can be supplied in three ways:

1. Connect string parameters on the URI
2. Individual options prefixed either by CONN_ (the default) or by the value of the connParamPrefix option
3. The JNDI properties file specified by the connParams option, where "dir/foo" specifies "$SQLSTREAM_HOME/plugin/jndi/dir/foo.properties"
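The two option-based mechanisms above can be pictured with a small sketch. It is written in Python purely for illustration and is not the adapter's code. The helper names resolve_conn_params_file and extract_conn_params are hypothetical. The sketch relies on the rule stated above: connParams 'dir/foo' names $SQLSTREAM_HOME/plugin/jndi/dir/foo.properties. It also makes one further assumption: a prefixed option such as "dbConn_user" reaches the JDBC driver as the property "user" once the prefix is removed.

```python
import posixpath

DEFAULT_PREFIX = "CONN_"  # default connection-option prefix described above

def resolve_conn_params_file(sqlstream_home, conn_params):
    """Map a connParams value such as 'dir/foo' to its properties file path."""
    return posixpath.join(sqlstream_home, "plugin", "jndi", conn_params + ".properties")

def extract_conn_params(options):
    """Collect prefixed server options and strip the prefix (assumed behavior)."""
    prefix = options.get("connParamPrefix", DEFAULT_PREFIX)
    return {
        name[len(prefix):]: value
        for name, value in options.items()
        if name.startswith(prefix)
    }
```

Applied to the MySQL server example below, extract_conn_params would pick out databaseName, user, password, and applicationName. URI, DRIVER, and the other options would be left alone.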

Example Using Options Defined in CREATE SERVER Statement

-- Define the SQL/MED external/foreign server:
CREATE OR REPLACE SERVER "MySQL_TableUpdates"
   FOREIGN DATA WRAPPER "TableUpdate"  --must match foreign data wrapper label above
   OPTIONS (
       SKIPVALIDATION 'false',
       URI 'jdbc:mysql://localhost:3306/mysql_2010_downloadstats',
       DRIVER 'com.mysql.jdbc.Driver',
       connParamPrefix 'dbConn_',
       "dbConn_databaseName" 'mysql_2010_downloadstats',
       "dbConn_user" 'SQLstream',
       "dbConn_password" '',
       "dbConn_applicationName" 'SQLstream TableUpdate Adapter',
       sqlDialect 'MySQL 5.x',
       commitMillis '5000',
       commitCount '3200')
   DESCRIPTION 'MySQL 2010 TableUpdates';

Example Using Options Defined in JNDI Properties File

where the properties file is $SQLSTREAM_HOME/plugin/jndi/mochi/viz.postgres.properties:

CREATE OR REPLACE SERVER "Postgres_TableUpdate"
   FOREIGN DATA WRAPPER "TableUpdate"
   OPTIONS (
       connParams 'mochi/viz.postgres',
       sqlDialect 'Postgres 8.x',
       pollingMillis '5000',
       commitCount '5',
       batchCount '0',
       commitMillis '2000')
   DESCRIPTION 'PostgreSQL database with visualization results';

Contents of JNDI File for Example

URI=jdbc:postgresql://localhost/mochi
DRIVER=org.postgresql.Driver
CONNPARAMPREFIX=dbConn_
dbConn_databaseName=mochi
dbConn_user=SQLstream
dbConn_password=
dbConn_applicationName=SQLstream TableUpdate Adapter
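A JNDI properties file like the one above is a plain list of key=value lines. The following Python sketch is a rough illustration only. It is a simplified parser written for this page, not the loader s-Server uses, and it ignores Java properties features such as line continuations and escapes. It reads such a file and uses CONNPARAMPREFIX to separate the connection parameters from the other settings. The function names are hypothetical.

```python
def parse_properties(text):
    """Parse simple key=value lines; blank lines and #/! comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        props[key.strip()] = value.strip()   # an empty value (dbConn_password=) is kept
    return props

def split_jndi_settings(props):
    """Return (other settings, connection parameters with the prefix removed)."""
    prefix = props.get("CONNPARAMPREFIX", "CONN_")
    conn = {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}
    other = {k: v for k, v in props.items() if not k.startswith(prefix)}
    return other, conn
```

For the example file, URI, DRIVER, and CONNPARAMPREFIX land in the first dictionary. The four dbConn_ entries land in the second as databaseName, user, password, and applicationName.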

 

Defining the Foreign Stream

Since the TableUpdate adapter does not know the structure of the target foreign table, you must define the SQLstream FOREIGN STREAM rowtype to match the rowtype of the target table. The foreign stream columns need to be in the same order and have the same case-sensitive names as those of the target table.

The foreign stream column types need not exactly match the column types in the target foreign table, but must be assignable. For example, assigning a value from a column typed as bigint to a column typed as integer is valid as long as you can be certain that at runtime the assigned data will never exceed integer size.

At DDL validation time, when the SQL is run (typically with !run catalog.sql), the foreign stream and target table rowtypes are compared, and a mismatch generates validation errors. This comparison can be turned off with the SKIPVALIDATION server option, described below.
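The rowtype comparison described above can be sketched as follows. This Python fragment is a hypothetical model, not the adapter's implementation. It drops the two management columns, then compares names position by position, case-sensitively. It uses a small assumed table of type compatibility in which a wider integer type may be assigned to a narrower one, because only run-time values can overflow. The type groups and the function names are assumptions made for illustration.

```python
MANAGEMENT_COLUMNS = ("SQLS_opcode", "SQLS_chg")

# Assumed assignability groups: any type in a group may be assigned to any other.
ASSIGNABLE_GROUPS = [
    {"SMALLINT", "INTEGER", "BIGINT"},
    {"CHAR", "VARCHAR"},
    {"TIMESTAMP"},
]

def assignable(source_type, target_type):
    return any(source_type in g and target_type in g for g in ASSIGNABLE_GROUPS)

def validate_rowtypes(stream_cols, table_cols):
    """Return a list of error strings; an empty list means the rowtypes match.

    stream_cols and table_cols are lists of (name, type) pairs in column order.
    """
    data_cols = [c for c in stream_cols if c[0] not in MANAGEMENT_COLUMNS]
    errors = []
    if len(data_cols) != len(table_cols):
        errors.append(f"column count {len(data_cols)} != {len(table_cols)}")
    for pos, ((s_name, s_type), (t_name, t_type)) in enumerate(zip(data_cols, table_cols), 1):
        if s_name != t_name:  # names are compared case-sensitively
            errors.append(f"column {pos}: name {s_name!r} != {t_name!r}")
        elif not assignable(s_type, t_type):
            errors.append(f"column {pos}: {s_type} not assignable to {t_type}")
    return errors
```

With these assumed rules, a stream column "count" BIGINT passes against a table column "count" INTEGER. The same stream column fails against "Count" because of the case mismatch, and fails against a VARCHAR column because the types are not assignable.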

Example code for TableUpdate Stream


Like all streams, foreign streams must be created within a schema. The example below assumes that a schema (for example, "Test") has already been created and set as the current schema. It creates a foreign stream called "Persist_DownloadsPerVersion", which uses the server "MySQL_TableUpdates" defined in the earlier example. The stream updates a table called "DownloadsPerVersion" with the columns "ts", "product", "version", "dlType", "country", and "count", which are preceded by the two management columns "SQLS_opcode" and "SQLS_chg".

CREATE OR REPLACE FOREIGN STREAM "Persist_DownloadsPerVersion" (
   "SQLS_opcode" CHAR(2) NOT NULL OPTIONS("update" 'skip'),
   "SQLS_chg" VARBINARY(32) OPTIONS("update" 'skip'),
   "ts" TIMESTAMP NOT NULL OPTIONS("update" 'skip'),
   "product" VARCHAR(12) NOT NULL OPTIONS("update" 'skip'),
   "version" VARCHAR(12) NOT NULL OPTIONS("update" 'skip'),
   "dlType" VARCHAR(15) OPTIONS("update" 'skip'),
   "country" VARCHAR(3) OPTIONS("update" 'skip'),
   "count" INTEGER NOT NULL OPTIONS("update" 'skip'))
   SERVER "MySQL_TableUpdates"
   OPTIONS (
       TYPE 'tableUpdates',
       MASTER 'true',
       RequirePK 'false',
       updatesTable 'DownloadsPerVersion',
       batchCount '3200')
   DESCRIPTION 'apply streaming upserts to "DownloadsPerVersion"';

 

To update rows, pump data into the foreign stream as shown below. In this example, every row is merged ('ME').

CREATE OR REPLACE PUMP "100-Persist_DownloadsPerVersionPump" STOPPED
DESCRIPTION 'pump Download activity into "DownloadsPerVersion" table' AS
   INSERT INTO "Persist_DownloadsPerVersion" (
       "SQLS_opcode", "SQLS_chg",
       "ts", "product", "version", "dlType", "country", "count")
   SELECT STREAM
       'ME', CAST(NULL AS VARBINARY),
       "ts_org",
       "product_org",
       "version_org",
       "dlType_org",
       "country_org",
       "count"
   FROM "AnotherStream";

 

Column overrides are made as shown in the example below. The SQLS_opcode passed is again 'ME', so each row is merged. When a matching row already exists, each column is handled according to its mode in the foreign stream definition, as described in Column Mode options.

CREATE OR REPLACE PUMP "100-Persist_DownloadsPerVersionPump" STOPPED
DESCRIPTION 'pump Download activity into "DownloadsPerVersion" table' AS
   INSERT INTO "Persist_DownloadsPerVersion" (
       "SQLS_opcode", "SQLS_chg",
       "ts", "product", "version", "dlType", "country", "count")
   SELECT STREAM
       'ME', CAST(NULL AS VARBINARY),  --MERGE update option for TableUpdate operations
       "ts_org",
       "product_org",
       "version_org",  -- e.g., column merge override
       "dlType_org",
       "country_org",
       "count"
   FROM "AnotherStream";

 

Defining the management columns for the Foreign Stream

Each foreign stream created to update a table must have two management columns, followed by a list of columns that align with the columns in the target table. In the "SQLS_opcode" management column, you specify the operation to perform on each row (insert, merge, update, delete, or memo). Whether a column is updated incrementally or by replacement is controlled by its column mode options. When inserting rows into the foreign stream, you need to pass the appropriate operation code in that column. The foreign stream columns need not have exactly the same types as the table's columns, but the names must match exactly and the target columns must be assignable from the foreign stream columns.

The management columns are:

Column name | Meaning | Explanation
"SQLS_opcode" CHAR(2) NOT NULL | Operation code | Specifies what TableUpdate should do with this row. See the Update Options table below for operations and column modes.
"SQLS_chg" VARBINARY(32) | Bitmap of changed columns (currently unused) | Supply null for this column: use CAST(NULL AS VARBINARY) in the SELECT list of a pump (see the pump examples above).

Update Options for TableUpdate Operations

You direct the type of operation to be performed by inserting one of the following two-character codes in the SQLS_opcode column of the foreign stream.

SQLS_opcode | Operation | Notes
IN | INSERT | Inserts a row into the target table. Note: TableUpdate does not enforce uniqueness; it is expected that no row with the same primary key exists in the table. The underlying database will issue an error if there is a constraint on the key columns.
ME | MERGE ("upsert") | If no row exists with the supplied primary key, ME inserts the row, applying the column insert modes. If such a row does exist, ME updates that existing row, applying the column update modes. The Column Mode options section describes the insert and update modes.
UP | UPDATE | Updates an existing row with new values. No-op if no matching row exists. See the Column Mode options section for incremental updates.
US | UPDATE STRICT | Updates an existing row with new values. If no matching row exists, an error is logged to the trace log. See the Column Mode options section for incremental updates.
DE | DELETE | Deletes a row from the target table. No-op if no matching row exists.
DS | DELETE STRICT | Deletes a row from the target table. If no matching row exists, an error is logged to the trace log.
MM | MEMO | No-op.
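The following Python sketch models the operation codes against an in-memory table keyed by primary key. It is a hypothetical simulation that explains the semantics in the table above; it is not part of the adapter. The trace_log list stands in for the s-Server trace log. The duplicate-key error raised for IN stands in for the target database's key constraint. ME, UP, and US simply replace the supplied values, which corresponds to every column using mode 'none'.

```python
class SimulatedTable:
    """In-memory stand-in for a target table keyed by primary key."""

    def __init__(self):
        self.rows = {}        # primary key -> row dict
        self.trace_log = []   # stands in for the s-Server trace log

    def apply(self, opcode, key, row=None):
        exists = key in self.rows
        if opcode == "IN":
            if exists:  # TableUpdate does not check; the database's key constraint would fail
                raise ValueError(f"duplicate key {key!r}")
            self.rows[key] = dict(row)
        elif opcode == "ME":
            if exists:
                self.rows[key].update(row)   # existing row: update
            else:
                self.rows[key] = dict(row)   # new row: insert
        elif opcode in ("UP", "US"):
            if exists:
                self.rows[key].update(row)
            elif opcode == "US":
                self.trace_log.append(f"UPDATE STRICT: no row for key {key!r}")
        elif opcode in ("DE", "DS"):
            if exists:
                del self.rows[key]
            elif opcode == "DS":
                self.trace_log.append(f"DELETE STRICT: no row for key {key!r}")
        elif opcode == "MM":
            pass  # MEMO: no-op
        else:
            raise ValueError(f"unknown SQLS_opcode {opcode!r}")
```

In this model, UP and DE quietly do nothing when the key is missing, while US and DS add an entry to trace_log. This mirrors the difference between the plain and STRICT variants.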

Column Mode options

These options specify how to handle an individual column for an insert, merge, or update, overriding the update option for the table. You use them to adjust how the table is updated at the column level. The first argument selects the type of operation that the setting applies to, and the second sets how that operation treats the column. For example, the following column definition makes updates add the new value to the existing one:

"sumtotal" INTEGER OPTIONS("update" 'plus')

 

The external table's schema may specify column defaults such that a Table Update Adapter insert operation should ignore, or skip, one or more columns. Similarly, it might be appropriate for a Table Update Adapter update operation to leave one or more columns unchanged. The Table Update Adapter uses the column mode options to generate insert/update/merge SQL in the target database's dialect.

Update types

The first argument to OPTIONS lets you override the type of update for the column.

Option | Definition
INSERT | Inserts a row into the target table. TableUpdate does not enforce uniqueness; it is expected that no row with the same primary key exists in the table. The underlying database will issue an error if there is a constraint on the key columns.
MERGE ("upsert") | If no row exists with the supplied primary key, MERGE inserts the row, applying the insert mode. If such a row does exist, MERGE updates that existing row, applying the update mode. The insert and update modes are described below.
UPDATE | Updates an existing row with new values. No-op if no matching row exists. See the update modes below for incremental updates.
UPDATE STRICT | Updates an existing row with new values. If no matching row exists, an error is logged to the trace log. See the update modes below for incremental updates.
DELETE | Deletes a row from the target table. No-op if no matching row exists.
DELETE STRICT | Deletes a row from the target table. If no matching row exists, an error is logged to the trace log.
MEMO | No-op.

Possible values for second argument

The second argument to OPTIONS sets the mode for the type of operation named in the first argument. The accepted values depend on that operation type.

Insert modes (SQLS_opcode="IN" or first argument set to INSERT or INSERT STRICT)

Insert behavior operates as shown in the following table, where val is the new value provided by the insert.

Mode | New column value | Notes
none | val | Conventional insert
skip | column default | Column omitted from generated SQL

Notes:

A column defined as "skip" must not also be defined as NOT NULL. A "skip" column is never given a value on insert, and this required omission would violate the NOT NULL condition.
Merge operations apply the insert mode for new rows (and the update mode for existing rows).
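To make the skip mode concrete, here is a Python sketch that builds an INSERT statement as the table above describes: columns in "skip" mode are left out, so the target database supplies their defaults. Several details are assumptions made for illustration: the function name, the '?' placeholder style, and the double-quote identifier quoting. The adapter itself generates SQL in each target database's own dialect.

```python
def build_insert(table, columns, insert_modes):
    """Build a parameterized INSERT, omitting columns whose insert mode is 'skip'.

    columns is the ordered list of data column names; insert_modes maps a column
    name to 'none' or 'skip' (columns missing from the map default to 'none').
    Returns the SQL text and the columns whose values must be bound, in order.
    """
    kept = [c for c in columns if insert_modes.get(c, "none") != "skip"]
    col_list = ", ".join(f'"{c}"' for c in kept)
    placeholders = ", ".join("?" for _ in kept)
    return f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders})', kept
```

For example, build_insert("DownloadsPerVersion", ["ts", "product", "count"], {"count": "skip"}) returns an INSERT that names only "ts" and "product", leaving "count" to its column default.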

Update modes (SQLS_opcode="UP" or "US", or first argument set to UPDATE or UPDATE STRICT)

Update behavior operates as shown in the following table, where col is the existing column value and val is the new value provided by the update.

Mode | New column value | Notes
none | val | Conventional update.
skip | col | Column omitted from generated SQL.
plus | col + val | Update using col + val.
incr | col + 1 | Increment col by one.
greatest | greatest(col, val) | Update using the greater of col or val.
least | least(col, val) | Update using the lesser of col or val.
coalesce | coalesce(col, val) | If col is null, update using val; otherwise keep col.
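The update-mode table translates directly into a small function. This Python sketch is illustrative only, and the name new_value is hypothetical. The adapter expresses these modes in the SQL it generates rather than computing them itself. Python's max and min stand in for SQL greatest and least. The sketch assumes that col and val are non-null numbers wherever arithmetic or comparison is involved, because null handling in greatest and least varies by database.

```python
def new_value(mode, col, val):
    """Return the column value after an update, per the update-mode table.

    col is the existing value and val is the value supplied by the update.
    """
    if mode == "none":
        return val                           # conventional update
    if mode == "skip":
        return col                           # column omitted from the generated SQL
    if mode == "plus":
        return col + val
    if mode == "incr":
        return col + 1                       # the supplied val is ignored
    if mode == "greatest":
        return max(col, val)
    if mode == "least":
        return min(col, val)
    if mode == "coalesce":
        return val if col is None else col   # coalesce(col, val)
    raise ValueError(f"unknown update mode {mode!r}")
```

With an existing value of 10 and a supplied value of 3, the modes yield 3 (none), 10 (skip), 13 (plus), 11 (incr), 10 (greatest), and 3 (least). The coalesce mode yields 10, because only a null col is replaced.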

Merge modes (SQLS_opcode="ME" or first argument set to MERGE)

Merge operations apply the update mode for existing rows (and the insert mode for new rows).
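The following Python sketch uses the standard-library sqlite3 module to show one way a merge can combine the two modes. First it runs an UPDATE using the column's update mode (here 'plus' on "count"). If no existing row matched, it then runs an INSERT using the insert mode. The table and column names are taken from the examples above. The two-statement approach and the SQL itself are assumptions for illustration; the adapter generates its own merge SQL in the target database's dialect.

```python
import sqlite3

def merge_plus(conn, product, version, count):
    """Merge one row: add count to an existing row, or insert a new row."""
    cur = conn.execute(
        'UPDATE "DownloadsPerVersion" SET "count" = "count" + ? '
        'WHERE "product" = ? AND "version" = ?',
        (count, product, version),
    )
    if cur.rowcount == 0:  # no row with this key: fall back to the insert mode
        conn.execute(
            'INSERT INTO "DownloadsPerVersion" ("product", "version", "count") '
            'VALUES (?, ?, ?)',
            (product, version, count),
        )

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "DownloadsPerVersion" ('
    '"product" TEXT, "version" TEXT, "count" INTEGER, '
    'PRIMARY KEY ("product", "version"))'
)
# Hypothetical sample rows: the first two share a key, so the second is added to the first.
for product, version, count in [("A", "1.0", 2), ("A", "1.0", 3), ("A", "2.0", 1)]:
    merge_plus(conn, product, version, count)
conn.commit()
```

After these three merges, the table holds ("A", "1.0") with a count of 5 and ("A", "2.0") with a count of 1.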

SKIPVALIDATION and run-time errors

SKIPVALIDATION controls column type validation, in which the foreign stream and target table rowtypes are compared and validated. The default is 'false', meaning that validation is performed. So when SKIPVALIDATION is omitted or explicitly set to 'false', a connection to the target database is required at DDL time so that the table and stream rowtypes can be compared. When you set SKIPVALIDATION to 'true', the foreign stream and target table rowtypes are not compared or validated.

During development, setting the SKIPVALIDATION server option to 'true' (to skip the column type validation) is useful and saves time, but it is not recommended in production.

Run-time errors arise in the following cases:

Regardless of the value of SKIPVALIDATION, run-time errors arise whenever the actual data received does not fit in the receiving column type. One example is a source field defined as bigint delivering a value that is too large for its corresponding receiving field defined as integer.
With SKIPVALIDATION 'true', rowtype validation is suppressed, so rowtype mismatches are not caught at DDL time. When source data is not assignable to the target field's type, run-time errors arise instead.

Run-time errors are logged to the trace log. (See SKIPVALIDATION in the server definition example code.)
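The bigint-to-integer case can be illustrated with a simple range check. This Python sketch assumes that the target INTEGER is a 32-bit signed type, which is common but vendor-specific. The helper fits_in_integer is hypothetical. In practice the error comes from the JDBC driver or the target database, not from code like this.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1  # assumed 32-bit signed INTEGER range

def fits_in_integer(value):
    """True if a BIGINT value can be stored in the assumed INTEGER column (NULL always fits)."""
    return value is None or INT32_MIN <= value <= INT32_MAX
```

A value of 2147483647 fits, but 2147483648 does not. In the situation this sketch models, a row carrying the larger value would produce a run-time error even though DDL validation accepted the bigint-to-integer assignment.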

Configuration

The tables below show the options for the SQL/MED server ("foreign server") and foreign streams.

SQL/MED server ("foreign server") options

Option | Description | Possible values (default)
URI | JDBC connect string | See below
DRIVER | JDBC driver class | See below
connParamPrefix | Prefix for connection parameter options | Default: CONN_
connParams | Alternate location for connection parameter options: a properties file, such as $SQLSTREAM_HOME/plugin/jndi/dir/foo.properties. Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX. | string
user | Login user for the target database | No default
password | Login password for the target database | No default
databasename | Name of the target database | No default
sqlDialect | Database vendor and version | See Accepted values for the sqlDialect option below
commitMillis | Interval, in milliseconds, at which TableUpdate commits; 0 disables the commit timer | Default: 0
commitCount | Number of rows to insert or update before committing the transaction | Default: 1

Sample JNDI Properties File

Connection parameters can be defined in a JNDI properties file. For example, the following is a sample set of connection parameters for PostgreSQL:

DRIVER=org.postgresql.Driver
URI=jdbc:postgresql://myserver
CONNPARAMPREFIX=dbConn_
dbConn_databaseName=mydatabase
dbConn_user=SQLstream
dbConn_password=mypassword
dbConn_applicationName=SQLstream TableUpdate Adapter

SQL/MED foreign stream options

Option | Description
TYPE | Must be 'tableUpdates'
MASTER | Must be 'true'
updatesTable | Name of the target table. Can be "name", "schema.name", or "db.schema.name"
requirePK | If 'true', the target table must have a primary key (or keys). If 'false', the target table need not have a primary key, but if it has none, merge (upsert) is not supported.
batchCount | Number of rows to insert or update at a time in one JDBC batch. Batches are smaller when a commit is needed.
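The descriptions above do not spell out exactly how commitCount and batchCount interact. The following Python sketch shows one plausible reading, under several assumptions. Rows are grouped into JDBC batches of at most batchCount rows. A batch is cut short whenever the commitCount boundary is reached, so that no batch spans a commit. A batchCount of 0 is taken to mean one row per statement. Time-based commits (commitMillis) are left out for simplicity, and the name plan_batches is hypothetical.

```python
def plan_batches(total_rows, batch_count, commit_count):
    """Split total_rows into (batch_size, commit_after_batch) pairs.

    Assumed rules (not confirmed by the adapter documentation):
    - batch_count <= 0 means one row per statement (no JDBC batching);
    - a batch is cut short so that it ends exactly where commit_count rows
      have accumulated since the last commit.
    """
    if commit_count < 1:
        raise ValueError("commit_count must be at least 1")
    batch_limit = batch_count if batch_count > 0 else 1
    plan = []
    since_commit = 0
    remaining = total_rows
    while remaining > 0:
        size = min(batch_limit, commit_count - since_commit, remaining)
        since_commit += size
        remaining -= size
        commit = since_commit == commit_count
        if commit:
            since_commit = 0
        plan.append((size, commit))
    return plan
```

Under these assumptions, 10 rows with batchCount 4 and commitCount 6 become batches of 4, 2, and 4 rows, with a commit after the second batch. The last batch waits for more rows (or, in the real adapter, presumably for the commitMillis timer).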

Accepted values for the sqlDialect option (with corresponding connect strings and driver classes)

For PostgreSQL and PostgreSQL 8.x

URI | jdbc:postgresql://host:port/database
DRIVER | org.postgresql.Driver
Default port | 5432
Dialect | "Postgres", "Postgres 8.x", "PostgreSQL", "PostgreSQL 8.x", or "Vectorwise"

For MySQL and MySQL 5.x

URI | jdbc:mysql://host:port/database
DRIVER | com.mysql.jdbc.Driver
Default port | 3306
Dialect | "MySQL" or "MySQL 5.x"

For Microsoft SQL Server (SQL Server, SQL Server 2000, SQL Server 2005, SQL Server 2000/2005, Microsoft SQL Server, Microsoft SQL Server 2000, Microsoft SQL Server 2005, Microsoft SQL Server 2000/2005)

URI | jdbc:sqlserver://host:port
DRIVER | com.microsoft.sqlserver.jdbc.SQLServerDriver
Default port | 1433
Dialect | "Microsoft SQL Server 2000/2005/2008", "Microsoft SQL Server", "Microsoft SQL Server 2000", "Microsoft SQL Server 2005", "Microsoft SQL Server 2008", "SQL Server 2000/2005/2008", "SQL Server", "SQL Server 2000", "SQL Server 2005", or "SQL Server 2008"

For Oracle

URI | jdbc:oracle:thin:@localhost:1521/SGA or jdbc:oracle:thin:@localhost:1521:XE
DRIVER | oracle.jdbc.OracleDriver
Default port | 1521
Dialect | "Oracle" or "Oracle 10.x"

Generically, the Oracle connection URL is jdbc:oracle:thin:@<host>:<port>:<SID> or jdbc:oracle:thin:@//<host>:<port>/<SERVICE_NAME>.

For Paraccel

URI | jdbc:paraccel://localhost:5439
DRIVER | com.paraccel.Driver
Default port | 5439
Dialect | "Paraccel"

Generically, the Paraccel connection URL is jdbc:paraccel://<host>:<port>.

For Vectorwise

URI | jdbc:ingres://localhost:GG7/hcl
DRIVER | com.ingres.jdbc.IngresDriver
Default port |

Note: You can achieve high-speed loading (bulk loading) with Vectorwise by setting batchCount to a large number, such as 50000 rows.
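The connect-string patterns above share a common shape. As a convenience sketch only, the following Python fills in host, port, and database, and falls back to the default ports listed above. The URI_PATTERNS dictionary and build_uri are hypothetical. Oracle and Vectorwise are left out because their URLs add a SID, a service name, or an installation-specific port.

```python
# Patterns and default ports copied from the tables above.
URI_PATTERNS = {
    "postgresql": ("jdbc:postgresql://{host}:{port}/{database}", 5432),
    "mysql": ("jdbc:mysql://{host}:{port}/{database}", 3306),
    "sqlserver": ("jdbc:sqlserver://{host}:{port}", 1433),
    "paraccel": ("jdbc:paraccel://{host}:{port}", 5439),
}

def build_uri(vendor, host, database="", port=None):
    """Fill in a connect-string pattern, using the vendor's default port if none is given."""
    pattern, default_port = URI_PATTERNS[vendor]
    return pattern.format(host=host, port=port or default_port, database=database)
```

For example, build_uri("mysql", "localhost", "mysql_2010_downloadstats") reproduces the URI used in the MySQL server example earlier in this topic.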

Troubleshooting

The following lines can be added to /var/log/sqlstream/Trace.properties to enable tracing for TableUpdate. Replace each bracketed list with one of the levels it contains:

 com.sqlstream.plugin.tableupdate.level = [INFO,WARNING,SEVERE,FINE,FINER,FINEST]
 com.sqlstream.plugin.tableupdate.data = [INFO,WARNING,SEVERE,FINE,FINER,FINEST]

More information

For more information about the topics and commands mentioned above, see the following topics in the s-Server Streaming SQL Reference Guide:

CREATE FOREIGN DATA WRAPPER
CREATE SERVER
CREATE FOREIGN STREAM
The built-in functions, from ABS to W3C_LOG_PARSE