# Calculating Decaying Averages in Streaming SQL

At SQLstream we have a comprehensive implementation of standard SQL windowing operations such as `SUM`, `COUNT` and `AVG`. Recently, though, we needed a more sophisticated function for a decaying weighted average that would emphasize more recent samples over older samples. We implemented a new `EXP_AVG()` operation, as shown in this example query:

    SELECT rowtime, ticker, price,
           EXP_AVG(price, INTERVAL '10' SECOND) OVER w
    FROM t
    WINDOW w AS (PARTITION BY ticker
                 ORDER BY rowtime
                 RANGE INTERVAL '30' SECOND PRECEDING);

`EXP_AVG` takes a value expression and an interval constant giving the half-life. In this example, if two samples within the window are separated by 10 seconds, the older one will be given half as much weight as the newer one.
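The half-life rule can be checked with a few lines of Python. This is only a sketch of the arithmetic as described above, not the SQLstream implementation; the function name `exp_avg` and the sample values are made up for illustration.

```python
def exp_avg(samples, half_life):
    """Decaying weighted average of (time_seconds, value) pairs.

    A sample's weight halves for every `half_life` seconds it lags
    behind the newest sample in the window.
    """
    newest = max(t for t, _ in samples)
    weights = [0.5 ** ((newest - t) / half_life) for t, _ in samples]
    weighted_sum = sum(w * v for w, (_, v) in zip(weights, samples))
    return weighted_sum / sum(weights)

# Two samples 10 seconds apart with a 10-second half-life:
# the older one (100.0) gets weight 0.5, the newer one (130.0) weight 1.
print(exp_avg([(0, 100.0), (10, 130.0)], half_life=10))  # prints 120.0
```

The result sits closer to the newer price than the plain average of 115 would, which is the emphasis on recent samples that the operation is meant to provide.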

### How would this look in standard SQL without an EXP_AVG function?

I thought it would be interesting to look at how a decaying average would be implemented in SQL without an `EXP_AVG` operation, as under the covers we implement it using standard SQL windowed sums. If we knew that the times for our rows would always fall within a narrow range, we could increase the weights of the samples as time goes forwards and simply scale down the total (rather than lowering the weights of samples as time goes backwards from the most recent sample). Because the average is a ratio of two sums that carry the same scale, that scale cancels out. This would be straightforward. The SQL would look something like this.

First, we need a function to turn our time based units into something on which we can do arithmetic:

    CREATE FUNCTION toSeconds(t TIMESTAMP, offset TIMESTAMP)
    RETURNS DECIMAL(12,3)
    CONTAINS SQL
    RETURN CAST((t - offset) SECOND(9,3) AS DECIMAL(12,3));

Then, we need a function for calculating exponential weight:

    CREATE FUNCTION expWeight(seconds DOUBLE, halfLife DOUBLE)
    RETURNS DOUBLE
    CONTAINS SQL
    RETURN EXP(seconds * (LN(2) / halfLife));
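To see why weights that grow forwards from a fixed offset give the same answer as weights that decay backwards from the newest sample, here is a small Python sketch. `exp_weight` mirrors the `expWeight` function above; `weighted_avg` and the sample data are hypothetical names and values used only for this check.

```python
import math

def exp_weight(seconds, half_life):
    # Mirrors expWeight: the weight doubles every half_life seconds.
    return math.exp(seconds * (math.log(2) / half_life))

def weighted_avg(samples, half_life, offset):
    """Average of (time_seconds, value) pairs, weighted by time since `offset`."""
    num = sum(v * exp_weight(t - offset, half_life) for t, v in samples)
    den = sum(exp_weight(t - offset, half_life) for t, _ in samples)
    return num / den

samples = [(100, 10.0), (112, 11.0), (125, 9.5)]

# Offset before the window: weights grow as time goes forwards.
growing = weighted_avg(samples, 10, offset=90)
# Offset at the newest sample: weights decay going backwards from it.
decaying = weighted_avg(samples, 10, offset=125)

assert math.isclose(growing, decaying, rel_tol=1e-12)
```

Moving the offset multiplies every weight by the same constant, and that constant appears in both the numerator and the denominator, so the quotient does not change.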

The complete SQL will then be:

    -- OFFSET is a placeholder for a fixed reference timestamp
    SELECT rowtime, ticker,
           (SUM(price * expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w)
           / (SUM(expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w) AS avgPrice
    FROM t
    WINDOW w AS (PARTITION BY ticker
                 ORDER BY rowtime
                 RANGE INTERVAL '30' SECOND PRECEDING);

Unfortunately this would not work in practice, as the `expWeight` function would overflow once the row times advanced enough half-lives past `OFFSET`. We can’t reset the offset as long as there are any nonzero values still in the window, because those values were weighted against the old offset. This gives us our out. What we can do is partition the incoming values into two windowed aggregates, always sending zeros to one of them, then switching roles and resetting the offset when the window for that aggregate fills with zeros. This way we’re never summing values whose weights are calculated using different starting offsets.
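The overflow is easy to reproduce outside SQL. The following Python sketch assumes that `DOUBLE` behaves like an IEEE 754 double, as the Python `float` does; `overflows` is a helper invented for this example. Python raises an exception at the limit, whereas a SQL engine might report an error or return infinity instead.

```python
import math

def exp_weight(seconds, half_life):
    # Mirrors expWeight: the weight doubles every half_life seconds.
    return math.exp(seconds * (math.log(2) / half_life))

def overflows(seconds, half_life):
    """True if the weight for this many seconds past the offset is out of range."""
    try:
        exp_weight(seconds, half_life)
        return False
    except OverflowError:
        return True

half_life = 10.0
print(overflows(10_000, half_life))  # 1000 half-lives: still representable
print(overflows(10_300, half_life))  # 1030 half-lives: out of range
```

A double tops out near 2^1024, so with a 10-second half-life the weight becomes unrepresentable a little under three hours past the offset. An offset fixed at some distant reference time would fail immediately.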

Here’s the SQL for the two-aggregate approach. We’ll need some more functions describing when and how we reset our offsets. First, we’ll need an arbitrary reference for our time calculations:

    CREATE FUNCTION toSeconds(t TIMESTAMP)
    RETURNS DECIMAL(12,3)
    CONTAINS SQL
    RETURN CAST((t - TIMESTAMP '1970-01-01 00:00:00') SECOND(9,3) AS DECIMAL(12,3));

To use modulo arithmetic in SQL, convert to an integer:

    CREATE FUNCTION toMillis(t TIMESTAMP)
    RETURNS BIGINT
    CONTAINS SQL
    RETURN CAST(toSeconds(t) * 1000 AS BIGINT);

We’ll want to partition time into window sized epochs starting at our arbitrary time reference:

    CREATE FUNCTION isEvenEpoch(t TIMESTAMP, windowSeconds INT)
    RETURNS BOOLEAN
    CONTAINS SQL
    RETURN MOD(toMillis(t), windowSeconds * 2000) < windowSeconds * 1000;

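We’ll use the start of each epoch as the offset for all rows that fall in that epoch. Despite its name, `epochStart` returns the time elapsed since that start, in seconds, which is the value we pass to `expWeight`:

    CREATE FUNCTION epochStart(t TIMESTAMP, windowSeconds INT)
    RETURNS DECIMAL(12,3)
    CONTAINS SQL
    RETURN CAST(MOD(toMillis(t), windowSeconds * 1000) AS DECIMAL(12,3)) / 1000;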
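As a quick check of the epoch arithmetic, here is a Python sketch of the same modulo calculations. It takes milliseconds since the reference time directly rather than a timestamp; `seconds_into_epoch` is a name chosen here for the calculation that the SQL calls `epochStart`.

```python
def is_even_epoch(millis, window_seconds):
    # Mirrors isEvenEpoch: True in the first half of each 2 * window cycle.
    return millis % (window_seconds * 2000) < window_seconds * 1000

def seconds_into_epoch(millis, window_seconds):
    # Same arithmetic as the epochStart SQL function.
    return (millis % (window_seconds * 1000)) / 1000

# With a 30-second window, epochs alternate even/odd every 30 seconds.
for millis in (0, 29_999, 30_000, 45_500, 60_000):
    print(millis, is_even_epoch(millis, 30), seconds_into_epoch(millis, 30))
```

The value passed to `expWeight` therefore never exceeds the window length, so the weight stays below `expWeight(30, 10)`, which is 8, no matter how long the query runs.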

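The following two functions supply the correction needed while one aggregate is receiving zeros and all the nonzero values still in its window come from the previous epoch. Those values were weighted against an epoch start one window length earlier, so they are scaled down by `expWeight(-windowSeconds, halflife)`: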

    CREATE FUNCTION evenAgingFactor(t TIMESTAMP, windowSeconds INT, halflife DOUBLE)
    RETURNS DOUBLE
    CONTAINS SQL
    RETURN CASE WHEN isEvenEpoch(t, windowSeconds) THEN 1
                ELSE expWeight(-windowSeconds, halflife) END;

    CREATE FUNCTION oddAgingFactor(t TIMESTAMP, windowSeconds INT, halflife DOUBLE)
    RETURNS DOUBLE
    CONTAINS SQL
    RETURN CASE WHEN isEvenEpoch(t, windowSeconds) THEN expWeight(-windowSeconds, halflife)
                ELSE 1 END;

The complete SQL that uses the two window approach to calculate the decaying average using a 30 second window and a 10 second half life is:

    SELECT rowtime, ticker,
           -- numerator: weighted sum of prices
           (oddAgingFactor(rowtime, 30, 10)
              * SUM(CASE WHEN isEvenEpoch(rowtime, 30) THEN 0
                         ELSE price * expWeight(epochStart(rowtime, 30), 10) END) OVER w
            + evenAgingFactor(rowtime, 30, 10)
              * SUM(CASE WHEN isEvenEpoch(rowtime, 30)
                         THEN price * expWeight(epochStart(rowtime, 30), 10)
                         ELSE 0 END) OVER w)
           /
           -- denominator: sum of weights
           (oddAgingFactor(rowtime, 30, 10)
              * SUM(CASE WHEN isEvenEpoch(rowtime, 30) THEN 0
                         ELSE expWeight(epochStart(rowtime, 30), 10) END) OVER w
            + evenAgingFactor(rowtime, 30, 10)
              * SUM(CASE WHEN isEvenEpoch(rowtime, 30)
                         THEN expWeight(epochStart(rowtime, 30), 10)
                         ELSE 0 END) OVER w) AS avgPrice
    FROM t
    WINDOW w AS (PARTITION BY ticker
                 ORDER BY rowtime
                 RANGE INTERVAL '30' SECOND PRECEDING);
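The two-aggregate arithmetic can be simulated in Python and compared against a direct decaying average. This is a sketch under simplifying assumptions: times are whole seconds since the reference, a single ticker is used, the window is the closed range of the last 30 seconds, and the price series is invented test data. The helper names mirror the SQL functions but are otherwise hypothetical.

```python
import math

WINDOW = 30        # window length in seconds
HALF_LIFE = 10.0   # half-life in seconds

def exp_weight(seconds, half_life):
    # Mirrors expWeight: the weight doubles every half_life seconds.
    return math.exp(seconds * (math.log(2) / half_life))

def is_even_epoch(t):
    return t % (2 * WINDOW) < WINDOW

def seconds_into_epoch(t):
    # Same arithmetic as epochStart: time since the start of t's epoch.
    return t % WINDOW

def aging_factor(now, even_sum):
    # evenAgingFactor when even_sum is True, oddAgingFactor otherwise.
    if is_even_epoch(now) == even_sum:
        return 1.0
    return exp_weight(-WINDOW, HALF_LIFE)

def two_window_avg(rows, now):
    """rows are (time_seconds, price); the window is [now - WINDOW, now]."""
    window = [(t, p) for t, p in rows if now - WINDOW <= t <= now]
    num = den = 0.0
    for even_sum in (True, False):
        # Rows of the other parity contribute zero to this aggregate.
        part = [(t, p) for t, p in window if is_even_epoch(t) == even_sum]
        scale = aging_factor(now, even_sum)
        num += scale * sum(p * exp_weight(seconds_into_epoch(t), HALF_LIFE) for t, p in part)
        den += scale * sum(exp_weight(seconds_into_epoch(t), HALF_LIFE) for t, _ in part)
    return num / den

def direct_avg(rows, now):
    """Reference: weights decay backwards from the current row."""
    window = [(t, p) for t, p in rows if now - WINDOW <= t <= now]
    num = sum(p * 0.5 ** ((now - t) / HALF_LIFE) for t, p in window)
    den = sum(0.5 ** ((now - t) / HALF_LIFE) for t, _ in window)
    return num / den

rows = [(t, 100.0 + (t * 7) % 13) for t in range(0, 200, 3)]
for now, _ in rows:
    assert math.isclose(two_window_avg(rows, now), direct_avg(rows, now), rel_tol=1e-9)
```

In this simulation the two results agree at every row, including rows that fall exactly on an epoch boundary, and no weight ever exceeds `exp_weight(WINDOW, HALF_LIFE)`.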

To understand how this would work in practice, let’s assume we start our query at 12:00. We’ll only look at the numerator of our query, as the denominator uses the same technique to calculate a weighted count.

Time | Window A state | Window B state |
---|---|---|
12:00:00 | Empty | Empty |
12:00:00 – 12:00:30 | Values with increasing weights being added. | Zeros being added. |
12:00:30 | Has weighted sum * `expWeight(30,10)` | Zero |
12:00:30 – 12:01:00 | Zeros being added. Will have older part of weighted sum. | Values with increasing weights being added. Will have newer part of weighted sum. |
12:01:00 | Zero. All nonzero values will have been aged out. We can safely reset our scaling factor. | Has weighted sum * `expWeight(30,10)` |

With care to apply appropriate scaling to the parts, this SQL can be used to calculate a weighted average using standard windowed operations. However, I think you’ll agree, it’s easier with our `EXP_AVG()` operation.