Time and Streaming Data
Streaming SQL is inherently time-based, and several factors determine when results are emitted. This section describes time-related issues, ways to determine whether results can be delivered earlier, and the corresponding system and query changes.
Let's suppose that you have defined some streams, written some streaming queries, and started to write records into those streams. But no records are coming out yet. You're probably wondering: Is the system slow? Did I write the query wrong?
Several factors determine when rows are emitted from a query. SQLstream runs in a correctness mode, which means that it will wait until all of the data necessary to give a correct answer are available. This form of delay is called inherent delay because it is caused directly by the SQL semantics or by a business rule requirement. An example is discussed in the section Inherent delay due to SQL query semantics; additional factors related to correctness mode are discussed in the Delay due to failures and slow writers section.
Other factors affecting when rows are emitted include the following:
It should be remembered that, with very few exceptions (listed later in this section), SQLstream's queries have nothing to do with system time, sometimes called wallclock time because it is the time shown on the clock on the wall of the data center. The semantics of each query are driven by data time, that is, the ROWTIME values and rowtime bounds in the streams on which that query depends. The difference between data time and wallclock time is called wallclock delay. Time zone differences also relate to wallclock delay, as discussed later in this section.
There are also various kinds of system delay, such as the system running slowly because the CPU is overloaded or because the network is slow.
SQLstream's extended SQL contains constructs that allow you to represent delays and time windows. Those constructs tend to match the business problem being solved, so it is usually obvious that a query cannot be answered without an inherent delay.
For example, one can write a query that finds all orders that have not shipped within an hour of being placed. That query cannot output an order placed at 10:00 until 11:00 has arrived without that order having shipped. The query to find such orders will use a streaming join between the Orders and Shipments streams and, not coincidentally, the SQL semantics by which a windowed join generates row timestamps match the business rule.
We would say that this query has an inherent delay of 1 hour, because the SQL semantics are implementing a business rule whereby it is impossible to output a row saying that a 10:00 order has not been shipped in time until the Shipments stream has reached 11:00.
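The one-hour rule can be illustrated outside SQLstream with a small simulation. The following Python sketch is not SQLstream code and uses no SQLstream API. The function name `unshipped_orders`, the event tuples, and the use of a `None` order id to stand for a rowtime bound are all assumptions made for illustration. It also assumes that events arrive in rowtime order and that a shipment at exactly one hour counts as late.

```python
from datetime import datetime, timedelta

ONE_HOUR = timedelta(hours=1)

def unshipped_orders(events):
    """Return (deadline, order_id) for orders not shipped within one hour.

    `events` is a list of (rowtime, stream, order_id) tuples in rowtime order.
    `stream` is "orders" or "shipments". A shipments event whose order_id is
    None plays the role of a rowtime bound (hypothetical convention).
    """
    pending = {}  # order_id -> deadline (order rowtime + 1 hour)
    late = []
    for rowtime, stream, order_id in events:
        if stream == "orders":
            pending[order_id] = rowtime + ONE_HOUR
        elif stream == "shipments":
            # Shipments data time has reached `rowtime`, so every order whose
            # deadline is at or before it can now be reported as late.
            expired = sorted((d, oid) for oid, d in pending.items() if d <= rowtime)
            for deadline, oid in expired:
                late.append((deadline, oid))
                del pending[oid]
            if order_id is not None:
                pending.pop(order_id, None)  # shipped in time
    return late

def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)

events = [
    (at(10, 0), "orders", "A"),
    (at(10, 5), "orders", "B"),
    (at(10, 30), "shipments", "A"),   # A ships in time
    (at(10, 59), "shipments", None),  # bound: too early to decide about B
    (at(11, 10), "shipments", None),  # bound: B's 11:05 deadline has passed
]
late = unshipped_orders(events)
```

Nothing can be reported about order B until the shipments side has advanced past 11:05 in data time, regardless of what the wall clock says. That waiting period is the inherent delay.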
See also: Semantics of Rowtime Bounds.
In these examples, all times are data times (rowtimes), which may differ from wallclock time. According to data time, it is 10:00 when the Orders stream sends a row (or a rowtime bound) timestamped 10:00 or later. Typically this will happen very soon after 10:00, of course. Using NTP (Network Time Protocol) on all computers can ensure that system clocks are closely synchronized. However, the system will still operate correctly if there is an offset between clocks.
In general, application data arrives with rowtimes that may bear no resemblance to wallclock time, although under many real world circumstances, data time lags only slightly behind wallclock time.
SQLstream's query processing only refers to the system time (wallclock time) in two places:
• If a record is inserted into a stream without an explicit rowtime, SQLstream's driver generates a timestamp value using the system clock.
• When a user calls any of the following time functions, it requests the wallclock time from the SQLstream system:
  o CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP, LOCALTIME, and LOCALTIMESTAMP.
SQLstream's correctness mode of operation makes it easy to write applications that always produce the right results, even in a complex distributed system with many clients and servers. But if a producer crashes or goes offline while it is feeding rows into a query, or if it is just running slowly, then the query will wait for it.
For some applications, correctness mode is exactly what is needed. The query will wait until the producer catches up. Other applications, however, would rather produce a result that is possibly incorrect than wait for one or two producers that are running slowly.
If the producer is producing data infrequently, it can aid throughput and efficiency by periodically sending a current rowtime bound during a gap in its output rows. Receiving such a bound tells waiting processes that no further data earlier than that bound will arrive from that stream, which lets them release results that have no further dependency on data with rowtimes up to that bound.
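The effect of a rowtime bound can be sketched with a toy merge of two inputs. This is an illustrative Python model, not SQLstream's implementation: the class `RowtimeMerger`, its methods, and the integer rowtimes are hypothetical. The model releases a buffered row only once every input has reported a rowtime or bound at least as late as that row.

```python
import heapq

class RowtimeMerger:
    """Toy model (hypothetical): merge rows from several inputs in rowtime order."""

    def __init__(self, input_names):
        # Latest rowtime or bound seen per input; None means nothing seen yet.
        self.high_water = {name: None for name in input_names}
        self.buffer = []  # min-heap of (rowtime, arrival_seq, input_name, row)
        self.seq = 0

    def push_row(self, name, rowtime, row):
        self.high_water[name] = rowtime
        heapq.heappush(self.buffer, (rowtime, self.seq, name, row))
        self.seq += 1
        return self._release()

    def push_bound(self, name, rowtime):
        # A bound carries no data; it only promises no earlier rows will follow.
        self.high_water[name] = rowtime
        return self._release()

    def _release(self):
        if any(mark is None for mark in self.high_water.values()):
            return []  # some input has said nothing yet, so keep waiting
        safe = min(self.high_water.values())
        released = []
        while self.buffer and self.buffer[0][0] <= safe:
            rowtime, _, name, row = heapq.heappop(self.buffer)
            released.append((rowtime, name, row))
        return released

merger = RowtimeMerger(["fast", "slow"])
first = merger.push_row("fast", 1, "f1")   # held: "slow" has reported nothing
second = merger.push_row("fast", 2, "f2")  # still held
after_bound = merger.push_bound("slow", 2) # bound from the quiet input
```

In this model the two buffered rows are released only when the quiet input sends its bound, which is the behavior the paragraph above describes.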
Another solution is for the producer to close its prepared INSERT statement. This tells SQLstream not to wait for the producer to send rows or rowtime bounds for the query to make progress. The implication of this strategy is that if the producer wants to rejoin the query, it will need to send rowtimes at or greater than the high water mark that the query has reached. Anything less will be rejected as an out-of-order row.
Both of these approaches are described in more detail in the Administrator Guide's Correctness Mode chapter, where Timeliness mode is discussed.
Suppose a stream S1 is producing 1 million rows per hour, and there are two processes reading from it, one of which is only reading 100,000 rows per hour. As time goes by, that one process falls further and further behind in reading the results sent by the producer.
The SQLstream system cannot throw away an output row until it has been read by both consumers. It can either produce the data and buffer it until the slower reader is ready for it, or delay input stream processing until output streams have caught up.
Delaying input stream processing lets the data back up through the system, like traffic backing up on a freeway. Feeder streams will eventually be prevented from sending records because their consumer, S1, has allowed its input queues to fill up. A further side-effect is that as those feeder streams are prevented from writing, any other queries that depend on those writers will also be forced to wait. S1's delay eventually starves the faster reader, forcing it to wait because new records are not being generated.
Currently in such a scenario, SQLstream tends to let data back up, because this is more efficient, at least for small amounts of backup. It saves writing data to disk, and it tends to smooth out stream processing. This is particularly true for streams that produce rows at irregular rates, because it enables the system to work on larger numbers of records at a time, which tends to be more efficient.
For example, the following diagram shows the gridlock that ensues when there is a slow reader. Reader 1 is running slowly, and its buffer is full of unread rows (black). Reader 2 is reading results from the same query, but is keeping pace with the query. The join operator cannot produce more data until Reader 1 has read the existing data, so it suspends operation (blocks).
There are several effects. First, Reader 2 is starved of more data. Second, the backlog spreads to the join operator's ancestors: Stream B, Writer 2 and Writer 3 are particularly affected, and block when their output buffers fill up; Stream A and Writer 1 are affected less. Third, because Stream B is unable to make progress, its other descendant, Reader 3, is starved. It is initially surprising that Reader 3 is affected, since it is neither an ancestor nor a descendant of Reader 1, the root cause of the backlog. Just like gridlock spreading through a congested highway network, flow problems can have far-reaching effects. To avoid this, Reader 1 needs to be re-engineered to run faster, perhaps by partitioning its work across a number of processes.
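The way one slow reader throttles everything else can be seen in a deterministic toy simulation. The Python sketch below is only a model under stated assumptions, not a description of SQLstream internals: the function `simulate`, the per-tick rates, and the fixed per-reader buffer size are all hypothetical. It assumes the producer may emit a row only when every reader's buffer has room for it.

```python
def simulate(ticks, produce_rate, read_rates, buffer_size):
    """Return (rows_produced, rows_consumed_per_reader) after `ticks` steps.

    Hypothetical model: every produced row must be placed in every reader's
    bounded buffer, so the fullest buffer limits the producer.
    """
    buffers = [0] * len(read_rates)   # unread rows waiting for each reader
    consumed = [0] * len(read_rates)
    produced = 0
    for _ in range(ticks):
        # Each reader drains up to its own rate from its buffer.
        for i, rate in enumerate(read_rates):
            taken = min(rate, buffers[i])
            buffers[i] -= taken
            consumed[i] += taken
        # The producer is blocked by whichever buffer has the least room.
        room = min(buffer_size - waiting for waiting in buffers)
        emitted = min(produce_rate, room)
        buffers = [waiting + emitted for waiting in buffers]
        produced += emitted
    return produced, consumed

# A fast reader on its own, then the same fast reader sharing with a slow one.
alone = simulate(ticks=100, produce_rate=10, read_rates=[10], buffer_size=20)
shared = simulate(ticks=100, produce_rate=10, read_rates=[10, 1], buffer_size=20)
```

With these made-up numbers, the fast reader sharing a producer with the slow reader ends up consuming only slightly more than the slow reader does, even though it could keep pace with the producer on its own. Once the slow reader's buffer fills, the producer is limited to the slow reader's rate, and the fast reader is starved.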
When there is a delay getting results from a system, the first assumption is that this is because SQLstream is running slowly. Actually, this is rarely the case, because SQLstream generally processes data very efficiently (much faster than a database, for example).
Depending on the hardware specification of the system, a single-node SQLstream system can generally handle tens of thousands of records per second, and/or hundreds of active queries. Under moderate loads, the system will shift its workload to work more efficiently at the expense of a slightly increased delay.
System delay can also arise from network traffic. Network latency can occur at various parts of the system. SQLstream uses TCP/IP for communications with Java clients and within the system. TCP/IP can have a significant delay, particularly over the Internet or other wide area networks (WANs), if the network is busy, and over wireless networks where there is radio interference.
SQLstream's Streaming Data Protocol (SDP) stack organizes rows into batches for efficient transmission. If there are several open streams, it will combine records from multiple streams into a single network packet. This batching increases efficiency, but may add a small delay.
SQLstream's time semantics, and in particular the TIMESTAMP values held in the ROWTIME column and passed as rowtime bounds, are consistent with the SQL standard. A timestamp value has no time zone associated with it. For example, the value TIMESTAMP '2001-01-01 00:00:00' represents the start of the millennium, but interpretation is left to the system architect.
Time data accessed via JDBC are accessed as Java timestamp values and follow Java data semantics. A Java java.sql.Timestamp object represents a particular moment in time. It contains a long (64 bit signed integer) field that represents the number of milliseconds since January 1st, 1970 UTC. (For example, on any particular day, 6:00AM PST and 9:00AM EST are the same moment, and both correspond to the same java.sql.Timestamp.)
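The difference between a zoneless timestamp and a moment in time can be shown with a short sketch. It is written in Python rather than Java purely for illustration, so it models the idea behind java.sql.Timestamp (milliseconds since the epoch) and does not call it. The helper `epoch_millis`, the fixed-offset zones, and the sample date are assumptions.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets keep the sketch self-contained (no time zone database needed).
PST = timezone(timedelta(hours=-8), "PST")
EST = timezone(timedelta(hours=-5), "EST")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def epoch_millis(moment):
    """Milliseconds since 1970-01-01 00:00:00 UTC for a zone-aware datetime."""
    return (moment - EPOCH) // timedelta(milliseconds=1)

six_am_pst = datetime(2024, 1, 15, 6, 0, tzinfo=PST)
nine_am_est = datetime(2024, 1, 15, 9, 0, tzinfo=EST)

# Two different local readings of the clock, one moment in time.
same_moment = epoch_millis(six_am_pst) == epoch_millis(nine_am_est)

# A zoneless value, like a SQL TIMESTAMP, names no moment until someone
# decides which zone it is in. Here it is interpreted as UTC.
zoneless = datetime(2024, 1, 15, 14, 0)
as_utc = zoneless.replace(tzinfo=timezone.utc)
```

Here `same_moment` is true, and `as_utc` denotes the same moment as the other two values only because the zoneless value was interpreted as UTC. A different choice of zone would give a different moment.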
There is a tension between these notions of time, which system architects generally resolve by standardizing on UTC as the time zone for the SQLstream system. JDBC clients can remain in their local time zone. If you read and write data using the JDBC setTimestamp(Timestamp) and getTimestamp() methods, timestamp values will automatically be converted (by the Java runtime library) to UTC timestamp values. An alternative design is for the JDBC client to locate itself in the UTC zone; in this case the Java library does no conversion (see TimeZone at http://java.sun.com/j2se/1.5.0/docs/api/java/util/TimeZone.html for more details).
There are many factors governing time in a streaming data application built using SQLstream.
Wallclock time and time zone differences are factors that need to be considered when designing a system but are not sources of delay as such.
In a well-provisioned system, system delay is usually negligible because SQLstream makes efficient use of CPU and network resources.
An inherent delay often arises in a streaming application as the inevitable result of asking vital business questions against available data sources. Producing correct answers using those data sources can simply require waiting for an event to transpire, or for information necessary to the analysis to become available. Understanding the time semantics of streaming SQL will help you to mitigate those delays.