Semantics of Rowtime Bounds


Rowtime bounds never change the results of a query, but they do help the system provide the results sooner. The general rule is that the value of a streaming SQL expression (such as a query) depends on the SQL operations and their operands (the data), but not on any rowtime bounds that accompany the data.

Some operators ignore rowtime bounds. Some operators make use of rowtime bounds to produce earlier results, but never to produce different results.

In this section we examine the semantics of rowtime bounds. First we review what rowtime bounds are and how they flow through the system. Next we review why rowtime bounds are useful. Finally we consider how relational operators deal with rowtime bounds.

Implicit Rowtime Bounds

Because the rows of a stream are in ascending rowtime order, the existence of a row implicitly states that no later row will have an earlier rowtime. It is as if every row in a stream were followed by a rowtime bound equal to that row's rowtime:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'

ORDERS.bound: rowtime '2006-10-10 10:00:00'

As rows move through the system, they sometimes "catch up" with earlier rows in the same stream. The rowtime of each later row must be greater than or equal to the bound of the preceding row, so the earlier implicit bounds add no information. The result is a group of rows, called a train, followed by a single bound. If, for example, three order rows form a train, the implicit bound of the train is the rowtime of its last row:

ORDERS: 'ORCL', rowtime '2006-09-11 12:00:00'

ORDERS: 'MSFT', rowtime '2006-09-11 12:01:00'

ORDERS: 'IBM', rowtime '2006-09-11 12:02:00'

Hence at any moment in time, a stream contains groups of rows, each implicitly followed by a bound.

Merging a bound onto a train of rows

A bound moves through the system as a self-contained unit. If it encounters or catches up with a preceding train of rows, it supersedes that train's (weaker) bound. The resulting train looks like this:

ORDERS: 'ORCL', rowtime '2006-09-11 12:00:00'

ORDERS: 'MSFT', rowtime '2006-09-11 12:01:00'

ORDERS: 'IBM', rowtime '2006-09-11 12:02:00'

ORDERS.bound: rowtime '2006-09-11 12:02:00'
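As a rough illustration, the Python sketch below models a train as a list of rows plus an optional explicit bound, and shows a stronger bound superseding the train's implicit one. Train, bound, and merge_bound are hypothetical names, not part of any SQLstream API, and rowtimes are assumed to be zero-padded HH:MM:SS strings so that string comparison matches time order.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, str]  # (ticker, rowtime as zero-padded "HH:MM:SS")


class Train:
    """Hypothetical model of a train: rows in ascending rowtime order plus a bound."""

    def __init__(self, rows: List[Row]) -> None:
        # Rows in a train must already be in ascending rowtime order.
        assert all(a[1] <= b[1] for a, b in zip(rows, rows[1:]))
        self.rows = rows
        self.explicit_bound: Optional[str] = None

    def bound(self) -> Optional[str]:
        # Without an explicit bound, the implicit bound is the last row's rowtime.
        implicit = self.rows[-1][1] if self.rows else None
        candidates = [t for t in (implicit, self.explicit_bound) if t is not None]
        return max(candidates) if candidates else None

    def merge_bound(self, t: str) -> None:
        # A bound that catches up with the train supersedes a weaker bound;
        # a weaker (or equal) incoming bound adds nothing and is ignored.
        current = self.bound()
        if current is None or t > current:
            self.explicit_bound = t


train = Train([("ORCL", "12:00:00"), ("MSFT", "12:01:00"), ("IBM", "12:02:00")])
print(train.bound())  # 12:02:00 (implicit, from the IBM row)
train.merge_bound("12:05:00")
print(train.bound())  # 12:05:00 (the stronger explicit bound wins)
```

Keeping the bound as a single value per train reflects the rule that only the strongest bound matters once rows and bounds have caught up with each other.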

Splitting trains of rows

Sometimes the system splits a train of rows in two (this can happen when an internal buffer fills up).

When a train is split, the first train ends with the strongest bound it can carry, namely the rowtime of the first row in the next train:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'

ORDERS: 'MSFT', rowtime '2006-10-10 10:01:00'

ORDERS.bound: rowtime '2006-10-10 10:02:00'

ORDERS: 'IBM', rowtime '2006-10-10 10:02:00'

ORDERS.bound: rowtime '2006-10-10 10:05:00'
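The splitting rule can be sketched in Python as follows. The split_train helper is hypothetical; it assumes the unsplit train ended with the bound 10:05:00, as in the example, and that rowtimes are zero-padded HH:MM:SS strings.

```python
from typing import List, Tuple

Row = Tuple[str, str]  # (ticker, rowtime as zero-padded "HH:MM:SS")
Train = Tuple[List[Row], str]  # (rows, bound that ends the train)


def split_train(rows: List[Row], final_bound: str, at: int) -> Tuple[Train, Train]:
    """Split a train before index `at` (hypothetical helper).

    The first train ends with the strongest bound available: the rowtime of
    the first row of the second train. The second train keeps the original
    train's final bound.
    """
    if not 0 < at < len(rows):
        raise ValueError("split point must leave rows in both trains")
    first, second = rows[:at], rows[at:]
    return (first, second[0][1]), (second, final_bound)


rows = [("ORCL", "10:00:00"), ("MSFT", "10:01:00"), ("IBM", "10:02:00")]
head, tail = split_train(rows, "10:05:00", at=2)
print(head)  # ([('ORCL', '10:00:00'), ('MSFT', '10:01:00')], '10:02:00')
print(tail)  # ([('IBM', '10:02:00')], '10:05:00')
```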

Eliminating rows

A filter condition can eliminate rows from a stream, but bounds are never eliminated. For example, if the previous two trains passed through the condition WHERE ticker = 'ORCL', the resulting train would look like this:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'

ORDERS.bound: rowtime '2006-10-10 10:02:00'

ORDERS.bound: rowtime '2006-10-10 10:05:00'
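A minimal Python sketch of such a filter reproduces the output above. It assumes a simple event encoding of ("row", ticker, rowtime) and ("bound", rowtime) tuples, an illustrative representation rather than how s-Server stores streams; filter_stream is a hypothetical name.

```python
from typing import Iterable, Iterator, Tuple, Union

Row = Tuple[str, str, str]  # ("row", ticker, rowtime)
Bound = Tuple[str, str]     # ("bound", rowtime)
Event = Union[Row, Bound]


def filter_stream(events: Iterable[Event], ticker: str) -> Iterator[Event]:
    """Sketch of WHERE ticker = <ticker>: drop non-matching rows, keep every bound."""
    for event in events:
        if event[0] == "bound":
            # A lower bound on all future input rows also bounds any subset of them.
            yield event
        elif event[1] == ticker:
            yield event


trains = [
    ("row", "ORCL", "10:00:00"),
    ("row", "MSFT", "10:01:00"),
    ("bound", "10:02:00"),
    ("row", "IBM", "10:02:00"),
    ("bound", "10:05:00"),
]
for e in filter_stream(trains, "ORCL"):
    print(e)
# ('row', 'ORCL', '10:00:00')
# ('bound', '10:02:00')
# ('bound', '10:05:00')
```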

When Rowtime Bounds Matter

A streaming query will automatically take advantage of any available rowtime bounds to produce more timely output. As a stream of rows is processed, rowtime bounds accompany the data as described in the previous section.

The important rowtime bounds, those that carry extra information, are the explicit bounds (nicknamed "punctuation") that a data source adds as it inserts rows. These give advance notice of future data: namely, a lower bound on the rowtime of all future rows inserted. Asserting the bound t means that, for the next inserted row r, the field r.rowtime will be at least t. Equivalently, the "clock" of the stream of inserted rows has jumped forward to t, or the data source is going to be "quiet" until time t.

Important notes:

s-Server trusts that the rowtime bound is valid. If the data source subsequently inserts a row with a rowtime less than the bound, s-Server rejects the row and reports an error.
Every valid inserted row must have a rowtime not less than the stream clock. Hence every row either repeats or advances the stream clock.
Data sources should track inserted rowtimes and estimate the rowtime of future inserts. Asserting the estimated earliest rowtime of future inserts (such as the opening of the next trading day) as a bound helps s-Server produce timely output.
Relational operators also advance the clocks of their output streams. After producing a train of rows, and before an apparent pause, an operator advances the rowtime bound of its output stream; the bound it sets is both a prediction and a promise about its next output row.
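The following Python sketch models these rules for a single stream clock. StreamClock and LateRowError are hypothetical names, not s-Server APIs, and rowtimes are assumed to be 'YYYY-MM-DD HH:MM:SS' strings, which compare in time order. How s-Server itself treats a bound lower than the current clock is not described here; this sketch simply keeps the larger value.

```python
from typing import Optional


class LateRowError(Exception):
    """Raised when a row's rowtime is earlier than the stream clock."""


class StreamClock:
    """Hypothetical model of one stream's clock; not an s-Server API."""

    def __init__(self) -> None:
        self.clock: Optional[str] = None  # undefined until the first row or bound

    def assert_bound(self, t: str) -> None:
        # Punctuation: every future row will have rowtime >= t.
        # A bound below the current clock adds nothing, so keep the larger value.
        if self.clock is None or t > self.clock:
            self.clock = t

    def insert_row(self, rowtime: str) -> None:
        # A valid row repeats or advances the clock; an earlier row is rejected.
        if self.clock is not None and rowtime < self.clock:
            raise LateRowError(f"row at {rowtime} is earlier than the clock {self.clock}")
        self.clock = rowtime


clock = StreamClock()
clock.insert_row("2006-10-10 16:00:00")    # last trade of the day
clock.assert_bound("2006-10-11 09:30:00")  # quiet until the next opening
clock.insert_row("2006-10-11 09:30:00")    # allowed: equal to the bound
try:
    clock.insert_row("2006-10-10 16:05:00")
except LateRowError as err:
    print(err)  # row at 2006-10-10 16:05:00 is earlier than the clock 2006-10-11 09:30:00
```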

Eclipsed Rowtime Bounds

A rowtime bound may convey important extra information about a data stream, but the importance of the bound is short-lived. As soon as the next row or the next bound arrives, the older bound is no longer useful: that is, the rowtime bound is eclipsed by the new information.

Since all rows in a stream S are in rowtime order, any row r functions as an explicit announcement that no future row will have a rowtime earlier than r.rowtime. Every row advances the rowtime bound unless it has the same rowtime as its predecessor.

As a result, a rowtime bound is discarded once a later rowtime becomes available, either through a newly arrived row or through another rowtime bound. Older bounds are eclipsed by these later rowtimes and discarded.

Rowtime bounds are most useful when rows will be followed by a known gap in time. They let the system begin processing without waiting for the next row to arrive.
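One way to picture eclipsing is a queue of not-yet-delivered events in which each new row or bound discards the trailing bound it makes redundant. PendingQueue is a hypothetical illustration, not a description of s-Server internals; it assumes events are ("row", ticker, rowtime) or ("bound", rowtime) tuples with zero-padded HH:MM:SS rowtimes.

```python
from typing import List, Tuple

Event = Tuple[str, ...]  # ("row", ticker, rowtime) or ("bound", rowtime)


class PendingQueue:
    """Hypothetical queue of undelivered events that discards eclipsed bounds."""

    def __init__(self) -> None:
        self.events: List[Event] = []

    def append(self, event: Event) -> None:
        t = event[-1]  # the rowtime is the last field of both kinds of event
        last = self.events[-1][-1] if self.events else None
        if last is not None and t < last and event[0] == "row":
            raise ValueError(f"late row at {t}")
        if event[0] == "bound" and last is not None and t <= last:
            return  # eclipsed on arrival: it says nothing new
        # The new event's rowtime is >= any trailing bound, which it eclipses.
        while self.events and self.events[-1][0] == "bound":
            self.events.pop()
        self.events.append(event)


q = PendingQueue()
q.append(("row", "ORCL", "10:00:00"))
q.append(("bound", "10:02:00"))
q.append(("bound", "10:05:00"))  # eclipses the 10:02:00 bound
print(q.events)  # [('row', 'ORCL', '10:00:00'), ('bound', '10:05:00')]
q.append(("row", "IBM", "10:06:00"))  # eclipses the 10:05:00 bound
print(q.events)  # [('row', 'ORCL', '10:00:00'), ('row', 'IBM', '10:06:00')]
```

Rows are never discarded; only bounds are, which is why bounds are "not conserved" while the information they carried survives in the later rowtimes.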

Splitting/Merging Rows and Rowtimes

At times, two trains of rows may join up and form a single train, or a single train may be split in two. In the first case the bound of the earlier train is eclipsed. In the second case a gap has been introduced, and the new early train will be given a rowtime bound, namely, the rowtime of the first row in the second, later train.

This means that rowtime bounds are "not conserved." But the information they carried is not lost, and so their effect is not lost.

Strict and non-strict bounds

All of the bounds discussed so far have been non-strict. For example, the bound

ORDERS.bound: rowtime '2006-10-10 10:00:00'

means 'There is not going to be a row in this stream with rowtime before 10:00:00'. In other words, each future row must have a rowtime of 10:00:00 or later. The 'or later' makes the bound non-strict. This is similar to a 1-hour window defined with the SQL standard OVER clause, which includes both of its end points. Thus, a trade at 12 noon would fall into both the 11-12 window and the 12-1 window.

However, certain operators require strict bounds in order to proceed. A strict bound would say 'There is not going to be a row in this stream with rowtime at or before 10:00:00'. In other words, with a strict bound, each row must be strictly after 10:00:00. For example, the windowed aggregation operator cannot produce a summary for the hour 11:00:00 to 12:00:00 until it knows it has seen the last trade occurring on the stroke of 12:00:00.

In SQLstream, rowtimes have finite precision (namely, 1 msec), so a strict bound at time t is equivalent to a non-strict bound at t + 1 msec.
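The equivalence can be checked with a short Python sketch. The helper names are hypothetical. Python's datetime has microsecond resolution, so the sketch assumes rowtimes fall on the 1 msec grid; that assumption is what makes the two bounds admit exactly the same rows.

```python
from datetime import datetime, timedelta

PRECISION = timedelta(milliseconds=1)  # rowtime precision stated above


def to_non_strict(strict_bound: datetime) -> datetime:
    """A strict bound t ("every future row is after t") equals the non-strict bound t + 1 ms."""
    return strict_bound + PRECISION


def admits(bound: datetime, rowtime: datetime, strict: bool) -> bool:
    """True if a future row with this rowtime is consistent with the bound."""
    return rowtime > bound if strict else rowtime >= bound


noon = datetime(2006, 10, 10, 12, 0, 0)
print(admits(noon, noon, strict=False))  # True: a row at 12:00:00 is still possible
print(admits(noon, noon, strict=True))   # False: the hour ending at 12:00:00 can be closed
print(to_non_strict(noon))               # 2006-10-10 12:00:00.001000
```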

Relational Operators and Rowtime Bounds

Rowtime bounds help data flow through a SQLstream system. They are a form of contract, giving the consumer a benefit (more timely data) if the provider accepts a constraint (to only produce rows with a given rowtime or later).

Understanding how rowtime bounds affect query processing requires looking at each of the stream-processing operators in turn.

Row-by-Row operators

Many operators act on one input row at a time, transforming it into a single output row. For example:

A PROJECT operator copies certain columns while suppressing others.
An expression calculator computes the value of a scalar expression over columns, such as the sum of two numeric column values. Usually the operator passes through the rowtime unchanged.

In these cases rowtime bounds are passed through unchanged. Precisely, because the operator does not change rowtimes, a lower bound on the rowtimes of future input rows is also a valid lower bound on the rowtimes of future output rows, so the operator forwards each rowtime bound it receives along with its output.

The same is true for filters (WHERE clauses in a SELECT statement), which are a special kind of row-by-row operator. A filter rejects some rows and passes others, but it does pass all rowtime bounds. This is correct because a rowtime bound is an assertion about all future rows. To be precise, the bound is a mathematical lower bound on the rowtime of all future rows arriving at the filter, so it must also be a valid lower bound on any subset of those rows, including the subset that the filter will pass. The filter operator does not change rowtimes, so the same bound applies to the output of the filter.

(Note that there may be a better, that is, larger, bound. The best possible bound is the actual rowtime of the next output row, though this is generally not yet known when the bound is propagated.)

Another special case is a row-by-row operator that replaces the input rowtime by a calculated expression. Since the output stream must be ordered by rowtime, this expression must be a monotonic increasing function of rowtime. For example:

SELECT rowtime + 100 AS rowtime, ....

Here the monotonic transformation is a simple shift by 100 msecs. The discussion above about bounds still holds, provided that rowtime bounds are shifted the same way as rowtimes.

To summarize: row-by-row operators pass on rowtime bounds unchanged, unless they change rowtimes.
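A row-by-row operator can be sketched in Python as a generator that transforms each row and applies the same rowtime function to rows and bounds. map_rows and project are hypothetical helpers, rowtimes are assumed to be integer milliseconds, and the retime function is assumed to be non-decreasing; for such a function, a bound b on input rowtimes yields the bound retime(b) on output rowtimes.

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple

Event = Tuple  # ("row", rowtime_ms, payload) or ("bound", rowtime_ms)


def map_rows(events: Iterable[Event],
             transform: Callable[[Dict], Dict],
             retime: Callable[[int], int] = lambda t: t) -> Iterator[Event]:
    """Row-by-row operator sketch (hypothetical helper).

    Each row's payload is transformed; rowtimes and bounds go through the same
    non-decreasing retime function, so every output bound stays a valid lower
    bound on the rowtimes of future output rows.
    """
    for event in events:
        if event[0] == "bound":
            yield ("bound", retime(event[1]))
        else:
            _, rowtime, payload = event
            yield ("row", retime(rowtime), transform(payload))


def project(payload: Dict) -> Dict:
    # PROJECT: keep the ticker column, suppress the others.
    return {"ticker": payload["ticker"]}


source = [
    ("row", 1_000, {"ticker": "ORCL", "shares": 10}),
    ("bound", 1_500),
    ("row", 2_000, {"ticker": "IBM", "shares": 5}),
]
for e in map_rows(source, project, retime=lambda t: t + 100):  # shift by 100 msec
    print(e)
# ('row', 1100, {'ticker': 'ORCL'})
# ('bound', 1600)
# ('row', 2100, {'ticker': 'IBM'})
```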

Merging Streams

Input streams are merged in two cases:

By the UNION ALL operator
Whenever several statements insert into the same named stream

In both cases the system merges the input streams by rowtime, so that the output stream is correctly ordered. The merge algorithm needs to know the next rowtime available on each input stream. When the next row to output has not yet arrived on an input stream, the merge must wait.

Receiving rowtime bounds on all input streams can avoid some of these delays.

UNION ALL and rowtime bounds

The UNION ALL operator must produce its output in ascending rowtime order. Consider the streaming Union operator generated to implement the query

SELECT STREAM * FROM Orders
UNION ALL
SELECT STREAM * FROM Trades

and suppose that the Union operator has already seen input

Orders: ORCL 10:01:00

Orders: MSFT 10:04:00

Trades: YHOO 10:02:00

Trades: IBM 10:03:00

and has produced the output:

Union: ORCL 10:01:00

Union: YHOO 10:02:00

Union: IBM 10:03:00

The following row and bound:

Orders: MSFT 10:04:00

Orders.bound: 10:06:00

cannot flow through the union yet. To see why, imagine that a row with an earlier rowtime subsequently arrives on the Trades stream:

Trades: GOOG 10:03:30

If the union had already emitted the MSFT 10:04:00 record, it would now be forced to output a row out of order!

So, in order to proceed, the union needs the assurance that no row with rowtime less than 10:04:00 will ever arrive in the Trades stream. In short, it needs a rowtime bound. Such a rowtime bound might arrive on its own:

Trades.bound: 10:05:00

or be implied by a row with a later rowtime:

Trades: IBM 10:05:30
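The waiting rule can be sketched in Python and replayed against this example. UnionAll is a hypothetical class, not the operator s-Server generates; it assumes zero-padded HH:MM:SS rowtimes and treats each input's latest row or bound as that input's current rowtime bound.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, str]  # (ticker, rowtime as zero-padded "HH:MM:SS")


class UnionAll:
    """Hypothetical ordered UNION ALL over named inputs (a sketch only).

    The earliest queued row is emitted only when every input with an empty
    queue has a bound at or after that row's rowtime.
    """

    def __init__(self, *inputs: str) -> None:
        self.queues: Dict[str, List[Row]] = {name: [] for name in inputs}
        self.bounds: Dict[str, Optional[str]] = {name: None for name in inputs}
        self.output: List[Row] = []

    def row(self, source: str, ticker: str, rowtime: str) -> None:
        self.queues[source].append((ticker, rowtime))
        self._advance(source, rowtime)  # every row implies a bound
        self._drain()

    def bound(self, source: str, rowtime: str) -> None:
        self._advance(source, rowtime)
        self._drain()

    def _advance(self, source: str, rowtime: str) -> None:
        current = self.bounds[source]
        if current is None or rowtime > current:
            self.bounds[source] = rowtime

    def _drain(self) -> None:
        while True:
            heads = [(q[0][1], name) for name, q in self.queues.items() if q]
            if not heads:
                return
            rowtime, name = min(heads)
            for other, q in self.queues.items():
                b = self.bounds[other]
                if not q and (b is None or b < rowtime):
                    return  # an earlier row might still arrive on `other`
            self.output.append(self.queues[name].pop(0))


u = UnionAll("Orders", "Trades")
u.row("Orders", "ORCL", "10:01:00")
u.row("Orders", "MSFT", "10:04:00")
u.row("Trades", "YHOO", "10:02:00")
u.row("Trades", "IBM", "10:03:00")
print([ticker for ticker, _ in u.output])  # ['ORCL', 'YHOO', 'IBM']
u.bound("Orders", "10:06:00")              # does not release MSFT
u.bound("Trades", "10:05:00")              # Trades can no longer go below 10:04:00
print([ticker for ticker, _ in u.output])  # ['ORCL', 'YHOO', 'IBM', 'MSFT']
```

A later Trades row such as IBM at 10:05:30 would release MSFT in the same way, because a row implies a bound equal to its own rowtime.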

 

Named Streams and Rowtime Bounds

When several clients are simultaneously inserting rows into the same named stream, the system has to merge the inputs by rowtime. This gets tricky when the data sources come and go.

Simultaneous Inserts

Consider a stream definition such as CREATE STREAM Orders (ticker VARCHAR(10));

and two producers each periodically executing the prepared statement INSERT INTO Orders (rowtime, ticker) VALUES (?, ?);

The behavior is similar to union: a row will only flow out of the Orders stream with a particular rowtime when both of the producers have sent a row or a rowtime bound up to at least that rowtime.

One outcome is that a stream seems to run at the speed of its slowest producer, which may be inconvenient when one producer is significantly slower than the others. When they are implemented, timeliness constraints will mitigate this effect.

The "slowest producer" effect is yet another example of a contract. In this case, open producers benefit by being given as much time as they need to produce the next row. Consumers, however, may have to wait an arbitrarily long time for a given row to appear. Also, this contract does not protect a producer that has just joined a stream: if the producer has a row with rowtime 11:00:00 and the stream's most recent row has rowtime 11:10:00, that producer's row is out of sequence, and the producer must discard it.

Producers Entering and Leaving a Stream

When a named stream is created, it has no rowtime bound: the first row inserted can have any rowtime, and this rowtime then becomes the rowtime bound of the stream. The next row cannot be earlier, whether inserted by the same or a new producer.

Later, whenever a new producer joins, the stream's rowtime bound is the time of the latest row inserted. The newcomer can insert a row only if its rowtime is the same as, or later than, the stream bound. Note that the newcomer can barge in ahead of the other producers if its row is earlier than theirs. The point, of course, is to ensure that the rows in the stream are in rowtime order.

Here is a concrete example: stream S is created (and has no bound). Producer P1 arrives and asserts its bound is 10:00. The stream bound is still undefined, since no actual row has been inserted yet.

Then P1 inserts a row with rowtime 10:00, consistent with its declared bound. The row enters the stream and the stream bound becomes 10:00.

Next, producer P2 arrives, asserting its bound is 10:05. This is acceptable, as it is later than 10:00. If P2 then tries to insert a row with rowtime 10:05, it may have to wait (if P1 or a newcomer has earlier rows), but eventually it will succeed. On the other hand, if P2 tried to insert a row with rowtime 9:59, it would be rejected as a late row.

If P1 now closes and detaches from the stream, then P2 is the only source, so its 10:05 row is next up and enters the stream, advancing the stream clock to 10:05. The departure of input P1 let the stream advance: while it was still attached, with a bound of only 10:00, there was a chance it would produce the next row (with any time t such that 10:00 ≤ t < 10:05).

If all inputs close and detach, the stream "remembers" its rowtime bound, so that a producer P3 which appears later cannot insert a late row.
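The producer rules in this example can be modeled with a short Python sketch. NamedStream, attach, insert, and detach are hypothetical names, not s-Server client calls. The sketch assumes every producer declares a bound when it attaches, rejects a declared bound earlier than the stream clock, and uses zero-padded HH:MM rowtimes.

```python
from typing import Dict, List, Optional, Tuple


class LateRowError(Exception):
    """Raised for a row (or declared bound) earlier than what the stream allows."""


class NamedStream:
    """Hypothetical model of a named stream merging several producers' inserts."""

    def __init__(self) -> None:
        self.clock: Optional[str] = None           # stream rowtime bound
        self.producer_bounds: Dict[str, str] = {}  # attached producers only
        self.pending: List[Tuple[str, str]] = []   # (rowtime, producer) not yet in stream
        self.rows: List[Tuple[str, str]] = []      # (rowtime, producer) in the stream

    def attach(self, producer: str, bound: str) -> None:
        if self.clock is not None and bound < self.clock:
            raise LateRowError(f"{producer}: bound {bound} is before {self.clock}")
        self.producer_bounds[producer] = bound
        self._flush()

    def insert(self, producer: str, rowtime: str) -> None:
        # A row earlier than the stream clock or the producer's own bound is late.
        floors = [t for t in (self.clock, self.producer_bounds.get(producer)) if t is not None]
        if floors and rowtime < max(floors):
            raise LateRowError(f"{producer}: row {rowtime} is before {max(floors)}")
        self.producer_bounds[producer] = rowtime  # the row advances this producer's bound
        self.pending.append((rowtime, producer))
        self._flush()

    def detach(self, producer: str) -> None:
        self.producer_bounds.pop(producer, None)
        self._flush()

    def _flush(self) -> None:
        self.pending.sort()
        while self.pending:
            rowtime, producer = self.pending[0]
            # Wait while an attached producer might still insert an earlier row.
            if any(b < rowtime for b in self.producer_bounds.values()):
                return
            self.pending.pop(0)
            self.rows.append((rowtime, producer))
            self.clock = rowtime


s = NamedStream()
s.attach("P1", "10:00")
print(s.clock)  # None: a declared bound does not set the stream bound
s.insert("P1", "10:00")
s.attach("P2", "10:05")
s.insert("P2", "10:05")
print(s.rows)   # [('10:00', 'P1')]: P1 could still send an earlier row
try:
    s.insert("P2", "09:59")
except LateRowError as err:
    print("rejected:", err)
s.detach("P1")
print(s.rows)   # [('10:00', 'P1'), ('10:05', 'P2')]
s.detach("P2")
print(s.clock)  # 10:05: remembered after every producer has left
```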