MergeRecord
将一组数据流根据用户定义的策略合并在一起,以合并JSON对象为JSON数组为例,合并后的结果对JSON对象有严格要求
参数说明
读取服务
指定用于读取数据格式的控制器服务。
写入服务
指定用于输出合并结果数据格式的控制器服务。
合并策略
指定用于合并记录的算法
属性策略
指定保留到新的数据流的属性。
最小记录数
指定需要合并的最小数据流数量,如设置为2,则需要数据流数量达到2才会进行合并;若达不到则组件会一直等待
最大记录数
指定需要合并的最大数据流数量,如设置为10,则需要数据流数量大于或等于10才会进行合并;若达不到则组件会一直等待,若大于10则将10条数据流合并为1个,10-到20的部分合并为下一个,以此类推
最小记录大小
指定合并后数据流最小大小
最大记录大小
指定合并后数据流的最大大小
记录的最大生命周期
指定当数据流数量不满足最大记录数时,组件的等待时间,超过设置的等待时间,即便数据流数量不满足最大记录数,同样会对数据进行合并
记录的最大数量
指定可以在内存中保存的最大记录数,该值要大于组件的最大并发线程数
输出连线
- failure连线表示合并失败的数据流输出
- merged连线表示合并成功的数据流输出
- original连线表示合并成功后原始的数据流输出
示例
Generate配置
由GenerateFlowfile生成触发流程的流文件,该组件生成的流文件内容为空,然后通过两个ReplaceText组件来替换流文件内容,两个替换的流文件内容为如下JSON对象:
ReplaceText组件1的替换内容:
{
"sname":"jack",
"id":"00001",
"state":"NY"
}
ReplaceText组件2的替换内容:
{
"sname":"marry",
"id":"00002"
}
流程说明
在此流程中合并的两个JSON对象格式是不一样的,因为MergeRecord对合并后的结果会严格要求,为了凸显这一特性,特意设定为两个格式不一致的JSON对象,而最终的合并结果中,数组里面每个JSON对象的格式都是一致的,缺少的键值对会自动补齐,补齐的值会自动用null填充
MergeRecord设置
以合并JSON对象为例,设置了读取服务
为JsonTreeReader,写入服务
为JsonRecordSetWriter,合并策略
为Bin-Packing算法,属性策略
为仅保留通用属性,最小记录数
为1,最大记录数
为1000,最大记录生命周期
为5s,其余属性保持默认配置。
结果
最后输出结果如下:
- merged连线输出结果
- original连线输出结果
流程模板
参见附件(请右键另存保存):模板文件