Windowed Aggregation on Streams

In analyzing streaming data, you will often make use of windowed aggregation. Windowed aggregation performs an analytic function across a window specified either by time, such as "one hour preceding," or by rows, such as "the last six rows."

Time-Based Windows

A time-based window is a finite window defined by a rowtime interval: its defining criteria select a finite set of rows according to their rowtimes. At any given wall-clock time, the number of rows in the window can vary, depending on how many rows have arrived whose rowtimes fall within the window's defined period. (The SQL standard calls this a "logical" window, and calls a row-based window a "physical" window.)

For example, RANGE INTERVAL '1' HOUR PRECEDING specifies that the window contains all rows whose ROWTIMEs are within the hour preceding the stream's current time. (That time is usually the rowtime of the most recent row received.)
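The membership rule for a time-based window can be sketched in Python. This is a minimal model, not s-Server code. Rows are (rowtime, payload) tuples, and the current time is taken to be the rowtime of the most recent row. The window's lower bound is treated as inclusive, an assumption consistent with the WEATHERSTREAM results below. The helper name `rows_in_range_window` is hypothetical.

```python
from datetime import datetime, timedelta

def rows_in_range_window(rows, current_time, interval):
    """Rows whose rowtime lies in [current_time - interval, current_time].

    Hypothetical helper modeling RANGE INTERVAL ... PRECEDING; the inclusive
    lower bound is an assumption.
    """
    lower = current_time - interval
    return [r for r in rows if lower <= r[0] <= current_time]

stream = [
    (datetime(2018, 11, 1, 1, 0), "a"),
    (datetime(2018, 11, 1, 1, 30), "b"),
    (datetime(2018, 11, 1, 3, 0), "c"),
]
hour = timedelta(hours=1)

# Evaluate the window as each row arrives; the row count varies over time.
counts = []
for i, (now, _) in enumerate(stream):
    arrived = stream[: i + 1]  # rows received so far
    counts.append(len(rows_in_range_window(arrived, now, hour)))
print(counts)  # [1, 2, 1]
```

At 03:00 the window covers 02:00 to 03:00, so rows "a" and "b" have aged out even though they were received.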

For time-based windows on streams, analytic functions can normally identify the complete set of rows in a window (and hence calculate a result) only once a row arrives whose rowtime falls beyond the later bound of the current window. For example, for a window of RANGE INTERVAL '1' MINUTE FOLLOWING, the arrival of a row with a rowtime more than one minute later than that of the row for which the window is being evaluated indicates that no more rows will enter the window, so the query can return a result.

Alternatively, a rowtime bound can be used to indicate that no more rows will arrive within a given window, enabling the query to return a result.
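The completion rule can be sketched for SUM over RANGE INTERVAL '1' MINUTE FOLLOWING. The class `FollowingWindowSum` is hypothetical. It assumes rows arrive in rowtime order, and it assumes a rowtime bound b promises that every later row has rowtime >= b. Under those assumptions, either a later row or a bound closes a window once its time exceeds the window's later bound.

```python
from datetime import datetime, timedelta

class FollowingWindowSum:
    """Hypothetical model of SUM(value) OVER (RANGE <interval> FOLLOWING).

    Assumes rows arrive in rowtime order and that a rowtime bound b means
    every later row has rowtime >= b.
    """

    def __init__(self, interval):
        self.interval = interval
        self.rows = []     # (rowtime, value) for every row seen so far
        self.pending = []  # indexes of rows whose window may still gain rows
        self.emitted = []  # (rowtime, sum) results, in emission order

    def _close_windows(self, watermark):
        # Window [t, t + interval] is complete once watermark > t + interval:
        # nothing with a rowtime inside the window can still arrive.
        still_open = []
        for i in self.pending:
            start = self.rows[i][0]
            end = start + self.interval
            if watermark > end:
                total = sum(v for rt, v in self.rows if start <= rt <= end)
                self.emitted.append((start, total))
            else:
                still_open.append(i)
        self.pending = still_open

    def on_row(self, rowtime, value):
        self._close_windows(rowtime)  # a later row can close earlier windows
        self.rows.append((rowtime, value))
        self.pending.append(len(self.rows) - 1)

    def on_bound(self, bound):
        self._close_windows(bound)    # a bound closes windows without new data


t0 = datetime(2018, 11, 1, 10, 0, 0)
w = FollowingWindowSum(timedelta(minutes=1))
w.on_row(t0, 1)                           # window 10:00:00-10:01:00
w.on_row(t0 + timedelta(seconds=30), 2)   # window 10:00:30-10:01:30
w.on_row(t0 + timedelta(seconds=70), 4)   # 10:01:10 closes only the first window
after_rows = [s for _, s in w.emitted]    # [3]
w.on_bound(t0 + timedelta(seconds=200))   # bound 10:03:20 closes the other two
print([s for _, s in w.emitted])          # [3, 6, 4]
```

Without the bound, the second and third windows would stay open until another row arrived.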

Row-based Windows

A row-based window is specified by a fixed number of rows; for example, ROWS 10 PRECEDING specifies that only the latest 10 rows be included in the window. (The SQL standard calls this a "physical" window, and calls a time-based window a "logical" window.)

For row-based windows on streams, such as ROWS 3 PRECEDING, rowtime bounds have no effect on windows (and hence, on the evaluation of analytic functions).
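By contrast, a row-based window can be modeled with a fixed-size buffer, and a rowtime bound has nothing to change in it. The `RowWindowSum` class below is a hypothetical sketch. `size` is simply the number of rows kept. The sketch does not settle exactly how that number maps onto N in ROWS N PRECEDING.

```python
from collections import deque

class RowWindowSum:
    """Hypothetical model of SUM over a window of the most recent `size` rows."""

    def __init__(self, size):
        self.window = deque(maxlen=size)  # the oldest row drops out automatically

    def on_row(self, value):
        self.window.append(value)
        return sum(self.window)

    def on_bound(self, bound):
        # A rowtime bound carries no rows, and a row-based window does not
        # depend on rowtimes, so there is nothing to update here.
        pass


w = RowWindowSum(3)
sums = [w.on_row(v) for v in [5, 1, 4, 2]]
before = list(w.window)
w.on_bound("10:15:00")          # hypothetical bound value, ignored
assert list(w.window) == before
print(sums)                     # [5, 6, 10, 7]
print(w.on_row(3))              # 9 (rows 4, 2, 3)
```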

See below for examples of windowed aggregation, including how rowtime bounds interact with it.

Assume the following rows are flowing through the stream WEATHERSTREAM:

ROWTIME                 CITY        TEMP
2018-11-01 01:00:00.0   Denver      29
2018-11-01 01:00:00.0   Anchorage   2
2018-11-01 06:00:00.0   Miami       65
2018-11-01 07:00:00.0   Denver      32
2018-11-01 09:00:00.0   Anchorage   9
2018-11-01 13:00:00.0   Denver      50
2018-11-01 17:00:00.0   Anchorage   10
2018-11-01 18:00:00.0   Miami       71
2018-11-01 19:00:00.0   Denver      43
2018-11-02 01:00:00.0   Anchorage   4
2018-11-02 01:00:00.0   Denver      39
2018-11-02 07:00:00.0   Denver      46
2018-11-02 09:00:00.0   Anchorage   3
2018-11-02 13:00:00.0   Denver      56
2018-11-02 17:00:00.0   Anchorage   2
2018-11-02 19:00:00.0   Denver      50
2018-11-03 01:00:00.0   Denver      36
2018-11-03 01:00:00.0   Anchorage   1

 

Let's say we want to find the minimum and maximum temperature recorded in the 24-hour period prior to any given reading, globally, regardless of city. To do this, we define a window of RANGE INTERVAL '1' DAY PRECEDING, and use it in the OVER clause for the MIN and MAX analytic functions:

SELECT STREAM
       ROWTIME,
       MIN(TEMP) OVER W1 AS WMIN_TEMP,
       MAX(TEMP) OVER W1 AS WMAX_TEMP
FROM WEATHERSTREAM
WINDOW W1 AS (
   RANGE INTERVAL '1' DAY PRECEDING
);

Results:

ROWTIME                 WMIN_TEMP   WMAX_TEMP
2018-11-01 01:00:00.0   29          29
2018-11-01 01:00:00.0   2           29
2018-11-01 06:00:00.0   2           65
2018-11-01 07:00:00.0   2           65
2018-11-01 09:00:00.0   2           65
2018-11-01 13:00:00.0   2           65
2018-11-01 17:00:00.0   2           65
2018-11-01 18:00:00.0   2           71
2018-11-01 19:00:00.0   2           71
2018-11-02 01:00:00.0   2           71
2018-11-02 01:00:00.0   2           71
2018-11-02 07:00:00.0   4           71
2018-11-02 09:00:00.0   3           71
2018-11-02 13:00:00.0   3           71
2018-11-02 17:00:00.0   2           71
2018-11-02 19:00:00.0   2           56
2018-11-03 01:00:00.0   2           56
2018-11-03 01:00:00.0   1           56
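As a check on the results above, the following Python sketch recomputes WMIN_TEMP and WMAX_TEMP from the WEATHERSTREAM rows. It is a brute-force model, not how s-Server evaluates the query. It makes two assumptions that the results above are consistent with. First, the window's lower bound is inclusive, so the 2018-11-02 01:00 rows still see the rows from exactly one day earlier. Second, each row sees only itself and rows that arrived before it, so the first Denver row does not yet see the Anchorage row with the same rowtime.

```python
from datetime import datetime, timedelta

# (rowtime, city, temp) rows of WEATHERSTREAM, in arrival order.
WEATHER = [
    ("2018-11-01 01:00", "Denver", 29),
    ("2018-11-01 01:00", "Anchorage", 2),
    ("2018-11-01 06:00", "Miami", 65),
    ("2018-11-01 07:00", "Denver", 32),
    ("2018-11-01 09:00", "Anchorage", 9),
    ("2018-11-01 13:00", "Denver", 50),
    ("2018-11-01 17:00", "Anchorage", 10),
    ("2018-11-01 18:00", "Miami", 71),
    ("2018-11-01 19:00", "Denver", 43),
    ("2018-11-02 01:00", "Anchorage", 4),
    ("2018-11-02 01:00", "Denver", 39),
    ("2018-11-02 07:00", "Denver", 46),
    ("2018-11-02 09:00", "Anchorage", 3),
    ("2018-11-02 13:00", "Denver", 56),
    ("2018-11-02 17:00", "Anchorage", 2),
    ("2018-11-02 19:00", "Denver", 50),
    ("2018-11-03 01:00", "Denver", 36),
    ("2018-11-03 01:00", "Anchorage", 1),
]

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M")

def global_min_max(rows, interval):
    """MIN/MAX(temp) OVER (RANGE interval PRECEDING), one result per arriving row."""
    results = []
    for i, (ts, _, _) in enumerate(rows):
        now = parse(ts)
        # rows[: i + 1] are already in rowtime order, so only the lower bound matters.
        temps = [t for ts2, _, t in rows[: i + 1] if parse(ts2) >= now - interval]
        results.append((ts, min(temps), max(temps)))
    return results

for row in global_min_max(WEATHER, timedelta(days=1)):
    print(row)
```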

 

Now, let's assume we want to find the minimum, maximum, and average temperature recorded in the 24-hour period prior to any given reading, broken down by city. To do this, we add a PARTITION BY clause on CITY to the window specification, and add the AVG analytic function over the same window to the selection list:

SELECT STREAM
       ROWTIME,
       CITY,
       MIN(TEMP) OVER W1 AS WMIN_TEMP,
       MAX(TEMP) OVER W1 AS WMAX_TEMP,
       AVG(TEMP) OVER W1 AS WAVG_TEMP
FROM AGGTEST.WEATHERSTREAM
WINDOW W1 AS (
       PARTITION BY CITY
       RANGE INTERVAL '1' DAY PRECEDING
);

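Results (WAVG_TEMP values appear truncated to whole numbers; for example, the Denver average of 29 and 32 is shown as 30):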

ROWTIME                 CITY        WMIN_TEMP   WMAX_TEMP   WAVG_TEMP
2018-11-01 01:00:00.0   Denver      29          29          29
2018-11-01 01:00:00.0   Anchorage   2           2           2
2018-11-01 06:00:00.0   Miami       65          65          65
2018-11-01 07:00:00.0   Denver      29          32          30
2018-11-01 09:00:00.0   Anchorage   2           9           5
2018-11-01 13:00:00.0   Denver      29          50          37
2018-11-01 17:00:00.0   Anchorage   2           10          7
2018-11-01 18:00:00.0   Miami       65          71          68
2018-11-01 19:00:00.0   Denver      29          50          38
2018-11-02 01:00:00.0   Anchorage   2           10          6
2018-11-02 01:00:00.0   Denver      29          50          38
2018-11-02 07:00:00.0   Denver      32          50          42
2018-11-02 09:00:00.0   Anchorage   3           10          6
2018-11-02 13:00:00.0   Denver      39          56          46
2018-11-02 17:00:00.0   Anchorage   2           10          4
2018-11-02 19:00:00.0   Denver      39          56          46
2018-11-03 01:00:00.0   Denver      36          56          45
2018-11-03 01:00:00.0   Anchorage   1           4           2
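The partitioned results can be checked with a similar brute-force Python model. This sketch adds the PARTITION BY CITY filter. It computes the average with truncation toward zero, an assumption chosen to match the WAVG_TEMP column. For example, the Denver values at 2018-11-03 01:00 are 39, 46, 56, 50, and 36, which average 45.4, shown as 45. The sketch also assumes an inclusive lower bound, and that each row sees only itself and earlier-arriving rows. It is not s-Server's evaluation strategy.

```python
from datetime import datetime, timedelta

# (rowtime, city, temp) rows of WEATHERSTREAM, in arrival order.
WEATHER = [
    ("2018-11-01 01:00", "Denver", 29),
    ("2018-11-01 01:00", "Anchorage", 2),
    ("2018-11-01 06:00", "Miami", 65),
    ("2018-11-01 07:00", "Denver", 32),
    ("2018-11-01 09:00", "Anchorage", 9),
    ("2018-11-01 13:00", "Denver", 50),
    ("2018-11-01 17:00", "Anchorage", 10),
    ("2018-11-01 18:00", "Miami", 71),
    ("2018-11-01 19:00", "Denver", 43),
    ("2018-11-02 01:00", "Anchorage", 4),
    ("2018-11-02 01:00", "Denver", 39),
    ("2018-11-02 07:00", "Denver", 46),
    ("2018-11-02 09:00", "Anchorage", 3),
    ("2018-11-02 13:00", "Denver", 56),
    ("2018-11-02 17:00", "Anchorage", 2),
    ("2018-11-02 19:00", "Denver", 50),
    ("2018-11-03 01:00", "Denver", 36),
    ("2018-11-03 01:00", "Anchorage", 1),
]

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M")

def per_city_stats(rows, interval):
    """MIN, MAX, AVG(temp) OVER (PARTITION BY city RANGE interval PRECEDING)."""
    results = []
    for i, (ts, city, _) in enumerate(rows):
        now = parse(ts)
        # Only earlier-arriving rows of the same city, within the interval.
        temps = [t for ts2, c, t in rows[: i + 1]
                 if c == city and parse(ts2) >= now - interval]
        avg = int(sum(temps) / len(temps))  # int() truncates toward zero
        results.append((ts, city, min(temps), max(temps), avg))
    return results

for row in per_city_stats(WEATHER, timedelta(days=1)):
    print(row)
```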

Rowtime Bounds and Windowed Aggregation

This is an example of a windowed aggregate query:

SELECT STREAM ROWTIME, ticker, amount, SUM(amount)
   OVER (
       PARTITION BY ticker
       RANGE INTERVAL '1' HOUR PRECEDING)
AS hourlyVolume
FROM Trades

 

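Because this is a query on a stream, each row comes out of the query as soon as it goes in: the window extends only backward from the current row, so its contents are known when the row arrives. For example, given the inputs: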

Trades: IBM 10 10:00:00
Trades: ORCL 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 10:25:00
Trades: IBM 30 11:05:00
Trades.bound: 11:10:00

 

the output will be:

Trades: IBM 10 10 10:00:00
Trades: ORCL 20 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 35 10:25:00
Trades: IBM 30 30 11:05:00
Trades.bound: 11:10:00

 

Rows are retained behind the scenes for an hour, so the second ORCL output row has a total of 35 (20 + 15). The original IBM trade at 10:00:00, however, falls outside the hour preceding 11:05:00, so it is excluded from the second IBM sum.
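The Trades example can be replayed with a small Python model. The event tuples and the `hourly_volume` function are hypothetical. The model assumes rows arrive in rowtime order and treats the window's lower bound as inclusive. It passes rowtime bounds through to the output, as in the example output above.

```python
from datetime import datetime, timedelta

# Input from the example: ("row", ticker, amount, time) or ("bound", time).
EVENTS = [
    ("row", "IBM", 10, "10:00:00"),
    ("row", "ORCL", 20, "10:10:00"),
    ("bound", "10:15:00"),
    ("row", "ORCL", 15, "10:25:00"),
    ("row", "IBM", 30, "11:05:00"),
    ("bound", "11:10:00"),
]

def hourly_volume(events, interval=timedelta(hours=1)):
    """SUM(amount) OVER (PARTITION BY ticker RANGE interval PRECEDING)."""
    history = []  # (time, ticker, amount) rows still inside some future window
    output = []
    for event in events:
        if event[0] == "bound":
            output.append(event)  # bounds pass through to the output
            continue
        _, ticker, amount, ts = event
        now = datetime.strptime(ts, "%H:%M:%S")
        history.append((now, ticker, amount))
        # Rows arrive in time order, so a row older than the hour preceding
        # `now` can never re-enter a window and can be discarded.
        history = [h for h in history if h[0] >= now - interval]
        total = sum(a for _, tk, a in history if tk == ticker)
        output.append(("row", ticker, amount, total, ts))
    return output

for event in hourly_volume(EVENTS):
    print(event)
```

Discarding expired rows as soon as a newer row arrives is what lets a stream query keep only a bounded amount of history.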

Windowed-Aggregation Specifications

[Syntax diagram: windowed-aggregation specification]

Interval Clause

[Syntax diagram: interval clause]

Example:

Some business problems seem to need totals over the whole history of a stream, but such totals are usually not practical to compute. However, these problems can often be solved by looking at the last day, the last hour, or the last N records. Aggregates computed over such sets of records are called windowed aggregates.

They are easy to compute in a stream database, and can be expressed in ANSI (SQL:2008) standard SQL as follows:

  SELECT STREAM ticker,
     AVG(price) OVER lastHour AS avgPrice,
     MAX(price) OVER lastHour AS maxPrice
  FROM Bids
  WINDOW lastHour AS (
     PARTITION BY ticker
     RANGE INTERVAL '1' HOUR PRECEDING)
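One way to see why such aggregates are cheap to maintain is a common incremental technique. Keep each ticker's rows in arrival order, maintain a running sum for AVG, and keep a monotonic deque for MAX so that expired rows can be evicted without rescanning the window. This is a generic sketch, not a description of s-Server's internals. The class name `LastHourStats` is hypothetical, rowtimes are plain seconds for brevity, and bids are assumed to arrive in rowtime order.

```python
from collections import defaultdict, deque

class LastHourStats:
    """Hypothetical per-ticker AVG(price) and MAX(price) over the preceding hour."""

    def __init__(self, width=3600):
        self.width = width                # window width in seconds
        self.rows = defaultdict(deque)    # ticker -> (t, price) in arrival order
        self.sums = defaultdict(float)    # ticker -> sum of prices in the window
        self.maxq = defaultdict(deque)    # ticker -> (t, price), prices decreasing

    def on_bid(self, t, ticker, price):
        rows, maxq = self.rows[ticker], self.maxq[ticker]
        rows.append((t, price))
        self.sums[ticker] += price
        # An earlier price <= this one can never be the window max again.
        while maxq and maxq[-1][1] <= price:
            maxq.pop()
        maxq.append((t, price))
        # Evict rows older than the hour preceding t (lower bound inclusive).
        while rows[0][0] < t - self.width:
            _, old = rows.popleft()
            self.sums[ticker] -= old
        while maxq[0][0] < t - self.width:
            maxq.popleft()
        return ticker, self.sums[ticker] / len(rows), maxq[0][1]


stats = LastHourStats()
results = [
    stats.on_bid(0, "IBM", 10),
    stats.on_bid(600, "ORCL", 20),
    stats.on_bid(1800, "IBM", 30),
    stats.on_bid(4000, "IBM", 20),   # the bid at t=0 has left the window
    stats.on_bid(6000, "IBM", 5),    # the bid at t=1800 (price 30) has left too
]
for r in results:
    print(r)
```

Each bid is appended and evicted at most once, so the work per bid is constant on average regardless of how many bids fall in the hour.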

 

Note: The Interval_clause must be of an appropriate type:

An integer literal with ROWS
A numeric value for RANGE over a numeric column
An INTERVAL for a RANGE over a date/time/timestamp
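The type rule in the note above can be expressed as a small validation function. This is purely illustrative. `check_interval` is hypothetical, and Python types stand in for SQL types: int for an integer literal, int or float for numeric values, date/time/datetime for temporal columns, and timedelta for an INTERVAL.

```python
from datetime import date, datetime, time, timedelta
from numbers import Real

def check_interval(kind, order_value, interval):
    """Hypothetical check: does `interval` suit a ROWS or RANGE window?

    order_value is a sample value of the column the window is ordered on.
    """
    if kind == "ROWS":
        # ROWS takes an integer literal (bool is excluded: it subclasses int).
        return isinstance(interval, int) and not isinstance(interval, bool)
    if kind == "RANGE":
        if isinstance(order_value, (date, time)):  # datetime subclasses date
            return isinstance(interval, timedelta)
        if isinstance(order_value, Real):
            return isinstance(interval, Real) and not isinstance(interval, bool)
    return False

print(check_interval("ROWS", None, 10))                                    # True
print(check_interval("RANGE", 100.0, 2.5))                                 # True
print(check_interval("RANGE", datetime(2018, 11, 1), timedelta(hours=1)))  # True
print(check_interval("RANGE", datetime(2018, 11, 1), 1))                   # False
```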