Using Rowtime Bounds with the JDBC Driver


Threads and Blocking

Although the JDBC driver has an internal thread pool, none of its threads are exposed to the calling code. Instead, the driver relies on client application threads to prepare new statements, wait for query results, insert new data, and so on. The driver's internal threads generally wait on select/poll/epoll calls and handle the low-level interactions with the network. The data is then handed off to the calling threads for the application to process.

It is a fundamental characteristic of SQLstream s-Server that there may be arbitrarily long intervals between client thread calls into our driver. For example, a data-producing thread might insert a row of data, then block, waiting for another part of the application to produce more data. A data-consuming thread might block in the driver waiting for stream data, thus preventing that thread from doing other useful work. In either case, the thread may fail while executing in application code and never return to the driver for data or an orderly cleanup.

Consider the cursor loop in the receiving messages example described below. The client app's thread might be busy for long intervals of time in between periods of servicing the stream by executing this loop. Processes may be waiting for the client application thread to service the SDP connection before they can write or read more data.

Conversely, in the context of the client app's thread:

next() might block forever waiting for data

next() might endlessly return data and never return false

An application written against the SQLstream JDBC driver must be aware of these system characteristics. SQLstream s-Server presently enables the following possible models of client application interaction with the driver:

1. blocking - wait, possibly forever, for data to arrive

2. timeout - wait for an interval for data to arrive

These two options match a conventional system's use of JDBC. The blocking model is the most common for applications that query ordinary tables, because such queries are not expected to block for long.

Blocking

In this model, all client application calls into the driver will wait forever until data is delivered (for INSERTs) or received (for SELECTs). The client application is responsible for managing its own threads to ensure that other application features are not starved.

To prevent the main application from blocking, the developer must do one of the following:

Place ResultSet processing in its own thread so it can safely block indefinitely.

Use query timeouts to exit ResultSet.next() if no data is available.

The second option above constitutes "polling" the ResultSet, a technique that allows main application processing to proceed between polls.
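The first option above can be sketched with standard java.sql and java.util.concurrent APIs only: the blocking ResultSet.next() loop runs on a dedicated thread and hands each row to a queue that the rest of the application drains at its own pace. This is a minimal sketch, not part of the SQLstream driver; the class name StreamReader, the queue hand-off, and reading a single string column with getString(1) are illustrative assumptions.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: runs the blocking ResultSet.next() loop on its own
// thread so that only this thread can block indefinitely.
final class StreamReader {
    private final BlockingQueue<String> rows = new LinkedBlockingQueue<>();
    private final Thread worker;
    private volatile SQLException failure;

    StreamReader(ResultSet rs) {
        worker = new Thread(() -> {
            try {
                // next() may wait a long time for stream data, and on a
                // stream it may never return false.
                while (rs.next()) {
                    rows.add(rs.getString(1)); // assumption: column 1 is a string
                }
            } catch (SQLException e) {
                failure = e; // recorded for the application to inspect
            }
        }, "stream-reader");
        worker.setDaemon(true); // do not keep the JVM alive for this thread
    }

    void start() { worker.start(); }

    // Never blocks: returns a row already read, or null if none is waiting.
    String poll() { return rows.poll(); }

    // Waits up to millis for the reader thread to finish (end of data).
    boolean awaitEnd(long millis) throws InterruptedException {
        worker.join(millis);
        return !worker.isAlive();
    }

    SQLException failure() { return failure; }
}
```

The main application thread calls poll() between its other tasks and never enters the driver itself, so a stalled stream cannot starve it.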

Timeout

In this model, you can set per-query timeouts; if the timeout limit is exceeded, an exception is thrown.

For 1-second-granularity per-query timeouts, use Statement.setQueryTimeout. (See the example under Insert.) See http://java.sun.com/j2se/1.5.0/docs/api/java/sql/Statement.html?#setQueryTimeoutint for more details.

For millisecond-granularity per-query timeouts, use StreamingStatement.setQueryTimeoutMillis.

The StreamingStatement interface is an extension to the standard Java Statement. See http://java.sun.com/j2se/1.5.0/docs/api/java/sql/Statement.html for more details.

StreamingPreparedStatement extends the standard Java PreparedStatement interface, and also SQLstream's StreamingStatement. See http://java.sun.com/j2se/1.5.0/docs/api/java/sql/PreparedStatement.html for more details.

All Statement objects created by a SQLstream driver will implement StreamingStatement. All PreparedStatement objects created by a SQLstream driver will implement both StreamingStatement and StreamingPreparedStatement.
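The polling technique mentioned under Blocking can be built on these timeouts. The sketch below uses only the standard Statement.setQueryTimeout; for millisecond granularity the statement could instead be cast to StreamingStatement and configured with setQueryTimeoutMillis. It assumes, as the description of polling implies, that a timeout surfaces from ResultSet.next() as an exception and that the ResultSet stays usable afterwards. It also assumes the exception is java.sql.SQLTimeoutException (the standard JDBC subclass of SQLException), which the SQLstream driver may or may not use. The class and method names are hypothetical.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.function.Consumer;

// Hypothetical polling helper for the timeout model.
final class ResultSetPoller {
    enum Outcome { ROW, TIMED_OUT, END_OF_DATA }

    // 1-second granularity via the standard JDBC call.
    static void configure(Statement stmt, int seconds) throws SQLException {
        stmt.setQueryTimeout(seconds);
    }

    // One attempt to read a row. A timeout is treated as "no data yet" so
    // the caller can do other work and poll again later.
    static Outcome pollOnce(ResultSet rs, Consumer<String> sink) throws SQLException {
        try {
            if (!rs.next()) {
                return Outcome.END_OF_DATA;
            }
            sink.accept(rs.getString(1)); // assumption: column 1 is a string
            return Outcome.ROW;
        } catch (SQLTimeoutException e) {
            return Outcome.TIMED_OUT; // assumption: how the driver signals a timeout
        }
    }
}
```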

Rowtime Bounds: Forcing Timely Output

Another issue that the JDBC driver must deal with is timely output, which relates to the current time of a stream (the rowtime of the stream's latest row).

A message stream is a sequence of timestamped messages (or rows): each row has a rowtime, and the sequence is ordered by rowtime. The current time of a stream is the rowtime of the latest row. When a relational operator executes, rows pass downstream through that operator. The current time of its output stream(s) cannot be later than the latest rowtime among all of its inputs. (It can be earlier, as discussed below.)

For efficiency, current time is implicit in the data: the stream is implemented as an "asynchronous" series of messages. But this means that the current time of a stream advances only when the next message arrives. This can be a problem for operations that must pause until one more input message arrives, such as merging and rolling windows.

Merging and Rolling Windows

Multiple inputs merge into one output stream when several clients insert into the same named stream, or when the UNION ALL operator executes. The issue is the same in both cases: the next output row is the earliest input row; that is, it is taken from the input stream with the earliest current time. But to know the current time of every input stream, the system must wait for a row to arrive on each one. This can make the operator wait and delay downstream processing. Note that the delay is noticeable only when there is a real-time gap between input rows; otherwise the wait is negligible, because a next row is almost always already buffered.
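The waiting behavior of a merge can be modeled in a few lines. This is a simplified, single-threaded illustration, not s-Server's implementation: each input keeps a queue of buffered rows and a current time (the rowtime of its latest row), and the merge may emit a buffered row only if its rowtime is no later than the minimum current time across all inputs, because any input could still deliver an earlier row. Rowtimes are plain long values, and the class name MergeModel is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of merging rowtime-ordered inputs
// (for example, UNION ALL or several clients inserting into one stream).
final class MergeModel {
    private final List<ArrayDeque<Long>> buffers = new ArrayList<>();
    private final long[] currentTime;

    MergeModel(int inputs) {
        currentTime = new long[inputs];
        for (int i = 0; i < inputs; i++) {
            buffers.add(new ArrayDeque<>());
            currentTime[i] = Long.MIN_VALUE; // nothing received on this input yet
        }
    }

    // A row arrives on one input; its rowtime becomes that input's current time.
    void arrive(int input, long rowtime) {
        buffers.get(input).addLast(rowtime);
        currentTime[input] = rowtime;
    }

    // Emits, in rowtime order, every buffered row that is safe to output.
    List<Long> drain() {
        long safe = Long.MAX_VALUE;
        for (long t : currentTime) safe = Math.min(safe, t);
        List<Long> out = new ArrayList<>();
        while (true) {
            int best = -1;
            for (int i = 0; i < buffers.size(); i++) {
                Long head = buffers.get(i).peekFirst();
                if (head != null && head <= safe
                        && (best < 0 || head < buffers.get(best).peekFirst())) {
                    best = i;
                }
            }
            if (best < 0) return out; // must wait for more input
            out.add(buffers.get(best).pollFirst());
        }
    }
}
```

With two inputs, rows at 10 and 20 on input 0 cannot be emitted while input 1 is silent; a row at 15 on input 1 releases 10 and 15, and 20 stays buffered until input 1's current time reaches at least 20.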

A window of a stream is a sequential block of messages. Rolling windows (also called sliding windows) partition a stream into a series of windows: for instance, a series that groups together all messages with a rowtime in the same hour (called rolling one-hour windows).

One example of a rolling-window query finds totals and other statistics for the last hour of a stream of trades. This query also has a timeliness issue. When does a window end? Only when the next window begins, that is, when a row arrives that belongs to the next window. Here again the system must wait for a row to arrive.

To solve the problems of rolling windows and merging, a mechanism is needed to advance the current time of a stream explicitly and immediately, without waiting for an additional message. Generally, this solution is needed infrequently, because there is usually a next message, and advancing the current time implicitly works well.
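A simplified model shows both ways a window can close. In the hypothetical HourlyWindow class below, an hourly SUM is emitted only when the stream's current time reaches the start of the next hour: either implicitly, when a row from a later hour arrives, or explicitly through advanceTo, which stands in for a rowtime bound. Times are epoch milliseconds; the sketch illustrates the idea only and does not reflect how s-Server evaluates the query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a rolling (per-hour) SUM over a stream of rows.
final class HourlyWindow {
    static final long HOUR_MS = 3_600_000L;

    private long openHour = Long.MIN_VALUE; // start of the open window, if any
    private long total;
    private final List<String> emitted = new ArrayList<>();

    // A row arrives: it may close the open window before being counted.
    void row(long rowtime, long shares) {
        advanceTo(rowtime);
        if (openHour == Long.MIN_VALUE) {
            openHour = Math.floorDiv(rowtime, HOUR_MS) * HOUR_MS;
        }
        total += shares;
    }

    // Advances current time without a new row (the role of a rowtime bound).
    void advanceTo(long time) {
        if (openHour != Long.MIN_VALUE && time >= openHour + HOUR_MS) {
            emitted.add(openHour + "=" + total); // the window is complete
            openHour = Long.MIN_VALUE;
            total = 0;
        }
    }

    List<String> emitted() { return emitted; }
}
```

After rows at 10:05 and 10:30, the 10:00 window stays open; advancing to 11:00 closes it immediately instead of waiting for the first 11:xx row.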

Setting and Getting Rowtime Bounds

For rolling hourly windows, the client inserting the data would also set a rowtime bound (constraint) on the hour, in order to close off an hourly window. The only information this constraint carries is a rowtime, and we extend the PreparedStatement interface to handle such constraints.

Note that the rowtime bounds are supplied by the data source. By setting a rowtime bound with a value of noon, a data source is asserting a constraint on the future of the stream: noon is a lower bound to all forthcoming rows. A down-to-earth interpretation is that the source is announcing that it has nothing more to say until noon.

The rowtime bound applies to the inserted data, not to the rolling averages query that makes use of it, nor to any client reading from that query.

The JDBC driver does not know the actual ROWTIMEs of the rows passing through the driver.

To set a stream's rowtime bound, the client program can use StreamingPreparedStatement.setRowtimeBound().

To get (acquire) the most recently set rowtime bound, use StreamingStatement.getRowtimeBound().
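Because a rowtime bound asserts a lower bound on all forthcoming rows, a data source may want to avoid breaking its own promise. The hypothetical BoundGuard class below keeps its own record of the strongest (latest) bound the client has announced and reports whether a candidate row's rowtime still respects it. It makes no driver calls; it is a client-side convention sketched for illustration, and how s-Server itself treats a row that violates a bound is not covered here.

```java
import java.sql.Timestamp;

// Hypothetical client-side bookkeeping for rowtime bounds announced by a source.
final class BoundGuard {
    private Timestamp strongest; // null until the first bound is announced

    // Records a bound; an earlier (weaker) bound does not relax the constraint.
    void announce(Timestamp bound) {
        if (strongest == null || bound.after(strongest)) {
            strongest = bound;
        }
    }

    // True if a row with this rowtime respects every bound announced so far.
    // The bound is inclusive: a row exactly at the bound is allowed.
    boolean mayInsert(Timestamp rowtime) {
        return strongest == null || !rowtime.before(strongest);
    }
}
```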

Canceling Statements

The JDBC 3.0 spec does not mention canceling statements. The SQLstream JDBC driver implements Statement.cancel as described in Sun's guidance document for JDBC driver writers. That document describes Statement.cancel as being primarily for multithreaded processing, where one thread needs to cancel a long-running statement in another thread. The assumption is that a "long-running statement" is taking a long time to compute, either to perform its insert/update or to return a row, and that the thread running that statement is blocked, requiring the intervention of a second thread.

The SQLstream JDBC driver also implements Statement.cancel in a single-threaded context to interrupt long-running PreparedStatements, while retaining the streaming machinery necessary to handle new values that may be supplied to the statement's bind variables. When a client application supplies new parameter values and re-executes, the statement is already prepared and the streaming machinery is ready to handle inputs satisfying the new values.

Here is a sequence description of such an app/driver interaction:

Application action | Driver action(s)

Application prepares query with bind params. | Driver prepares statement.

Application supplies param values and executes PreparedStatement. | Driver executes statementId w/ first set of param values.

Application gets first ResultSet and reads rows. | Driver reads data from SDP stream.

Application cancels PreparedStatement. | Driver cancels statementId; driver flushes SDP stream by reading and discarding data until EOS; driver closes first ResultSet, but SDP stream remains open for Statement.

Application supplies 2nd set of param values and re-executes PreparedStatement. | Driver executes statementId w/ 2nd set of param values.

Application gets 2nd ResultSet and reads rows. | Driver reads data from same SDP stream.

Application closes PreparedStatement. | Driver shuts down SDP stream; driver disposes statementId; driver closes ResultSet, if left open.

A typical client application may not (and is not required to) explicitly cancel the Java PreparedStatement in between executions. More likely the application simply stops reading incoming rows, supplies the 2nd set of parameter values, and re-executes. In this case, the driver implicitly cancels the running statement and closes the first ResultSet before performing the 2nd execute.
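The implicit-cancel pattern needs only standard JDBC calls: stop reading, bind new values, and execute again on the same PreparedStatement. In the minimal sketch below, the single string bind parameter, the rowsPerRun limit, and the ReExecutor name are hypothetical; per the driver behavior described above, each new executeQuery is expected to cancel the running statement and close the previous ResultSet.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: re-executes one prepared streaming query with
// successive parameter values, reading a bounded number of rows each time.
final class ReExecutor {
    static List<String> run(PreparedStatement ps, List<String> paramValues, int rowsPerRun)
            throws SQLException {
        List<String> seen = new ArrayList<>();
        for (String value : paramValues) {
            ps.setString(1, value);           // supply the next set of bind values
            ResultSet rs = ps.executeQuery(); // driver implicitly cancels the prior run
            int n = 0;
            while (n < rowsPerRun && rs.next()) {
                seen.add(value + ":" + rs.getString(1));
                n++;
            }
            // Simply stop reading: no explicit cancel() or close() here.
        }
        return seen; // the caller closes ps, which also closes the last ResultSet
    }
}
```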

Sending Rowtime Bounds

To illustrate the use of rowtime bounds, consider the example of a rolling hourly summary query against a stream of trades.

-- stream definition
CREATE STREAM trades
      ( rowtime TIMESTAMP
      , ticker  VARCHAR(6)
      , shares  INTEGER
      , price   DECIMAL
      );

-- rolling window query, benefits from rowtime bounds
SELECT STREAM hour, ticker, SUM(shares), AVG(price) FROM
  (SELECT STREAM FLOOR(rowtime TO HOUR) AS hour
        , ticker, shares, price
     FROM trades)
GROUP BY hour, ticker;

 

A client application writes data into the stream trades by executing a prepared INSERT statement.

java.sql.PreparedStatement pstmt = connection.prepareStatement(
    "INSERT INTO trades(rowtime, ticker, shares, price) VALUES (?,?,?,?)");
...
while (true) {
    // somehow check for data
    if (haveData) {
        String ticker;
        int shares;
        java.math.BigDecimal price;
        ...
        // bind the columns
        pstmt.setTimestamp(1, new java.sql.Timestamp(System.currentTimeMillis())); // rowtime = current time
        pstmt.setString(2, ticker);
        pstmt.setInt(3, shares);
        pstmt.setBigDecimal(4, price);
        // insert another row
        pstmt.executeUpdate();
    } else {
        // somehow check for new hour and whether idle since hour changed
        if (needRowtimeBound) {
            // downcast the PreparedStatement
            // to use the SQLstream extended interface
            StreamingPreparedStatement sstmt = (StreamingPreparedStatement) pstmt;
            sstmt.setRowtimeBound(new java.sql.Timestamp(System.currentTimeMillis()));
        }
    }
}

 

In order to cooperate with the rolling hourly averages, the client is well-mannered and chimes in every hour on the hour to announce, "no more inserts for the current hour," using setRowtimeBound.
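The example leaves the needRowtimeBound check as pseudocode. One possible way to fill it in is sketched below: the hypothetical HourlyBoundPolicy remembers the latest hour already reached by an inserted row or an announced bound and, once the wall clock has moved into a later hour, returns the top of that hour as the bound to send, at most once per hour. Using the wall clock and truncating epoch milliseconds to whole hours (UTC) are assumptions of this sketch.

```java
import java.sql.Timestamp;

// Hypothetical policy for the needRowtimeBound check in the insert loop.
final class HourlyBoundPolicy {
    static final long HOUR_MS = 3_600_000L;

    // Start of the latest hour already reached by a row or an announced bound.
    private long coveredHour = Long.MIN_VALUE;

    static long hourStart(long millis) {
        return Math.floorDiv(millis, HOUR_MS) * HOUR_MS;
    }

    // Call after each successful insert.
    void rowInserted(long rowtimeMillis) {
        coveredHour = Math.max(coveredHour, hourStart(rowtimeMillis));
    }

    // Call when idle: returns the bound to send, or null if none is needed.
    Timestamp boundIfNeeded(long nowMillis) {
        long hour = hourStart(nowMillis);
        if (hour <= coveredHour) {
            return null; // this hour was already reached by a row or a bound
        }
        coveredHour = hour;
        return new Timestamp(hour); // top of the hour: no more rows before this
    }
}
```

In the loop above, the idle branch would call boundIfNeeded(System.currentTimeMillis()) and, when the result is non-null, pass it to setRowtimeBound.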