Skip to main content

MergeRecord

将一组数据流根据用户定义的策略合并在一起,以合并JSON对象为JSON数组为例,合并后的结果对JSON对象有严格要求

参数说明

读取服务

指定用于读取数据格式的控制器服务。

写入服务

指定用于输出合并结果数据格式的控制器服务。

合并策略

指定用于合并记录的算法

属性策略

指定保留到新的数据流的属性。

最小记录数

指定需要合并的最小数据流数量,如设置为2,则需要数据流数量达到2才会进行合并;若达不到则组件会一直等待

最大记录数

指定需要合并的最大数据流数量,如设置为10,则需要数据流数量大于或等于10才会进行合并;若达不到则组件会一直等待,若大于10则将10条数据流合并为1个,10-到20的部分合并为下一个,以此类推

最小记录大小

指定合并后数据流最小大小

最大记录大小

指定合并后数据流的最大大小

记录的最大生命周期

指定当数据流数量不满足最大记录数时,组件的等待时间,超过设置的等待时间,即便数据流数量不满足最大记录数,同样会对数据进行合并

记录的最大数量

指定可以在内存中保存的最大记录数,该值要大于组件的最大并发线程数

输出连线

  • failure连线表示合并失败的数据流输出
  • merged连线表示合并成功的数据流输出
  • original连线表示合并成功后原始的数据流输出

示例

Generate配置

由GenerateFlowfile生成触发流程的流文件,该组件生成的流文件内容为空,然后通过两个ReplaceText组件来替换流文件内容,两个替换的流文件内容为如下JSON对象:

ReplaceText组件1的替换内容:
{
"sname":"jack",
"id":"00001",
"state":"NY"
}
ReplaceText组件2的替换内容:
{
"sname":"marry",
"id":"00002"
}

流程说明

在此流程中合并的两个JSON对象格式是不一样的,因为MergeRecord对合并后的结果会严格要求,为了凸显这一特性,特意设定为两个格式不一致的JSON对象,而最终的合并结果中,数组里面每个JSON对象的格式都是一致的,缺少的键值对会自动补齐,补齐的值会自动用null填充

MergeRecord设置

以合并JSON对象为例,设置了读取服务为JsonTreeReader,写入服务为JsonRecordSetWriter,合并策略为Bin-Packing算法,属性策略为仅保留通用属性,最小记录数为1,最大记录数为1000,最大记录生命周期为5s,其余属性保持默认配置。

结果

最后输出结果如下:

  • merged连线输出结果

  • original连线输出结果

流程模板

参见附件(请右键另存保存):模板文件