Wait
概述
日常非常使用,常用于构建异步业务流程时。和Notify搭配使用。
直到Notify组件提供匹配的某个相应释放信号,将原输入的数据流输出到"wait"连线上。当使用“wait”连线循环时,建议使用优先排序(例如,先入先出)。
当识别出一个匹配的释放信号时,等待的数据流将被输出到“success”连线,并且会复制Notify组件生成释放信号的数据流中的属性,然后从缓存中删除释放信号。
如果等待的流文件超过了过期时间,数据流则输出到“expired”连线。
如果需要等待多个信号,请通过“目标信号计数”属性指定所需的信号数量。 对于将源流文件分割为多个片段(如SplitText)的组件来说,这特别有用。 为了等待所有片段都被处理,可以将“original”连线连接到Wait组件,将“split”连线连接到相应的Notify组件。 配置Notify和Wait组件的"释放信号标识符(Release Signal Identifier)"的值为'${fragment.identifier}',指定Wait组件"目标信号值"属性的值为${fragment.count}'。
属性说明
信号释放标识符
一个值或者根据属性表达式计算数据流的结果值,该值用来确定释放信号缓存键值。注意和wait配置同样的值目标信号计数
一个值或者根据属性表达式计算数据流的结果值,该值用来确定目标信号值的大小。该组件检查信号计数是否达到这个数字。如果指定了信号计数器名称,该组件将检查特定的计数器,否则将检查信号的总数。过期时间
指定多长时间后,等待中的数据流会输出到“expired”连线。缓存服务
用来检查协同组件Notify释放信号的控制器服务,缓存释放信号的控制器服务,这些信号用来释放在相应Wait组件上排队等待的流文件。访问缓存的客户端,一般是DistributedMapCacheClientService
“信号释放标识符”和其值的存储服务端一般是DistributedMapCacheServer
示例
实际业务中,经常需要业务1和业务2并发执行,在执行完毕后开启业务3的执行。使用Notify-Wait能很好解决该问题。
###配置说明 (下面和Notify组件示例一样)
GenerateFlowFile调度里调度策略使用默认“Timer driven”,运行安排:1d。这里是按照方便demo使用需要来配置的,可忽略。
业务1、业务2、业务3我们都使用ReplaceText组件代表业务。实际中可以是多个组件的具体流程。其他业务1和业务2都是由GenerateFlowFile触发执行,达到并发效果。
业务1后放置Notify组件,配置" 信号释放标识符"、"信号计数器增量"、"缓存服务"
业务2后放置Wait组件,配置" 信号释放标识符"、"目标信号计数"、"过期时间"、"缓存服务"。
在平台根的配置上搜索DistributedMapCacheServer, 保持默认配置,启动即可。该缓存服务默认端口为4557。
Notify和Wait组件使用DistributedMapCacheClientService构建缓存服务的客户端访问。
###执行结果
在notify执行前,业务1和业务2执行完毕,流程在Wait线上卡住,等待notify执行后,当前例子中目标信号计数达到1,就释放继续流程。才能开启业务3的执行。
注意,Wait和Notify使用同一个信号释放标识变量,建议用${UUID()}可以和其他不冲突
流程模板
参见附件(请右键另存保存):异步Wait示例