What are Rowtime Bounds?

Streams and Time

Before explaining rowtime bounds, let's begin by reviewing the role of time in a data stream.

Streams, Rows and Rowtimes

A stream is a sequence of timestamped data rows. All rows in a stream have the same rowtype (the same list of columns).

The column rowtime is mandatory, and the stream is ordered by rowtime. That means that a row follows all earlier rows (each having a smaller or equal rowtime) and precedes all later rows (each having a larger or equal rowtime). Rows must arrive in rowtime order to be accepted: if an incoming row has a rowtime earlier than that of rows already received and accepted, the incoming row is rejected. For example, if the stream time is 10:00 (because the stream has received a row or a rowtime bound with 10:00 as its rowtime), then incoming rows with rowtimes of 9:58, 9:47, and 9:59 will all be rejected as out of order, even though 9:59 follows 9:47. (Several adjacent rows can have the same rowtime, and can be reordered by application logic, without violating the constraint that rowtimes must not decrease.)
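The acceptance rule can be sketched in a few lines of Python. This is an illustrative model only, not s-Server code: the class `Stream`, its method `offer`, and the helper `hhmm` are hypothetical names, and rowtimes are simplified to minutes since midnight.

```python
class Stream:
    """Toy model of a stream that accepts rows only in non-decreasing rowtime order."""

    def __init__(self):
        self.clock = None   # rowtime of the latest accepted row
        self.rows = []      # accepted rows, as (rowtime, payload) pairs

    def offer(self, rowtime, payload=None):
        """Accept the row unless its rowtime is earlier than the stream clock."""
        if self.clock is not None and rowtime < self.clock:
            return False    # out of order: rejected
        self.clock = rowtime
        self.rows.append((rowtime, payload))
        return True


def hhmm(text):
    """Convert 'H:MM' to minutes since midnight (a simplification for this sketch)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


s = Stream()
s.offer(hhmm("10:00"))
results = [s.offer(hhmm(t)) for t in ("9:58", "9:47", "9:59", "10:00", "10:05")]
print(results)  # [False, False, False, True, True]
```

The three early rows are rejected, while a second row at 10:00 is accepted because equal rowtimes do not violate the ordering.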

Often the rows in a stream represent real-world observations or events, and the rowtime is the actual time of the event. The application that inserts a row can set the rowtime value explicitly; the default value of the rowtime is the system clock time of the insertion.

A named stream is a stream defined in the catalog by a CREATE STREAM statement. It is a persistent object, accessible by its name in SQL. Applications can INSERT rows into it, or read rows from it:

INSERT INTO foo (x, y) VALUES (101, 118);

SELECT STREAM * FROM foo;

Here the query results are the entire contents of stream foo.

Now consider a more complex query, such as:

SELECT STREAM rowtime, (x + y), z FROM foo WHERE z > 12;

Past, Present, and Future

Picture a stream as a horizontal line, its timeline, with the past at the right. The rows are dots that appear on the line, each new dot appearing to the left of all its predecessors. There is a natural frontier, the present, which separates the past (at the right, with dots) from the future (to the left, with unknown invisible dots).

[Figure: a stream timeline divided into past, present, and future]

The rowtime of its latest row serves as a natural clock for a stream. We call it the stream clock. For example, when a row with rowtime 10:00 is added to stream S, the stream clock of S becomes 10:00. When the next row is inserted, with rowtime 10:05, the clock jumps to 10:05.

Since an added row cannot be earlier than its predecessors, we know that all future rows of stream S have a rowtime no earlier than the clock of S. That is, the stream clock is a lower bound for future rows. Hence a synonym for stream clock is "rowtime bound".

To restate this: a rowtime bound is an assertion about the future contents of a stream. It states that the next row in the stream will have a rowtime no earlier than t, the value of the bound. This is a mathematical lower bound on the set of future rowtimes. Note that with rowtimes "earlier" is the same as "less", and "later" is the same as "more". If we compare two bounds on the same stream, with values t1 and t2, where t2 > t1, then we call t2 the stronger or stricter bound, because mathematically it is a more restrictive constraint, and hence also a more informative one.

[Figure: the stream timeline at a later time, showing past, present, and future]

(If the rowtimes in the last example reflect real time, there is a five-minute wait between these two rows. But if the rowtimes are historical and all the data is available, the system can process it as fast as possible, and the "stream clock" shows a fictional time that jumps instantaneously from 10:00 to 10:05. In both cases there is no point in imagining the clock as continuous and asking about the state of stream S at, say, 10:01.)

We can add a relational operation to this picture by imagining it as a black box, with one or more input streams going in, at the left side, and a result stream coming out at the right side. As always, the output rows are added to the result stream one at a time and in rowtime order. The output is computed from past and present input rows -- but not from the unknown future inputs.

[Figure: union of two streams by rowtime]

Note that duplicate rowtimes can occur and are acceptable in input or output streams.

To read more about removing duplicate records, based on more than rowtime, see the topics SELECT ALL and SELECT DISTINCT and GROUP BY in the s-Server Streaming SQL Reference Guide.

Rowtime Bounds: a hint about the future

There are situations where a bit of insight into the future is feasible and can make a big difference, especially where rows are merged or grouped.

Here are two simple examples: Grouping by Hours and Merging Two Streams.

Example: Grouping by Hours

Consider a stream of color-coded events, and a black box counting events, with a separate count for each color. Each hour, on the hour, the box outputs the total count for the last hour. (In SQL, you implement this as a streaming GROUP BY hour, where hour is calculated from the rowtime.) On the surface, implementing the black box seems trivial: every hour, as defined by the internal "stream clock," you reset the count. The problem arises in cases where the stream is updated long after the hour closes, say at 6:15 when the hour closed at 5:00. These cases can produce late or inaccurate counts. We get around this problem by implementing a rowtime bound.

To see the problem, consider the following data:

input stream

rowtime   color
3:01      red
3:01      blue
3:03      blue
3:15      red
3:59      red
4:00      red
4:05      red
4:06      blue
4:30      red
4:45      blue
4:49      blue
6:15      red

The arrival of the row (4:00, red) moves the input clock to 4:00, closing off the period from 3:00 to 4:00. During this period there were 5 input rows, and the counts are (red = 3, blue = 2).

The next period starts at 4:00 and ends just before 5:00. This period includes 6 rows, with counts (red = 3, blue = 3). However, the input stream clock jumps from 4:49 to 6:15, and it is only on seeing the row at 6:15 that the counting operator knows that the period from 4:00 to 5:00 has ended. The operator produces the correct results, but 1 hour and 15 minutes too late. Worse, suppose that 5:00 is the end of the business day and that the next input row doesn't occur until the next morning. Now the hourly count is late by many hours.
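The delay can be reproduced with a small Python simulation of the counting box. This is a sketch under simplifying assumptions, not the server's implementation: the function `hourly_counts` and the helper `hhmm` are hypothetical, and the operator learns that an hour has ended only when a row from a later hour arrives.

```python
from collections import Counter


def hhmm(text):
    """Convert 'H:MM' to minutes since midnight (a simplification for this sketch)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


ROWS = [("3:01", "red"), ("3:01", "blue"), ("3:03", "blue"), ("3:15", "red"),
        ("3:59", "red"), ("4:00", "red"), ("4:05", "red"), ("4:06", "blue"),
        ("4:30", "red"), ("4:45", "blue"), ("4:49", "blue"), ("6:15", "red")]


def hourly_counts(rows):
    """Yield (hour, counts, emitted_at) each time an arriving row closes an hour."""
    current_hour = None
    counts = Counter()
    for text, color in rows:
        hour = hhmm(text) // 60
        if current_hour is not None and hour > current_hour:
            # Only a row from a later hour reveals that the current hour has ended.
            yield current_hour, dict(counts), text
            counts = Counter()
        current_hour = hour
        counts[color] += 1


emitted = list(hourly_counts(ROWS))
for hour, counts, at in emitted:
    print(f"{hour}:00 period: {counts}, emitted when the {at} row arrived")
```

The counts for the 4:00 period are correct, but they are emitted only when the 6:15 row arrives.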

Example: Merging Two Streams

Consider a black box computing the UNION ALL of two streams P and Q: this just means to accept all rows from either input. Of course the output as well as both inputs must be ordered by rowtime, so what the black box does is to merge the two inputs by rowtime: it always picks the earlier input row from P or Q, and copies it to its output stream.

There's a catch: this algorithm is correct but it can get stuck. Input rows can appear intermittently, so there can be a time period when there is a row available from one input but none on the other side.

For example, suppose the input rows arrive with the rowtimes shown in this table:

stream P   stream Q
1:00       1:01
1:03       1:02
1:06       1:04

The merge algorithm produces:

P UNION ALL Q
1:00 from P
1:01 from Q
1:02 from Q
1:03 from P
1:04 from Q

And now it stops, with no input rows on deck from Q. It cannot output "1:06 from P", because for all it knows there may be an earlier row from Q that hasn't arrived yet, say a row at 1:05. On the other hand, the next Q row might be later, say 1:10, in which case it would be correct to output "1:06 from P".

The merge algorithm will always wait as soon as one input is exhausted.
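The stall is easy to see in a minimal Python sketch of the merge. The function `merge_available` and the helper `hhmm` are hypothetical names used only for illustration; the sketch assumes each input is a queue of rowtimes that have already arrived.

```python
from collections import deque


def hhmm(text):
    """Convert 'H:MM' to minutes (a simplification for this sketch)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


def merge_available(p, q):
    """Merge two rowtime-ordered queues, stopping as soon as either one is empty."""
    out = []
    while p and q:
        # Ties go to P here; any consistent rule would do for this sketch.
        if hhmm(p[0]) <= hhmm(q[0]):
            out.append(f"{p.popleft()} from P")
        else:
            out.append(f"{q.popleft()} from Q")
    return out


P = deque(["1:00", "1:03", "1:06"])
Q = deque(["1:01", "1:02", "1:04"])

merged = merge_available(P, Q)
print(merged)    # five rows, ending with '1:04 from Q'
print(list(P))   # ['1:06'] is still waiting
```

The row at 1:06 stays in P's queue because, with Q empty, the merge has no way to know whether an earlier Q row is still on its way.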

Rowtime Bounds Help the Server Produce Timely Data

In fact the system does not need to wait for the next row. It could proceed on less information. In the merging streams example above, if it learns that the next Q rowtime is later than the latest P row, then it can consume the P row immediately. That is to say, when no Q row is available, it can help to have a rowtime bound for Q.

Thus it can be very useful for a data source to advance the rowtime bound explicitly, in anticipation of a future row.

For more detail, see the topic JDBC Driver in the s-Server Integration Guide.

Some relational operators on streams -- notably union, join, and windowed aggregation -- produce output related to the end of a time period. Advancing the rowtime bound lets the system produce this output as early as possible.

As a data source inserts rows into a stream, the rowtime bound automatically increases.
The data source can advance the rowtime bound without inserting a row.
Relational operators automatically respond to the bound and pass it downstream.
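These three behaviors can be modeled with a toy filter operator in Python. Everything here is a hypothetical illustration rather than an s-Server interface: the classes `Sink` and `Filter` and the methods `on_row` and `on_bound` are invented for the sketch, rowtimes are plain integers, and the choice to forward a dropped row's rowtime as a bound is an assumption about how an operator might keep its output clock current.

```python
class Sink:
    """Records what arrives downstream: rows and bound-only notifications."""

    def __init__(self):
        self.events = []
        self.bound = None

    def on_row(self, rowtime, value):
        self.bound = rowtime            # a row raises the bound to its own rowtime
        self.events.append(("row", rowtime, value))

    def on_bound(self, rowtime):
        if self.bound is None or rowtime > self.bound:
            self.bound = rowtime        # bounds only ever move forward
            self.events.append(("bound", rowtime))


class Filter:
    """Passes rows that satisfy a predicate and forwards every advance of the bound."""

    def __init__(self, predicate, downstream):
        self.predicate = predicate
        self.downstream = downstream

    def on_row(self, rowtime, value):
        if self.predicate(value):
            self.downstream.on_row(rowtime, value)
        else:
            # Assumption: the row is dropped, but it still shows that time has advanced.
            self.downstream.on_bound(rowtime)

    def on_bound(self, rowtime):
        self.downstream.on_bound(rowtime)


sink = Sink()
op = Filter(lambda z: z > 12, sink)
op.on_row(600, 20)   # passes the filter
op.on_row(605, 5)    # filtered out; the bound still moves to 605
op.on_bound(660)     # the source punctuates: no row, the bound moves to 660
print(sink.events)   # [('row', 600, 20), ('bound', 605), ('bound', 660)]
```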

Informally, we call the action of advancing the rowtime bound punctuating the stream. This is an analogy to punctuation in English text: punctuation marks are not words, but they affect the meaning of a series of words.

Advancing the rowtime bound gives early notice about future data in a stream, as well as immediate notice that a time window has ended.

Early notice of a lower bound on future inputs can expedite the ordered union of streams, by guiding the ordered merger of the input rows. In ordered merging, an input row is held back until there is certainty that it is the earliest of all inputs. Time-constraints provide that certainty. Without them, processing could not continue until receipt of an input row with a later rowtime, because there would be no lower bound on unseen future inputs.

Immediate notice that a time window has ended enables immediate progress or results from operations awaiting one more input message. One such operation is windowed join, which may await the "last" transaction of a certain type, such as when merging streams. Another is windowed aggregation, which may await the "last" transaction within a specified time window in order to compute totals and other statistics. Aggregation often uses sliding windows, which move forward in time continuously: e.g., the last ten minutes relative to the current row.

Example Revisited: Grouping By Hours

Let's see how a rowtime bound can help with the first example above. At the end of each hour, keeping time by the stream clock, we want to output the value of each counter and then clear the counters. When the input data is dense in time, we can rely on each new row to advance the stream clock (to its rowtime). A row at 3:00:01 moves the clock to 3:00:01, and implies that the period from 2:00 to 3:00 has ended. The delay of 1 second is acceptable (we assume). But suppose the last input of the day occurs at 4:59. We should recognize the end of the period from 4:00 to 5:00 as soon as possible, that is, at 5:00. To cause this, the data source punctuates. At 5:00 it knows there will be no more input rows for the day, so it can advance the rowtime bound past 5:00. (This is an example of a strict bound: "the next row is later than 5:00". A normal bound is non-strict: "the next row is at 5:00 or later". But to the server rowtimes are discrete, measured to the millisecond, so a strict bound at 5:00 is the same as a non-strict bound at 5:00:00.001.)

In fact, if we know that the earliest possible next input row could be at 9:00 the next day, the data source could advance the bound to 9:00 the next day (non-strict). The effect is the same.
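A minimal Python sketch shows the effect of punctuation on the hourly count. It is an illustration under stated assumptions, not the server's implementation: the class `HourlyCounter`, its methods, and the helpers `hhmm` and `run` are hypothetical, rowtimes are minutes since midnight, and "9:00 the next day" is represented by adding 24 hours.

```python
from collections import Counter


def hhmm(text):
    """Convert 'H:MM' to minutes since midnight (a simplification for this sketch)."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


class HourlyCounter:
    """Toy hourly count that closes an hour when a row or a bound reaches a later hour."""

    def __init__(self):
        self.hour = None
        self.counts = Counter()
        self.output = []

    def _advance(self, minutes):
        hour = minutes // 60
        if self.hour is not None and hour > self.hour and self.counts:
            self.output.append((self.hour, dict(self.counts)))
            self.counts = Counter()
        if self.hour is None or hour > self.hour:
            self.hour = hour

    def on_row(self, minutes, color):
        self._advance(minutes)
        self.counts[color] += 1

    def on_bound(self, minutes):
        # A bound carries no data, but it moves the clock just as a row would.
        self._advance(minutes)


LAST_HOUR = [("4:00", "red"), ("4:05", "red"), ("4:06", "blue"),
             ("4:30", "red"), ("4:45", "blue"), ("4:49", "blue")]


def run(bound_minutes):
    """Feed the last hour of rows, then punctuate with the given bound."""
    counter = HourlyCounter()
    for text, color in LAST_HOUR:
        counter.on_row(hhmm(text), color)
    counter.on_bound(bound_minutes)
    return counter.output


at_five = run(hhmm("5:00"))
next_morning = run(24 * 60 + hhmm("9:00"))
print(at_five)
print(at_five == next_morning)  # True
```

Either bound closes the 4:00 period immediately, without waiting for another row.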

Example Revisited: Merging Inputs

Now to reconsider the second example. The merge operator is stuck with inputs:

stream P   stream Q
1:06       ...

The input clock for Q is still at 1:04, from the last input row from Q. If we knew that the next Q row would arrive with rowtime 1:05, then we would know that we must wait for it and output it first. On the other hand, if we knew that the next Q row would have rowtime 1:10, we could output the last P row immediately.

If the source of the Q rows knows a little bit about the next row, it can set the rowtime bound, which will be propagated downstream to the merge operator, which will see it as the value of the input clock for Q. The source doesn't need to know the whole row in advance, and it doesn't even need to know the exact rowtime; it only needs to know a lower bound on the rowtime (one that is larger than the rowtime of the latest inserted row, in this case 1:04). Thus punctuation by data sources results in better input clocks and a more efficient merge operator. Without punctuation, the same correct output is produced, but it is delayed.
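The following Python sketch extends the merge with an input clock per side. It is a simplified model, not the server's algorithm: the class `MergeInput`, its methods, and the function `drain` are hypothetical names, and rowtimes are minutes (1:00 is 60, 1:10 is 70).

```python
from collections import deque


class MergeInput:
    """One side of the merge: pending rowtimes plus a bound on rows not yet seen."""

    def __init__(self):
        self.rows = deque()
        self.bound = None

    def push_row(self, rowtime):
        self.rows.append(rowtime)
        self.bound = rowtime            # a row implies a bound at its own rowtime

    def advance_bound(self, rowtime):
        if self.bound is None or rowtime > self.bound:
            self.bound = rowtime        # punctuation: no row, only a later bound

    def earliest_possible(self):
        """Rowtime of the next pending row, else the bound on unseen rows."""
        return self.rows[0] if self.rows else self.bound


def drain(p, q):
    """Emit every row that is certain to be no later than anything the other side can send."""
    out = []
    while True:
        for side, other, name in ((p, q, "P"), (q, p, "Q")):
            limit = other.earliest_possible()
            if side.rows and limit is not None and side.rows[0] <= limit:
                out.append((side.rows.popleft(), name))
                break
        else:
            return out                  # neither side can make progress


p, q = MergeInput(), MergeInput()
for t in (60, 63, 66):
    p.push_row(t)
for t in (61, 62, 64):
    q.push_row(t)

stalled = drain(p, q)      # stops with 1:06 (66) still pending on P
q.advance_bound(70)        # Q's source asserts: next row is at 1:10 or later
released = drain(p, q)
print(stalled)
print(released)            # [(66, 'P')]
```

Had Q's source advanced the bound only to 1:05 (65), the row at 1:06 would still be held back, which is the correct behavior.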

Rowtime bounds summary

A rowtime bound is a constraint on the rows that will later be inserted into a stream. Rows with rowtimes less than the bound will be rejected.
Because the rowtimes in a stream must be non-descending, each row automatically creates a non-strict bound following itself.
Rowtime bounds are never lost. Two rowtime bounds can combine, but if they do, the stronger bound is preserved.
Rowtime bounds can be strict or non-strict. A non-strict bound allows the next row to have the same or a greater rowtime, just not a lesser one, whereas a strict bound requires the following row to have a greater rowtime.
Rowtime bounds do not flush data through the system, but they allow operators to make progress.
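Two of these points, strict versus non-strict bounds and the combining of bounds, can be illustrated with a short Python sketch. The class `Bound` and the function `combine` are hypothetical names, not part of any s-Server interface; rowtimes are integers in milliseconds, matching the millisecond granularity described above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Bound:
    millis: int            # rowtime in milliseconds
    strict: bool = False   # True: next rowtime must be greater; False: greater or equal

    def normalized(self):
        """Return the equivalent non-strict bound at millisecond granularity."""
        return Bound(self.millis + 1, False) if self.strict else self

    def admits(self, rowtime_millis):
        """Would a row with this rowtime satisfy the bound?"""
        return rowtime_millis >= self.normalized().millis


def combine(a, b):
    """Keep the stronger (later) of two bounds on the same stream."""
    return max(a.normalized(), b.normalized(), key=lambda bound: bound.millis)


FIVE_OCLOCK = 5 * 60 * 60 * 1000          # 5:00:00.000 as milliseconds since midnight

strict_five = Bound(FIVE_OCLOCK, strict=True)
print(strict_five.normalized())           # Bound(millis=18000001, strict=False)
print(strict_five.admits(FIVE_OCLOCK))    # False: a row at exactly 5:00 is too early
print(combine(Bound(100), Bound(200)))    # Bound(millis=200, strict=False)
```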