Streaming Joins

When you use streaming join queries, as with streaming union queries, you need to remember that s-Server always outputs rows in ascending order of rowtime.

You cannot join two entire streams, that is, find all matching pairs of rows regardless of rowtime, because doing so would require unbounded memory to save the entire history of both streams.

You can, though, apply sliding windows to each input stream and find matching pairs of rows, one from each window. This is the same "window" construct used in windowed aggregation.

Each join window must have a finite size, though its contents vary over time. Windowed joins can benefit from rowtime bounds on their input to emit earlier output. In particular, this helps an outer join by finding unmatched rows as soon as possible.
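To make the idea of a finite window with changing contents concrete, here is a minimal Python sketch. The SlidingWindow class is hypothetical and only models the concept; it is not how s-Server implements windows. Rowtimes are plain seconds, and a rowtime bound is modeled as a call to advance().

```python
from collections import deque

class SlidingWindow:
    """Hypothetical model: keeps rows within `size` seconds of the latest rowtime."""

    def __init__(self, size):
        self.size = size
        self.rows = deque()  # (rowtime, payload), in ascending rowtime order

    def add(self, rowtime, payload):
        self.rows.append((rowtime, payload))
        self.advance(rowtime)

    def advance(self, bound):
        # A row or a rowtime bound at `bound` means nothing earlier will arrive,
        # so rows older than bound - size can never be needed again.
        while self.rows and self.rows[0][0] < bound - self.size:
            self.rows.popleft()

window = SlidingWindow(size=600)  # ten minutes
for rowtime in (0, 120, 450, 700, 1300):
    window.add(rowtime, f"row@{rowtime}")
contents = [rowtime for rowtime, _ in window.rows]
```

Memory stays bounded because old rows are evicted as time advances. A bound alone, with no new row, is enough to empty the window.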

Like other operators, a windowed join operator sets its output rowtime bound to indicate the earliest possible rowtime for its next output.

When the output is due to a matched pair of input rows, each has its own rowtime. The output rowtime is the later of these times, because the rows could not match until both "existed".

For an outer join, an unmatched row results in an output row. The output rowtime is not the rowtime of the unmatched row: to be consistent with the treatment of matches, it is the time we discover the row to be unmatched, i.e., the time the unmatched row is dropped from its window. Rowtime bounds on the input allow this to be discovered as soon as possible.

Examples of Streaming Joins

Join query #1

SELECT STREAM
   ROWTIME, o.orderId, o.ticker,
   o.amount AS orderAmount, t.amount AS tradeAmount
 FROM Orders AS o
 JOIN Trades OVER (RANGE INTERVAL '10' MINUTE FOLLOWING) AS t
 ON o.orderId = t.orderId

This query prints all orders matched by a trade within ten minutes of the order being placed. As ever, the output stream must be sorted by rowtime; the question becomes, "Which rowtime?" The query is written in a way that centers on a single row from the Orders stream at a time, joined to a window of rows from the Trades stream. The output rowtime, appropriately, is the rowtime of the single Orders row.

(To see why, apply the definition of the rowtime of a join: the base-time of the windows at the time the match is discovered. The orders row is alone in its window, a row-based window of size 1 with no offset, so its base-time is the rowtime of the orders row.)
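The matching rule for this query can be sketched in a few lines of Python. This is an illustration only: the function name join_query1 and the tuple layout (rowtime, orderId, ticker, amount) are assumptions made for the example, and the sketch assumes both ends of the ten-minute window are inclusive.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)

def t(hms):
    """Parse an HH:MM:SS rowtime."""
    return datetime.strptime(hms, "%H:%M:%S")

def join_query1(order, trade):
    """Return the output row for an order/trade pair, or None if they do not match."""
    o_time, o_id, o_ticker, o_amount = order
    t_time, t_id, _t_ticker, t_amount = trade
    # Assumption: the window covers [order rowtime, order rowtime + 10 minutes].
    in_window = t(o_time) <= t(t_time) <= t(o_time) + WINDOW
    if o_id == t_id and in_window:
        # The output carries the order's rowtime, not the trade's.
        return (o_time, o_id, o_ticker, o_amount, t_amount)
    return None

order = ("10:00:00", 1, "ORCL", 100)
early = join_query1(order, ("10:07:30", 1, "ORCL", 30))
late = join_query1(order, ("10:12:00", 1, "ORCL", 10))
```

The trade at 10:07:30 falls inside the order's window and produces a row stamped 10:00:00; the trade at 10:12:00 falls outside it and produces nothing.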

Orders matched within ten minutes

Step  Orders                Trades                Output
1     10:00:00 1 ORCL 100
      10:03:00 2 YHOO 25
2                           10:02:00 1 ORCL 60
                                                  10:00:00 1 ORCL 100 60
                            10:04:00 2 YHOO 25
                            10:07:30 1 ORCL 30
                                                  10:00:00 1 ORCL 100 30
3                           10:11:00 bound
                                                  10:03:00 2 YHOO 25 25
4     10:10:00 bound
5                           10:12:00 1 ORCL 10

Let's look at the execution of this query in detail.

In step 1, two orders are placed.
In step 2, three trades occur, and they all match an order.

The trades for order #1 placed at 10:00:00 can be output, but not those for order #2 placed at 10:03:00.

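Why not? Because it's still possible for another trade for order #1 to arrive before the 10:10:00 deadline. Its output row would carry rowtime 10:00:00, and that row must be emitted before any row with rowtime 10:03:00.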

In step 3, a rowtime bound on the Trades stream tells us that no further trades will match order #1.

The streaming join operator can now remove order #1 from its memory.

It's now possible to output the match on YHOO that was seen in step #2.

In step 4, a rowtime bound on the Orders stream arrives. It has no effect.
In step 5, a trade arrives for order #1, but too late for its window (which ended at 10:10:00): no match.
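The walkthrough above can be replayed with a small Python simulation. It is a sketch under stated assumptions, not s-Server's algorithm: the event tuples, the names run_query1, open_orders and pending are invented for the example, and an order's window is assumed to stay open until the Trades stream moves strictly past its end. Each output is tagged with the 0-based index of the event that released it.

```python
WINDOW = 10 * 60  # ten minutes, in seconds

def secs(hms):
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s

def run_query1(events):
    """Replay events; return (event_index, output_row) pairs in emission order."""
    open_orders = []  # orders whose trade window may still receive rows
    pending = []      # matches held back until earlier orders have closed
    trade_mark = 0    # latest rowtime or bound seen on Trades
    emitted = []
    for index, event in enumerate(events):
        kind, hms = event[0], event[1]
        now = secs(hms)
        if kind == "order":
            open_orders.append((now,) + tuple(event[1:]))
        elif kind == "trade":
            trade_mark = max(trade_mark, now)
            _, _, trade_order_id, _, trade_amount = event
            for o_secs, o_hms, o_id, o_ticker, o_amount in open_orders:
                if o_id == trade_order_id and o_secs <= now <= o_secs + WINDOW:
                    row = f"{o_hms} {o_id} {o_ticker} {o_amount} {trade_amount}"
                    pending.append((o_secs, row))
        elif kind == "trade_bound":
            trade_mark = max(trade_mark, now)
        # An "order_bound" event needs no action: it has no effect on this query.
        # Forget orders whose window the Trades stream has moved past.
        open_orders = [o for o in open_orders if o[0] + WINDOW >= trade_mark]
        # A match may be emitted once no open order has an earlier rowtime.
        earliest_open = min((o[0] for o in open_orders), default=float("inf"))
        pending.sort(key=lambda item: item[0])
        while pending and pending[0][0] <= earliest_open:
            emitted.append((index, pending.pop(0)[1]))
    return emitted

events = [
    ("order", "10:00:00", 1, "ORCL", 100),   # step 1
    ("order", "10:03:00", 2, "YHOO", 25),
    ("trade", "10:02:00", 1, "ORCL", 60),    # step 2
    ("trade", "10:04:00", 2, "YHOO", 25),
    ("trade", "10:07:30", 1, "ORCL", 30),
    ("trade_bound", "10:11:00"),             # step 3
    ("order_bound", "10:10:00"),             # step 4
    ("trade", "10:12:00", 1, "ORCL", 10),    # step 5
]
outputs = run_query1(events)
```

With this input the two ORCL matches are released as their trades arrive, and the YHOO match is released only by the Trades bound at event index 5, which is the behavior described for step 3.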

Join query #2

SELECT STREAM
   ROWTIME, o.orderId, o.ticker,
   o.amount AS orderAmount, t.amount AS tradeAmount
 FROM Orders OVER (RANGE INTERVAL '10' MINUTE PRECEDING) AS o
 JOIN Trades AS t
 ON o.orderId = t.orderId

This query is similar to the one above. Orders are still matched with trades that occur within ten minutes of the order being placed. The only difference is that the join is performed from the perspective of the Trades stream, and therefore the output rowtime is that of the trade. As a result, the times at which rows are output are quite different:

Step  Orders                Trades                Output
1     10:00:00 1 ORCL 100
      10:03:00 2 YHOO 25
2                           10:02:00 1 ORCL 60
                            10:04:00 2 YHOO 25
                            10:07:30 1 ORCL 30
                                                  10:02:00 1 ORCL 100 60
3                           10:11:00 bound
4     10:10:00 bound
                                                  10:04:00 2 YHOO 25 25
                                                  10:07:30 1 ORCL 100 30
5                           10:12:00 1 ORCL 10

Let's look at the execution in detail.

In step 1, two orders are placed.
In step 2, three trades occur, and they all match an order. We can output the 10:02:00 trade for order #1, but we cannot move on to the 10:04:00 trade yet. How can this be?

We know that there are no more trades before 10:04:00, so why can't we move on to it?

The problem is not with the Trades stream -- it is with the Orders stream.

The streaming join operator thinks that another Orders row with orderId=2 could still arrive before the 10:04:00 deadline, because the Orders stream has only reached 10:03:00;

if such an order were to arrive, it would generate another output row time-stamped 10:04:00.

(We happen to know that orderIds are unique, so that can't happen.)

In step 3, a bound for Trades arrives.
In step 4, a bound for Orders arrives. The streaming join operator now knows that it is impossible to see another order pairing with the 3 trade rows seen so far, so it outputs the two remaining matches.
In step 5, an unpaired trade arrives. The streaming join operator will keep this trade in memory until the Orders stream reaches 10:12:00, because until then a matching order could still arrive. (We know that order #1 has already been seen, and that there will not be another, but once again the windowed join operator cannot deduce that.)
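The same input can be replayed from the Trades perspective. As before, this Python sketch is illustrative only: run_query2 and its data layout are invented for the example, old orders are never evicted (a real operator would drop them), and a trade is assumed to be resolvable only once the Orders stream has moved strictly past the trade's rowtime.

```python
WINDOW = 10 * 60  # ten minutes, in seconds

def secs(hms):
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s

def run_query2(events):
    """Replay events; return (emitted, waiting) where emitted holds (event_index, row)."""
    orders = []      # every order seen so far (no eviction in this sketch)
    waiting = []     # trades not yet safe to resolve, in arrival order
    order_mark = 0   # latest rowtime or bound seen on Orders
    emitted = []
    for index, event in enumerate(events):
        kind, hms = event[0], event[1]
        now = secs(hms)
        if kind == "order":
            orders.append((now,) + tuple(event[1:]))
            order_mark = max(order_mark, now)
        elif kind == "order_bound":
            order_mark = max(order_mark, now)
        elif kind == "trade":
            waiting.append((now,) + tuple(event[1:]))
        # A "trade_bound" event releases nothing here: the wait is on Orders.
        while waiting and waiting[0][0] < order_mark:
            t_secs, t_hms, t_id, _t_ticker, t_amount = waiting.pop(0)
            for o_secs, _o_hms, o_id, o_ticker, o_amount in orders:
                if o_id == t_id and t_secs - WINDOW <= o_secs <= t_secs:
                    # The output carries the trade's rowtime.
                    row = f"{t_hms} {o_id} {o_ticker} {o_amount} {t_amount}"
                    emitted.append((index, row))
    return emitted, waiting

events = [
    ("order", "10:00:00", 1, "ORCL", 100),   # step 1
    ("order", "10:03:00", 2, "YHOO", 25),
    ("trade", "10:02:00", 1, "ORCL", 60),    # step 2
    ("trade", "10:04:00", 2, "YHOO", 25),
    ("trade", "10:07:30", 1, "ORCL", 30),
    ("trade_bound", "10:11:00"),             # step 3
    ("order_bound", "10:10:00"),             # step 4
    ("trade", "10:12:00", 1, "ORCL", 10),    # step 5
]
emitted, waiting = run_query2(events)
```

Only the 10:02:00 match is released in step 2; the other two wait for the Orders bound at event index 6, and the 10:12:00 trade is still held in memory at the end.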

Join query #3

Now consider a variation on query #1 as an outer join, with an unmatched trade and an unmatched order in the input data. (The select list has been expanded to clarify the new output rows.)

SELECT STREAM
   ROWTIME, o.orderId, o.ticker, o.amount AS orderAmount,
   t.ticker, t.tradeId, t.amount AS tradeAmount
 FROM Orders AS o
 FULL OUTER JOIN Trades OVER (RANGE INTERVAL '10' MINUTE FOLLOWING) AS t
 ON o.orderId = t.orderId

Step  Orders                Trades                Output
1     10:00:00 0 IBM 110
      10:00:00 1 ORCL 100
      10:03:00 2 YHOO 25
2                           10:02:00 1 ORCL 60
                                                  10:00:00 1 ORCL 100 1 ORCL 60
                            10:04:00 2 YHOO 25
                            10:07:30 1 ORCL 30
                                                  10:00:00 1 ORCL 100 1 ORCL 30
3                           10:11:00 bound
                                                  10:00:00 0 IBM 110 null null null
                                                  10:03:00 2 YHOO 25 2 YHOO 25
4     10:10:00 bound
5                           10:12:00 1 ORCL 10
                                                  10:12:00 null null null 1 ORCL 10

The inputs are the same as before, but we add an unmatched order at the start. As the same matches occur, the output should be a superset of that of query #1, with the same rowtimes for the matches.

The unmatched order for IBM results in an output row with null Trade-columns; as with the matches, the output rowtime is the Order rowtime (10:00 for Order #0); but the mismatch is discovered only when the 10-minute window expires for this order, at 10:10. Hence the 10:11 bound on the Trades stream causes the mismatch for IBM to be output, and then (as before) the match for YHOO from 10:03.
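The rule for unmatched orders can be sketched as a small Python function. It is an illustration under assumptions, not the operator's actual logic: the name unmatched_orders and the tuple layout are invented for the example, and an order's window is assumed to expire only when the Trades stream moves strictly past its end.

```python
WINDOW = 10 * 60  # ten minutes, in seconds

def secs(hms):
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s

def unmatched_orders(orders, trades, trade_mark):
    """Outer-join rows for orders whose window expired without a matching trade."""
    rows = []
    for o_hms, o_id, o_ticker, o_amount in orders:
        window_end = secs(o_hms) + WINDOW
        if secs(trade_mark) <= window_end:
            continue  # a matching trade could still arrive
        matched = any(
            t_id == o_id and secs(o_hms) <= secs(t_hms) <= window_end
            for t_hms, t_id, _t_ticker, _t_amount in trades
        )
        if not matched:
            # Rowtime is the order's rowtime; the Trade-columns are null.
            rows.append(f"{o_hms} {o_id} {o_ticker} {o_amount} null null null")
    return rows

orders = [
    ("10:00:00", 0, "IBM", 110),
    ("10:00:00", 1, "ORCL", 100),
    ("10:03:00", 2, "YHOO", 25),
]
trades = [
    ("10:02:00", 1, "ORCL", 60),
    ("10:04:00", 2, "YHOO", 25),
    ("10:07:30", 1, "ORCL", 30),
]
before_bound = unmatched_orders(orders, trades, "10:07:30")
after_bound = unmatched_orders(orders, trades, "10:11:00")
```

While the Trades stream is at 10:07:30 nothing can be declared unmatched. Once it reaches 10:11:00, the IBM order's window has expired and its null-padded row is produced; the YHOO order's window is still open, so it is not yet examined.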

The unmatched trade at 10:12 for order 1 results in an output row with null Order-columns. But when? The output rowtime cannot be the time of the matched Orders row, since there isn't any. Instead it is the time of a missing order that could have matched this Trade, which could have occurred any time during the 10-minute window preceding 10:12; thus the mismatch is known at the end of the window, that is, immediately at 10:12, which is the correct rowtime.