Apache Calcite官方文档中文版- 进阶-3. 流(Streaming)
第二部分 进阶(Advanced)
3. 流(Streaming)
Calcite扩展了SQL和关系代数以支持流式查询。
3.1 简介
流是收集到持续不断流动的记录,永远不停止。与表不同,它们通常不存储在磁盘上,而流是通过网络,并在内存中保存很短的时间。
数据流是对表格的补充,因为它们代表了企业现在和将来发生的事情,而表格代表了过去。一个流被存档到一个表中是很常见的。
与表一样,您经常希望根据关系代数以高级语言查询流,根据模式(schema)进行验证,并优化以充分利用可用的资源和算法。
Calcite的SQL是对标准SQL的扩展,而不是另一种“类SQL”的语言。区别很重要,原因如下:
- 对于任何知道常规SQL的人来说,流式SQL都很容易学习。
- 语义清晰,因为我们的目标是在一个流上产生相同的结果,就好像表中的数据是一样的。
- 可以编写结合了流和表(或者流的历史,基本上是内存表)的查询。
- 许多现有的工具可以生成标准的SQL。
如果不使用STREAM关键字,则返回常规标准SQL。
3.2 schema示例
流式SQL使用以下schema:
- Orders (rowtime, productId, orderId, units):一个流和一个表
- Products (rowtime, productId, name) :一个表
- Shipments (rowtime, orderId) :一个流
3.3 简单查询
最简单的流式查询:
SELECT STREAM *
FROM Orders;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:00 | 30 | 5 | 410:17:05 | 10 | 6 | 110:18:05 | 20 | 7 | 210:18:07 | 30 | 8 | 2011:02:00 | 10 | 9 | 611:04:00 | 10 | 10 | 111:09:30 | 40 | 11 | 1211:24:11 | 10 | 12 | 4
该查询读取Orders流中的所有列和行。与任何流式查询一样,它永远不会终止。只要记录到达,它就会输出一条记录Orders。
输入Control-C以终止查询。
STREAM关键字是SQL流的主要扩展。它告诉系统你对订单有兴趣,而不是现有订单。
查询:
SELECT *
FROM Orders;rowtime | productId | orderId | units
----------+-----------+---------+-------08:30:00 | 10 | 1 | 308:45:10 | 20 | 2 | 109:12:21 | 10 | 3 | 1009:27:44 | 30 | 4 | 24 records returned.
也是有效的,但会打印出现有的所有订单,然后终止。我们把它称为关系查询,而不是流式处理。它具有传统的SQL语义。
Orders很特殊,因为它有一个流和一个表。如果您尝试在表上运行流式查询或在流上运行关系式查询,则Calcite会抛出一个错误:
SELECT * FROM Shipments;ERROR: Cannot convert stream 'SHIPMENTS' to a tableSELECT STREAM * FROM Products;ERROR: Cannot convert table 'PRODUCTS' to a stream
3.4 过滤行
与常规的SQL中一样,使用一个WHERE子句来过滤行:
SELECT STREAM *
FROM Orders
WHERE units > 3;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:00 | 30 | 5 | 410:18:07 | 30 | 8 | 2011:02:00 | 10 | 9 | 611:09:30 | 40 | 11 | 12
11:24:11 | 10 | 12 | 4
3.5 表达式投影
在SELECT子句中使用表达式来选择要返回或计算表达式的列:
SELECT STREAM rowtime,'An order for ' || units || ' '|| CASE units WHEN 1 THEN 'unit' ELSE 'units' END|| ' of product #' || productId AS description
FROM Orders;rowtime | description
----------+---------------------------------------10:17:00 | An order for 4 units of product #3010:17:05 | An order for 1 unit of product #1010:18:05 | An order for 2 units of product #2010:18:07 | An order for 20 units of product #3011:02:00 | An order by 6 units of product #1011:04:00 | An order by 1 unit of product #1011:09:30 | An order for 12 units of product #4011:24:11 | An order by 4 units of product #10
我们建议您始终在SELECT 条款中包含rowtime列。在每个流和流式查询中有一个有序的时间戳,可以在稍后进行高级计算,例如GROUP BY和JOIN。
3.6 滚动窗口
有几种方法可以计算流上的聚合函数。差异是:
- How many rows come out for each row in?
- 每个输入值总共出现一次还是多次?
- 什么定义了“窗口”,一组贡献给输出行的行?
- 结果是流还是关系?
窗口类型:
- 滚动窗口(GROUP BY)
- 跳转窗口(多GROUP BY)(hopping)
- 滑动窗口(窗口函数)
- 级联窗口(窗口函数)
下图显示了使用它们的查询类型:
首先,看一个滚动窗口,它是由一个流GROUP BY定义的 。这里是一个例子:
SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;rowtime | productId | c | units
------------+---------------+------------+-------11:00:00 | 30 | 2 | 2411:00:00 | 10 | 1 | 111:00:00 | 20 | 1 | 712:00:00 | 10 | 3 | 1112:00:00 | 40 | 1 | 12
结果是流。在11点整,Calcite发出自10点以来一直到11点有下订单的 productId的小计。12点,它会发出11:00至12:00之间的订单。每个输入行只贡献到一个输出行。
Calcite是如何知道10:00:00的小计在11:00:00完成的,这样就可以发出它们了?它知道rowtime是在增加,而且它也知道CEIL(rowtime TO HOUR)在增加。所以,一旦在11:00:00时间点或之后看到一行,它将永远不会看到贡献到上午10:00:00的一行。
增加或减少的列以及表达式是单调的。(单调递增或单调递减)
如果列或表达式的值具有轻微的失序,并且流具有用于声明特定值将不会再被看到的机制(例如标点符号或水印),则该列或表达式被称为准单调。
在GROUP BY子句中没有单调或准单调表达式的情况下,Calcite无法取得进展,并且不允许查询:
SELECT STREAM productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY productId;ERROR: Streaming aggregation requires at least one monotonic expression
单调和准单调的列需要在模式中声明。当记录输入流并且由从该流中读取数据的假定查询时,单调性被强制执行。我们建议为每个流指定一个时间戳列rowtime,但也可以声明其他列是单调的,例如orderId。
我们将在下面的内容讨论标点符号,水印,并取得进展的其他方法 。
3.7 滚动窗口,改进
前面的滚动窗口的例子很容易写,因为窗口是一个小时。对于不是整个时间单位的时间间隔,例如2小时或2小时17分钟,则不能使用CEIL,表达式将变得更复杂。
Calcite支持滚动窗口的替代语法:
SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;rowtime | productId | c | units
----------+-----------+---------+-------11:00:00 | 30 | 2 | 2411:00:00 | 10 | 1 | 111:00:00 | 20 | 1 | 712:00:00 | 10 | 3 | 1112:00:00 | 40 | 1 | 12
正如你所看到的,它返回与前一个查询相同的结果。TUMBLE 函数返回一个分组键,这个分组键在给定的汇总行中将会以相同的方式结束; TUMBLE_END函数采用相同的参数并返回该窗口的结束时间; 当然还有一个TUMBLE_START函数。
TUMBLE有一个可选参数来对齐窗口。在以下示例中,我们使用30分钟间隔和0:12作为对齐时间,因此查询在每小时过去12分钟和42分钟时发出汇总:
SELECT STREAMTUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,productId,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),productId;rowtime | productId | c | units
----------+-----------+---------+-------10:42:00 | 30 | 2 | 2410:42:00 | 10 | 1 | 110:42:00 | 20 | 1 | 711:12:00 | 10 | 2 | 711:12:00 | 40 | 1 | 1211:42:00 | 10 | 1 | 4
3.8 跳转窗口
跳转窗口是滚动窗口的泛化(概括),它允许数据在窗口中保持比发出间隔更长的时间。
查询发出的行的时间戳11:00,包含数据从08:00至11:00(或10:59.9);以及行的时间戳12:00,包含数据从09:00至12:00。
SELECT STREAMHOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,COUNT(*) AS c,SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);rowtime | c | units
----------+----------+-------11:00:00 | 4 | 2712:00:00 | 8 | 50
在这个查询中,因为保留期是发出期的3倍,所以每个输入行都贡献到3个输出行。想象一下,HOP函数为传入行生成一组Group Keys,并将其值存储在每个Group Key的累加器中。例如,HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3')产生3个时间间隔周期:
[08:00, 09:00)
[09:00, 10:00)
[10:00, 11:00)
这就提出了允许不满意内置函数HOP和TUMBLE的用户来自定义的分区函数的可能性。
我们可以建立复杂的复杂表达式,如指数衰减的移动平均线:
SELECT STREAM HOP_END(rowtime),productId,SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))/ SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
发出:
- 1:00:00包含[10:00:00, 11:00:00)的行;
- 1:00:01包含[10:00:01, 11:00:01)的行。
这个表达最近的订单比旧订单权重更高。将窗口从1小时扩展到2小时或1年对结果的准确性几乎没有影响(但会使用更多的内存和计算资源)。
请注意,我们在一个聚合函数(SUM)中使用HOP_START,因为它是一个子汇总(sub-total)内所有行的常量值。对于典型的集合函数( SUM,COUNT等等),这是不允许的。
如果您熟悉GROUPING SETS,可能会注意到,分区函数可以看作是泛化的GROUPING SETS,因为它们允许一个输入行对多个子汇总做出贡献。用于GROUPING SETS的辅助函数诸如如GROUPING()和GROUP_ID可以在聚合函数内部使用,所以并不奇怪, HOP_START和HOP_END可以以相同的方式使用。3.9 分组集合
GROUPING SETS对于流式查询是有效的,只要每个分组集合包含单调或准单调表达式。
CUBE和ROLLUP不适用于流式查询,因为它们将生成至少一个聚合所有内容(如GROUP BY ())的分组集合 。3.10 聚合后Consideration
与标准SQL一样,可以使用HAVING子句来过滤由流GROUP BY发出的行:
SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
HAVING COUNT(*) > 2 OR SUM(units) > 10;rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
3.11 子查询,视图和SQL闭包属性
前述的HAVING查询可以使用WHERE子查询中的子句来表示:
SELECT STREAM rowtime, productId
FROM (SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,productId,COUNT(*) AS c,SUM(units) AS suFROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
WHERE c > 2 OR su > 10;rowtime | productId
----------+-----------10:00:00 | 3011:00:00 | 1011:00:00 | 40
HAVING子句是在SQL早期引入的,当需要在聚合之后执行过滤器时,(回想一下,WHERE在输入到达GROUP BY子句之前过滤行)。
从那时起,SQL已经成为一种数学封闭的语言,这意味着您可以在一个表上执行的任何操作也可以在查询上执行。
SQL 的闭包属性非常强大。它不仅使 HAVING陈旧过时(或至少减少到语法糖),它使视图成为可能:
CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) ASSELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),productId,COUNT(*),SUM(units)FROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;rowtime | productId
----------+-----------10:00:00 | 3011:00:00 | 1011:00:00 | 40
FROM子句中的子查询有时被称为“内联视图”,但实际上它们比视图更基础。视图只是一个方便的方法,通过给出这些分片命名并将它们存储在元数据存储库中,将SQL分割成可管理的块。
很多人发现嵌套的查询和视图在流上比在关系上更有用。流式查询是连续运行的运算符的管道,而且这些管道通常会很长。嵌套的查询和视图有助于表达和管理这些管道。
顺便说一下,WITH子句可以完成与子查询或视图相同的操作:
WITH HourlyOrderTotals (rowtime, productId, c, su) AS (SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),productId,COUNT(*),SUM(units)FROM OrdersGROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;rowtime | productId
------------+-----------10:00:00 | 3011:00:00 | 1011:00:00 | 40
3.12 流和关系之间的转换
回顾一下HourlyOrderTotals视图的定义。此视图是流还是关系?
它不包含STREAM关键字,所以它是一个关系。但是,这是一种可以转换成流的关系。
可以在关系和流式查询中使用它:
# A relation; will query the historic Orders table.
# Returns the largest number of product #10 ever sold in one hour.
SELECT max(su)
FROM HourlyOrderTotals
WHERE productId = 10;# A stream; will query the Orders stream.
# Returns every hour in which at least one product #10 was sold.
SELECT STREAM rowtime
FROM HourlyOrderTotals
WHERE productId = 10;
这种方法不限于视图和子查询。遵循CQL [ 1 ]中规定的方法,流式SQL中的每个查询都被定义为关系查询,并最上面的SELECT使用STREAM关键字转换为流。
如果STREAM关键字存在于子查询或视图定义中,则不起作用。
在查询准备时间,Calcite计算查询中引用的关系是否可以转换为流或历史的关系。
有时候,一个流可以提供它的一些历史记录(比如Apache Kafka [ 2 ]主题中最后24小时的数据),但不是全部。在运行时,Calcite计算出是否有足够的历史记录来运行查询,如果没有,则会给出错误。
3.13 “饼图”问题:流上的关系查询
一个特定的情况下,需要将流转换为关系时会发生我所说的“饼图问题”。想象一下,你需要写一个带有图表的网页,如下所示,它总结了每个产品在过去一小时内的订单数量。
但是这个Orders流只包含几条记录,而不是一个小时的汇总。我们需要对流的历史记录运行一个关系查询:
SELECT productId, count(*)
FROM Orders
WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOURAND current_timestamp;
如果Orders流的历史记录正在滚动到Orders表中,尽管成本很高,我们可以回答查询。更好的办法是,如果我们可以告诉系统将一小时的汇总转化为表格,在流式处理过程中不断维护它,并自动重写查询以使用表格。
3.14 排序
ORDER BY的故事类似于GROUP BY。语法看起来像普通的SQL,但是Calcite必须确保它能够提供及时的结果。因此,它需要在ORDER BY键的前沿(leading edge)有一个单调的表达式。
SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
FROM Orders
ORDER BY CEIL(rowtime TO hour) ASC, units DESC;rowtime | productId | orderId | units
----------+-----------+---------+-------10:00:00 | 30 | 8 | 2010:00:00 | 30 | 5 | 410:00:00 | 20 | 7 | 210:00:00 | 10 | 6 | 111:00:00 | 40 | 11 | 1211:00:00 | 10 | 9 | 611:00:00 | 10 | 12 | 411:00:00 | 10 | 10 | 1
大多数查询将按照插入的顺序返回结果,因为引使用流式算法,但不应该依赖它。例如,考虑一下:
SELECT STREAM *
FROM Orders
WHERE productId = 10
UNION ALL
SELECT STREAM *
FROM Orders
WHERE productId = 30;rowtime | productId | orderId | units
----------+-----------+---------+-------10:17:05 | 10 | 6 | 110:17:00 | 30 | 5 | 410:18:07 | 30 | 8 | 2011:02:00 | 10 | 9 | 611:04:00 | 10 | 10 | 111:24:11 | 10 | 12 | 4
productId= 30 的行显然是不符合order要求的,可能是因为Orders流以productId分区,分区后的流在不同的时间发送了他们的数据。
如果您需要特定的顺序,请添加一个显式的ORDER BY:
Calcite可能会通过合并使用rowtime实现UNION ALL,这样只是效率稍微低些。
只需要添加一个ORDER BY到最外层的查询。如果需要在UNION ALL之后执行GROUP BY,Calcite将会 隐式添加ORDER BY,以便使GROUP BY算法成为可能。
3.15 表格构造器
VALUES子句创建一个拥有给定行集合的内联表。
流式传输是不允许的。这组行不会发生改变,因此一个流永远不会返回任何行。
> SELECT STREAM * FROM (VALUES (1, 'abc'));ERROR: Cannot stream VALUES
3.16 滑动窗口
标准SQL的功能特性之一可以在SELECT子句中使用所谓的“分析函数” 。不像GROUP BY,不会折叠记录。对于每个进来的记录,出来一个记录。但是聚合函数是基于一个多行的窗口。
我们来看一个例子。
SELECT STREAM rowtime,productId,units,SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) unitsLastHour
这个功能特性付出很小的努力就包含了很多Power。在SELECT子句中可以有多个函数,基于多个窗口规则定义。
以下示例返回在过去10分钟内平均订单数量大于上周平均订单数量的订单。
SELECT STREAM *
FROM (SELECT STREAM rowtime,productId,units,AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7FROM OrdersWINDOW product AS (ORDER BY rowtimePARTITION BY productId))
为了简洁起见,在这里我们使用一种语法,其中使用WINDOW子句部分定义窗口,然后在每个OVER子句中细化窗口。也可以定义WINDOW子句中的所有窗口,或者如果您愿意,可以定义所有内联窗口。
但真正的power超越语法。在幕后,这个查询维护着两个表,并且使用FIFO队列添加和删除子汇总中的值。但是,无需在查询中引入联接,也可以访问这些表。
窗口化聚合语法的一些其他功能特性:
- 可以根据行数定义窗口。
- 该窗口可以引用尚未到达的行。(流会等到他们到达)。
-
可以计算与顺序有关的函数,如RANK中位数。
3.17 级联窗口
如果我们想要一个返回每个记录的结果的查询,比如一个滑动窗口,但是在一个固定的时间段重置总数,就像一个翻滚的窗口?这种模式被称为级联窗口。这里是一个例子:
SELECT STREAM rowtime, productId, units, SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour它看起来类似于滑动窗口查询,但单调表达式出现在PARTITION BY窗口的子句中。由于rowtime从10:59:59到11:00:00,FLOOR(rowtime TO HOUR)从10:00:00到11:00:00发生改变,因此一个新的分区开始。在新的时间到达的第一行将开始新的汇总; 第二行将有一个由两行组成的汇总,依此类推。
Calcite知道旧分区永远不会再被使用,因此从内部存储中删除该分区的所有子汇总。
使用级联和滑动窗口的分析函数可以组合在同一个查询中。3.18 流与表Join
有两种类型的连接,即stream-to-table join和stream-to-stream join。
如果表的内容没有改变,则流到表的连接是直接的。这个查询以每个产品的列出价格丰富了订单流:SELECT STREAM o.rowtime, o.productId, o.orderId, o.units, p.name, p.unitPrice FROM Orders AS o JOIN Products AS p ON o.productId = p.productId;rowtime | productId | orderId | units | name | unitPrice ----------+-----------+---------+-------+ -------+----------- 10:17:00 | 30 | 5 | 4 | Cheese | 17 10:17:05 | 10 | 6 | 1 | Beer | 0.25 10:18:05 | 20 | 7 | 2 | Wine | 6 10:18:07 | 30 | 8 | 20 | Cheese | 17 11:02:00 | 10 | 9 | 6 | Beer | 0.25 11:04:00 | 10 | 10 | 1 | Beer | 0.25 11:09:30 | 40 | 11 | 12 | Bread | 100 11:24:11 | 10 | 12 | 4 | Beer | 0.25如果表格在改变,会发生什么?例如,假设product#10的单价在11点增加到0.35。在11:00之前下的订单应该是旧价格,在11:00之后下的订单应该反映新价格。
实现此目的的一种方法是创建一个表,使每个版本的开始和结束生效日期保持一致,ProductVersions如下所示:SELECT STREAM * FROM Orders AS o JOIN ProductVersions AS p ON o.productId = p.productId AND o.rowtime BETWEEN p.startDate AND p.endDaterowtime | productId | orderId | units | productId1 | name | unitPrice ----------+-----------+---------+-------+ -----------+--------+----------- 10:17:00 | 30 | 5 | 4 | 30 | Cheese | 17 10:17:05 | 10 | 6 | 1 | 10 | Beer | 0.25 10:18:05 | 20 | 7 | 2 | 20 | Wine | 6 10:18:07 | 30 | 8 | 20 | 30 | Cheese | 17 11:02:00 | 10 | 9 | 6 | 10 | Beer | 0.35 11:04:00 | 10 | 10 | 1 | 10 | Beer | 0.35 11:09:30 | 40 | 11 | 12 | 40 | Bread | 100 11:24:11 | 10 | 12 | 4 | 10 | Beer | 0.35另一种实现方法是使用具有临时支持的数据库(能够像过去的任何时候一样查找数据库的内容),并且系统需要知道Orders流的rowtime列对应于Products表的事务时间戳 。
对于许多应用程序而言,暂时支持或版本化表格的成本和努力是不值得的。查询在重放时给出不同的结果是可以接受的:在这个例子中,在重放时,product#10的所有订单被分配后来的单价0.35。3.19 流与流Join
如果连接条件以某种方式强迫它们彼此保持有限的距离,那么流与流的连接就是合理的。在以下查询中,发货日期在订单日期的一小时内:
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime FROM Orders AS o JOIN Shipments AS s ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;rowtime | productId | orderId | shipTime ----------+-----------+---------+---------- 10:17:00 | 30 | 5 | 10:55:00 10:17:05 | 10 | 6 | 10:20:00 11:02:00 | 10 | 9 | 11:58:00 11:24:11 | 10 | 12 | 11:44:00请注意,相当多的订单不会显示,因为它们在一个小时内没有发货。在系统接收到Order#10时,时间戳为11:24:11,它已经从其哈希表中删除了订单包括Order#8(时间戳10:18:07)。
正如你所看到的,把这两个流的单调或准单调列联系在一起的“锁定步骤”是系统取得进展所必需的。如果它不能推断出一个锁定步骤, 它将拒绝执行一个查询。3.20 DML
这不仅是查询对流来说有意义。运行DML语句(INSERT,UPDATE,DELETE,UPSERT和REPLACE)对流来说同样有意义。
DML非常有用,因为它允许基于流实现物华流或表格,因此经常使用值可以节省工作量。
考虑到流的应用程序通常由查询管道组成,每个查询将输入流转换为输出流。管道的组件可以是一个视图:CREATE VIEW LargeOrders AS SELECT STREAM * FROM Orders WHERE units > 1000;或者一个标准的INSERT语句:
INSERT INTO LargeOrders SELECT STREAM * FROM Orders WHERE units > 1000;这些看起来很相似,在这两种情况下,管道中的下一个步骤都可以读取LargeOrders,而不用担心它是如何填充的。效率是有差别的:INSERT无论有多少消费者,做的工作都是相同的。这个视图的确与消费者的数量成正比,特别是没有消费者的情况下就没有工作。
其他形式的DML对于流也是有意义的。例如,以下常设UPSERT语句维护一个表格,以实现最后一小时订单的汇总:UPSERT INTO OrdersSummary SELECT STREAM productId, COUNT(*) OVER lastHour AS c FROM Orders WINDOW lastHour AS ( PARTITION BY productId ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING)3.21 标点(Punctuation)
Punctuation [ 5 ]允许流式查询取得进展,即使单调的键中没有足够的值来推送出结果。
(我更喜欢术语“rowtime bounds”,水印[ 6 ]是一个相关的概念,但为了这些目的,Punctuation就足够了。)
如果某个流具有Punctuation,那么它可能不会被排序,不过仍然可以排序。因此,出于语义的目的,按照排序的流来工作就足够了。
顺便说一下,一个无序的流也是可排序的,如果按t-sorted排序 (即,每个记录保证在其时间戳的t秒内到达)或k-sorted排序(即每个记录保证不超过k的位置造成无序)。所以对这些流的查询可以像带有Punctuation的流式查询来进行计划。
而且,我们经常要聚合不是时间的且是单调的属性。“一个团队在获胜状态和失败状态之间转移的次数”就是这样一个单调的属性。系统需要自己弄清楚聚合这样一个属性是否安全; Punctuation不会添加任何额外的信息。
我记得一些计划器的元数据(成本指标):
- 这个流按给定的一个或多个属性排序吗?
- 是否可以对给定属性的流进行排序?(对于有限的关系,答案总是“是”;对于流,它依赖于Punctuation的存在,或属性和排序键之间的联系)。
- 我们需要引入什么延迟才能执行此类操作?
- 执行此类操作的成本(CPU,内存等)是多少?
在BuiltInMetadata.Collation中,我们已经有了(1)。对于(2),答案对于有限关系总是“true”。但是我们需要为流实现(2),(3)和(4)。
3.22 流的状态
并非本文中的所有概念都已经在Calcite中实现。其他的可能在Calcite中实现,但不能在SamzaSQL [ 3 ] [ 4 ] 等特定的适配器中实现。
已实现
- 流式SELECT,WHERE,GROUP BY,HAVING,UNION ALL,ORDER BY
- FLOOR和CEIL函数
- 单调性
- 流式VALUES是不允许的
未实现
本文档中提供的以下功能特性,以为Calcite支持它们,但实际上它还没有实现。全面支持意味着参考实现支持该功能特性(包括负面情况),TCK则对其进行测试。
- 流与流的 JOIN
- 流与表的 JOIN
- 视图上的流
- 带有ORDER BY流UNION ALL(合并)
- 流上的关系查询
- 流式窗口聚合(滑动和级联窗口)
- 检查STREAM在子查询和视图是否被忽略
- 检查流的ORDER BY子句不能有OFFSET或LIMIT
- 历史有限性; 在运行时,检查是否有足够的历史记录来运行查询。
- 准单调
- HOP和TUMBLE(和辅助HOP_START,HOP_END, TUMBLE_START,TUMBLE_END)函数
本文档做了什么
- 重新访问是否可以流式传输 VALUES
- OVER 子句来定义窗口上的流
- 考虑在流式查询中是否允许CUBE和ROLLUP,理解某些级别的聚合将永远不会完成(因为它们没有单调表达式),因此不会被发出。
- 修复该UPSERT示例以删除在过去一小时内没有发生的产品的记录。
- 输出到多个流的DML; 也许是标准REPLACE语句的扩展 。
3.23 函数
以下函数在标准SQL中不存在,但在流式SQL中定义。
标量函数:
- FLOOR(dateTime TO intervalType) 将日期,时间或时间戳值取下限为给定的间隔类型
- CEIL(dateTime TO intervalType) 将日期,时间或时间戳值取上限到给定的间隔类型
分区函数:
- HOP(t, emit, retain) 返回一个集合of group keys for a row作为跳转窗口的一部分
- HOP(t, emit, retain, align) 返回一个集合of group keys for a row作为给定对齐的跳转窗口的一部分
- TUMBLE(t, emit) 返回一个group key for a row作为滚动窗口的一部分
- TUMBLE(t, emit, align) 返回一个group key for a row作为给定对齐滚动窗口的一部分
注:
TUMBLE(t, e)相当于TUMBLE(t, e, TIME '00:00:00')。
TUMBLE(t, e, a)相当于HOP(t, e, e, a)。
HOP(t, e, r)相当于HOP(t, e, r, TIME '00:00:00')
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
