Real-time congestion detection with Streaming SQL

In this post I discuss a SQLstream application for monitoring traffic flow in real time. Vehicles with GPS-enabled devices transmit their position along with other vehicle information such as speed and engine state. SQLstream receives this information as a real-time data stream and uses streaming SQL analytics to detect and predict the rapid onset of congestion on the road network.

Streaming SQL for Congestion Detection

The SQLstream application for congestion detection uses a typical streaming SQL processing pipeline. Data is fed into the pipeline through our Log File Adapter. SQLstream adapters provide an interface to sources and targets such as databases, log files, network sockets and mail servers. Adapters are built on the SQL/MED specification, which is part of the ANSI SQL standard. In this application, each log file contains the vehicle positions on the road network for the latest minute.

The conditioning pipeline first cleanses the data, rejecting poor-quality records (those with missing or out-of-bounds columns). It then maps each vehicle position (a lat/long pair) to a “road element” of the road network, using a UDX to perform geo-spatial lookups in an external road network database.
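
To make the cleansing step concrete, here is a minimal Python sketch of the kind of filter it describes. It is not the SQLstream implementation: the record layout (`VehicleReport`), the speed unit and the `MAX_SPEED_KMH` bound are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class VehicleReport:
    # Hypothetical record layout; the real feed carries more columns.
    vehicle_id: Optional[str]
    lat: Optional[float]
    lon: Optional[float]
    speed_kmh: Optional[float]


# Assumed upper bound on a plausible speed; tune for the actual feed.
MAX_SPEED_KMH = 250.0


def is_valid(r: VehicleReport) -> bool:
    """Return False for records with missing or out-of-bounds columns."""
    if r.vehicle_id is None or r.lat is None or r.lon is None or r.speed_kmh is None:
        return False
    return (-90.0 <= r.lat <= 90.0
            and -180.0 <= r.lon <= 180.0
            and 0.0 <= r.speed_kmh <= MAX_SPEED_KMH)


def cleanse(reports: Iterable[VehicleReport]) -> Iterator[VehicleReport]:
    """Pass through only the reports that survive validation."""
    return (r for r in reports if is_valid(r))
```

In the real pipeline this filter would be a WHERE clause on the incoming stream, ahead of the road element lookup.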

The diagram and the example SQL below show our implementation of a streaming SQL pipeline for congestion detection. Each vehicle reports its position and speed every minute. Two consecutive positions are then used to interpolate the vehicle's speed on each road element along its path between the reporting positions. The interpolated speed is based on the actual distance traveled by the vehicle between the two reports and is calculated in a User Defined Transform (UDX) written in Java. The UDX also attaches a confidence factor to each interpolated speed value, based on the position of the road element relative to the endpoints of the vehicle path.
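
The interpolation idea can be sketched in a few lines of Python. This is an illustration, not the Java UDX: the function name, the input shape (an ordered list of road elements with their lengths in meters) and especially the confidence formula are assumptions. The post only says that confidence depends on where the road element sits relative to the path endpoints, so the linear falloff toward the middle of the path used here is purely hypothetical.

```python
from typing import List, Tuple


def interpolate_speeds(
    path: List[Tuple[str, float]],   # (road element id, length in meters), in travel order
    elapsed_s: float,                # seconds between the two consecutive reports
    min_confidence: float = 0.5,     # hypothetical confidence at the middle of the path
) -> List[Tuple[str, float, float]]:
    """Return (road element id, speed in m/s, confidence) for each element on the path."""
    total_m = sum(length for _, length in path)
    if elapsed_s <= 0 or total_m <= 0:
        return []

    # Average speed over the actual distance traveled between the two reports.
    speed_ms = total_m / elapsed_s

    results = []
    covered = 0.0
    for re_id, length in path:
        midpoint = covered + length / 2.0
        covered += length
        # 0.0 at a path endpoint, 1.0 at the middle of the path.
        d = min(midpoint, total_m - midpoint) / (total_m / 2.0)
        # ASSUMPTION: confidence falls linearly from 1.0 to min_confidence.
        confidence = 1.0 - (1.0 - min_confidence) * d
        results.append((re_id, speed_ms, confidence))
    return results
```

Every road element on the path receives the same interpolated speed; only the confidence differs, which is what lets the later analytics weight the readings.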

Streaming Traffic Flow Analytics
As illustrated below, the analytics pipeline calculates 15-, 5-, 4-, 3-, 2- and 1-minute moving average speeds for each road element, weighting each interpolated speed by its confidence factor. Each road element is color coded based on its 15-minute moving average speed. The results are streamed to a Google Earth display.

  CREATE OR REPLACE VIEW "EstimatedReSpeeds" AS
    SELECT STREAM "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
        SUM("reVehicles") OVER "last1" AS "reVehiclesLast1",
        SUM("reVehicles") OVER "last2" AS "reVehiclesLast2",
        SUM("reVehicles") OVER "last3" AS "reVehiclesLast3",
        SUM("reVehicles") OVER "last4" AS "reVehiclesLast4",
        SUM("reVehicles") OVER "last5" AS "reVehiclesLast5",
        SUM("reVehicles") OVER "last15" AS "reVehiclesLast15",
        SUM("reSpeed" * "reConfidence") OVER "last1" /
        SUM("reConfidence") OVER "last1" AS "reSpeedLast1",
        SUM("reSpeed" * "reConfidence") OVER "last2" /
        SUM("reConfidence") OVER "last2" AS "reSpeedLast2",
        SUM("reSpeed" * "reConfidence") OVER "last3" /
        SUM("reConfidence") OVER "last3" AS "reSpeedLast3",
        SUM("reSpeed" * "reConfidence") OVER "last4" /
        SUM("reConfidence") OVER "last4" AS "reSpeedLast4",
        SUM("reSpeed" * "reConfidence") OVER "last5" /
        SUM("reConfidence") OVER "last5" AS "reSpeedLast5",
        SUM("reSpeed" * "reConfidence") OVER "last15" /
        SUM("reConfidence") OVER "last15" AS "reSpeedLast15"
    FROM "Stage3"
    WINDOW "last1" AS (PARTITION BY "RE"
        RANGE INTERVAL '1' MINUTE PRECEDING),
           "last2" AS (PARTITION BY "RE"
        RANGE INTERVAL '2' MINUTE PRECEDING),
           "last3" AS (PARTITION BY "RE"
        RANGE INTERVAL '3' MINUTE PRECEDING),
           "last4" AS (PARTITION BY "RE"
        RANGE INTERVAL '4' MINUTE PRECEDING),
           "last5" AS (PARTITION BY "RE"
        RANGE INTERVAL '5' MINUTE PRECEDING),
           "last15" AS (PARTITION BY "RE"
        RANGE INTERVAL '15' MINUTE PRECEDING);
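
For readers less familiar with windowed aggregation, the confidence-weighted average that the view computes for each road element can be mimicked in plain Python. The sketch below is only an analogy for one window: the class name is made up, timestamps are assumed to be in seconds and to arrive in non-decreasing order (as rowtimes do in a stream), and the inclusive window boundary is an assumption.

```python
from collections import defaultdict, deque


class WeightedSpeedWindow:
    """Confidence-weighted moving average of speed per road element."""

    def __init__(self, window_s: float):
        self.window_s = window_s
        # road element -> deque of (timestamp, speed, confidence)
        self._rows = defaultdict(deque)

    def add(self, re_id: str, ts: float, speed: float, confidence: float):
        """Add one reading and return the current weighted average (or None)."""
        rows = self._rows[re_id]
        rows.append((ts, speed, confidence))
        # Evict readings that have fallen out of the time window.
        while rows and rows[0][0] < ts - self.window_s:
            rows.popleft()
        weight = sum(c for _, _, c in rows)
        if weight == 0:
            return None  # no usable readings; avoid dividing by zero
        return sum(s * c for _, s, c in rows) / weight
```

One instance per window length (60, 120, ... 900 seconds) would reproduce the six averages; the SQL view does the same in a single statement by naming six windows.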

Detecting the rapid onset of congestion

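Congestion is detected by comparing the moving average over a longer time window with that over a shorter one, for example a 2-minute average with a 1-minute average. The rule below chains these comparisons from the 1-minute window up to the 5-minute window, requiring each shorter-window average to be more than 20% below the next longer one: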

  CREATE OR REPLACE VIEW "CongestionRule1" AS
    SELECT STREAM
        -- name, ID, highway name, speed limit etc. for each road element
        "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
        -- volume of vehicle reports in each time window
        "reVehiclesLast1", "reVehiclesLast2", "reVehiclesLast3",
        "reVehiclesLast4", "reVehiclesLast5", "reVehiclesLast15",
        -- estimated avg speed for each road element
        "reSpeedLast1", "reSpeedLast2", "reSpeedLast3",
        "reSpeedLast4", "reSpeedLast5", "reSpeedLast15"
    FROM "EstimatedReSpeeds"
    WHERE "reSpeedLast1" < 0.80 * "reSpeedLast2" AND -- slowdown by 20%
        "reSpeedLast2" < 0.80 * "reSpeedLast3" AND
        "reSpeedLast3" < 0.80 * "reSpeedLast4" AND
        "reSpeedLast4" < 0.80 * "reSpeedLast5" ;

Note that these estimated speeds are computed over overlapping windows, so the slowdown thresholds are set accordingly. Fine-tuning the slowdown thresholds and taking into account other information, such as the proximity of traffic lights and the volume of vehicle reports in each time window, improves the quality of the congestion detection algorithm.
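
To see why overlapping windows matter when picking thresholds, here is a small Python sketch of the rule together with a back-of-the-envelope conversion. Both function names are made up for illustration. The conversion rests on a simplifying assumption that the real feed will not guarantee: every minute contributes the same total confidence weight, so the (k+1)-minute average equals (k * avg_k + s) / (k + 1), where s is the average speed in the oldest minute of the longer window.

```python
from typing import Sequence


def is_rapid_slowdown(speeds: Sequence[float], factor: float = 0.80) -> bool:
    """Check the chained rule on averages ordered shortest window first,
    e.g. [last1, last2, last3, last4, last5]."""
    if len(speeds) < 2:
        return False
    return all(short < factor * longer
               for short, longer in zip(speeds, speeds[1:]))


def equivalent_ratio(k: int, factor: float = 0.80) -> float:
    """Return r such that  avg_k < factor * avg_(k+1)  is the same condition
    as  avg_k < r * s,  where s is the speed in the minute that only the
    longer window contains. ASSUMES equal weight per minute."""
    # avg_k < factor * (k * avg_k + s) / (k + 1)
    # => avg_k * (k + 1 - factor * k) < factor * s
    return factor / (k + 1 - factor * k)
```

Under that assumption, a 20% threshold between the 1- and 2-minute averages corresponds to the latest minute being below about 67% of the previous minute's speed, and between the 4- and 5-minute averages to the latest four minutes averaging below about 44% of the fifth minute back. The overlap makes the rule considerably stricter than the 0.80 factor suggests at first glance.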

SQLstream Traffic Congestion Detection - Visualization

The Google Earth screenshot shows the real-time traffic view, with detected slowdowns marked as pins. The severity of each slowdown is indicated by a different shade of red.