Skip to main content

Notify

概述

日常非常使用,常用于构建异步业务流程时。和Wait搭配使用。

提供缓存中缓存释放信号的标识符,可以选择将其与数据流的属性一起缓存。一旦在缓存中发现这个信号,在相应的wait组件持有的所有流文件都将被释放。

属性说明

  • 信号释放标识符
    一个值或者根据属性表达式计算数据流的结果值,该值用来确定释放信号缓存键值。注意和wait配置同样的值

  • 信号计数器增量
    对“信号释放标识符”的值做增量,或者根据属性表达式对FlowFile计算的结果值,该值用来确定信号计数器增量,指定计数器应该增加多少。

    例如,如果在上游流中以批处理的方式处理多个信号事件,则可以立刻用该属性通知处理的事件数量。

    0有一个特殊的含义,它将目标计数清除回0,这在与Wait组件的'可释放的数据流计数'=0模式一起使用时特别有用,以提供“开闭门”类型的流控制。1可以打开相应的Wait组件,而0则关闭它。

  • 缓存服务
    用于缓存释放信号的控制器服务,这些信号用来释放在相应Wait组件上排队等待的流文件。

    访问缓存的客户端,一般是DistributedMapCacheClientService

    “信号释放标识符”和其值的存储服务端一般是DistributedMapCacheServer

示例

实际业务中,经常需要业务1和业务2并发执行,在执行完毕后开启业务3的执行。使用Notify-Wait能很好解决该问题。

demo

###配置说明 (下面和Wait组件示例一样)

  1. GenerateFlowFile调度里调度策略使用默认“Timer driven”,运行安排:1d。这里是按照方便demo使用需要来配置的,可忽略。

  2. 业务1、业务2、业务3我们都使用ReplaceText组件代表业务。实际中可以是多个组件的具体流程。其他业务1和业务2都是由GenerateFlowFile触发执行,达到并发效果。

  3. 业务1后放置Notify组件,配置" 信号释放标识符"、"信号计数器增量"、"缓存服务" properties

  4. 业务2后放置Wait组件,配置" 信号释放标识符"、"目标信号计数"、"过期时间"、"缓存服务"。 properties

  5. 在平台根的配置上搜索DistributedMapCacheServer, 保持默认配置,启动即可。该缓存服务默认端口为4557。 cacheServer

  6. Notify和Wait组件使用DistributedMapCacheClientService构建缓存服务的客户端访问。 cacheClient

###执行结果

pre_notify

在notify执行前,业务1和业务2执行完毕,流程在Wait线上卡住,等待notify执行后,当前例子中目标信号计数达到1,就释放继续流程。才能开启业务3的执行。

注意,Wait和Notify使用同一个信号释放标识变量,建议用${UUID()}可以和其他不冲突

流程模板

参见附件(请右键另存保存):异步Notify示例