AggregateRecord
AggregateRecord组件为QueryRecord组件的简化版本,旨在为不熟悉SQL语法的用户提供可视化操作。 AggregateRecord从FlowFile读取内容作为输入,要求输入FlowFile的格式必须为自带schema的Avro。 因此,如果你的数据格式不是Avro,请使用格式转换类组件(比如ConvertRecord)进行格式转换。由于额外的格式转换会消耗更多的资源,在对性能有要求的情况下, 推荐使用QueryRecord组件,它更灵活,也更快。
说明
AggregateRecord接收两个用来确定输出数据内容的参数:
- 输出格式 :此项必填,默认为
Avro
,还支持JSON
、XML
和CSV
格式。 - schema内容:此项选填。用来告诉组件输出数据应该包含哪些列。此schema专指
Avro
中的schema。
关于 schema内容 这一参数,这里详细说明一下:schema内容 在不使用聚集函数和别名功能时,可以不填,此时AggregateRecord会尝试根据 输入Avro 和
行筛选条件 推断出合适的schema
。一旦在Aggregate中使用聚集函数和别名功能时,必须填写schema内容,schema中的name
一般为高级选项中指定的别名。
建议在使用聚集函数时,同时指定别名。
高级选项下的UI界面如下:
界面由5各部分组成:
- 返回列:这里可以填写你希望AggregateRecord从查询中返回哪些列。列名必须真实存在于输入Avro的schema中。默认返回所有列。
- 行筛选条件:这里对查询的内容进行筛选,只有同时满足所有行筛选条件的数据行才会被返回。
- 分组筛选条件:这里是对聚集操作的结果进行筛选的,只有在使用了聚集函数的情况下才能使用分组筛选条件。
- 排序方式:指定返回结果的排序方式,排序的优先级依次为表中第一列、第二列、第三列……
- 返回结果限制:用于限制返回结果的数量,
-1
代表返回结果数量不受限制
示例
首先给出示例流程:
流程分别给出了使用AggregateRecord进行简单查询(无聚集函数)和复杂查询(有聚集函数)的示例。
测试数据
测试数据的格式是CSV
,内容如下:
purchase_no, customer_id, item_id, item_name, price, quantity
10280, 40070, 1028, Box of pencils, 6.99, 2
10280, 40070, 4402, Stapler, 12.99, 1
12440, 28302, 1029, Box of ink pens, 8.99, 1
28340, 41028, 1028, Box of pencils, 6.99, 18
28340, 41028, 1029, Box of ink pens, 8.99, 18
28340, 41028, 2038, Printer paper, 14.99, 10
28340, 41028, 4018, Clear tape, 2.99, 10
28340, 41028, 3329, Tape dispenser, 14.99, 10
28340, 41028, 5192, Envelopes, 4.99, 45
28340, 41028, 3203, Laptop Computer, 978.88, 2
28340, 41028, 2937, Monitor, 329.98, 2
49102, 47208, 3204, Powerful Laptop Computer, 1680.99, 1
为了满足AggregateRecord的输入,使用了ConvertRecord进行了CSV
到Avro
的格式转换。
流程中的很多地方还用到了一个变量avro.schema
,它的内容如下:
{
"name": "helloAvro",
"namespace": "org.apache.nifi.blogs",
"type": "record",
"fields": [
{ "name": "purchase_no", "type": "long" },
{ "name": "customer_id", "type": "long" },
{ "name": "item_id", "type": ["null", "long"] },
{ "name": "item_name", "type": ["null", "string"] },
{ "name": "price", "type": ["null", "double"] },
{ "name": "quantity", "type": ["null", "int"] },
{ "name": "total_price", "type": ["null", "double"] },
{ "name": "avg_price", "type": ["null", "double"]},
{ "name": "total_quantity", "type": ["null", "int"]}
]
}
组件示例
以下对部分使用了AggregateRecord的组件给出了配置示例及输出示例:
简单查询1
输出JSON,指定schema(内容同avro.schema
),未进行UI设置,因此返回所有。
返回的内容如下:
[ {
"purchase_no" : 10280,
"customer_id" : 40070,
"item_id" : 1028,
"item_name" : "Box of pencils",
"price" : 6.99,
"quantity" : 2,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 10280,
"customer_id" : 40070,
"item_id" : 4402,
"item_name" : "Stapler",
"price" : 12.99,
"quantity" : 1,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 12440,
"customer_id" : 28302,
"item_id" : 1029,
"item_name" : "Box of ink pens",
"price" : 8.99,
"quantity" : 1,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 1028,
"item_name" : "Box of pencils",
"price" : 6.99,
"quantity" : 18,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 1029,
"item_name" : "Box of ink pens",
"price" : 8.99,
"quantity" : 18,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 2038,
"item_name" : "Printer paper",
"price" : 14.99,
"quantity" : 10,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 4018,
"item_name" : "Clear tape",
"price" : 2.99,
"quantity" : 10,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 3329,
"item_name" : "Tape dispenser",
"price" : 14.99,
"quantity" : 10,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 5192,
"item_name" : "Envelopes",
"price" : 4.99,
"quantity" : 45,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 3203,
"item_name" : "Laptop Computer",
"price" : 978.88,
"quantity" : 2,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 28340,
"customer_id" : 41028,
"item_id" : 2937,
"item_name" : "Monitor",
"price" : 329.98,
"quantity" : 2,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
}, {
"purchase_no" : 49102,
"customer_id" : 47208,
"item_id" : 3204,
"item_name" : "Powerful Laptop Computer",
"price" : 1680.99,
"quantity" : 1,
"total_price" : null,
"avg_price" : null,
"total_quantity" : null
} ]
简单聚集
指定输出
CSV
,指定schema内容。
这里使用了简单的聚集函数,并对返回行及分组进行了筛选,还指定了排序方式。
注意这里分组筛选条件的写法。
返回的结果如下:
purchase_no,customer_id,item_id,item_name,price,quantity,total_price,avg_price,total_quantity
12440,28302,,,,,,8.99,1
49102,47208,,,,,,1680.99,1
10280,40070,,,,,,9.99,3
28340,41028,,,,,,170.35,115
复杂聚集1
这里演示了支持四则运算的聚集函数的写法。类似的你还可以在不使用聚集函数时使用四则运算,记得指定别名,以便能够在schema中找到对应。
返回的结果如下:
<?xml version="1.0" ?>
<root>
<record>
<purchase_no>49102</purchase_no>
<customer_id>47208</customer_id>
<item_id></item_id>
<item_name></item_name>
<price></price>
<quantity></quantity>
<total_price>1680.99</total_price>
<avg_price></avg_price>
<total_quantity></total_quantity>
</record>
<record>
<purchase_no>28340</purchase_no>
<customer_id>41028</customer_id>
<item_id></item_id>
<item_name></item_name>
<price></price>
<quantity></quantity>
<total_price>3459.6099999999997</total_price>
<avg_price></avg_price>
<total_quantity></total_quantity>
</record>
</root>
因为schema的原因,返回的XML中出现了以下空标签。
复杂聚集2
指定返回Avro格式的数据,其它配置和复杂聚集1完全相同。
简单查询2
这里没有指定schema,也没有配置高级界面。AggregateRecord会进行schema推断,返回的结果如下:
purchase_no,customer_id,item_id,item_name,price,quantity,total_price,avg_price,total_quantity
10280,40070,1028,Box of pencils,6.99,2,,,
10280,40070,4402,Stapler,12.99,1,,,
12440,28302,1029,Box of ink pens,8.99,1,,,
28340,41028,1028,Box of pencils,6.99,18,,,
28340,41028,1029,Box of ink pens,8.99,18,,,
28340,41028,2038,Printer paper,14.99,10,,,
28340,41028,4018,Clear tape,2.99,10,,,
28340,41028,3329,Tape dispenser,14.99,10,,,
28340,41028,5192,Envelopes,4.99,45,,,
28340,41028,3203,Laptop Computer,978.88,2,,,
28340,41028,2937,Monitor,329.98,2,,,
49102,47208,3204,Powerful Laptop Computer,1680.99,1,,,
简单查询3
这里也没有指定schema,但配置了高级界面。AggregateRecord推断出来的schema会稍有不同。
返回的结果如下:
[ {
"customer_id" : 28302,
"purchase_no" : 12440
}, {
"customer_id" : 40070,
"purchase_no" : 10280
}, {
"customer_id" : 47208,
"purchase_no" : 49102
}, {
"customer_id" : 41028,
"purchase_no" : 28340
} ]
参考模板
AggregateRecord示例包含了以上示例的所有配置。
AggregateRecord_Test是对该组件的测试用例流程。