ExecuteGroovyScript
用于在编排流程中执行 Groovy 脚本,Groovy 是一种基于 JVM(Java虚拟机)的脚本语言,能够与 Java 代码很好地结合,也能用于扩展现有代码。由于其运行在 JVM 上的特性,Groovy 可以使用其他 Java 语言编写的库
一、组件属性
脚本内容:Groovy 脚本内容,与「脚本文件」参数互斥,只能使用一个
附加 Classpath:脚本中需要引用的额外 jar 包,多个文件用分号分隔,也可使用通配符
*
,*.jar
来匹配文件其他配置可以保持默认,需要时参考属性说明自行配置即可
二、Script Bindings
variable | type | description |
---|---|---|
session | ProcessSession | 用于读取、修改和输出 FlowFile 的会话 |
context | ProcessContext | FlowFile上下文(几乎不会使用用) |
log | ComponentLog | 组件的日志对象 |
REL_SUCCESS | Relationship | success 连线 |
REL_FAILURE | Relationship | failure 连线 |
CTL | java.util.HashMap<String,ControllerService> | 使用CTL.* 前缀定义的属性可以访问到全部类型的控制器服务 |
SQL | java.util.HashMap<String, [groovy.sql.Sql](http://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html)> | 使用 SQL.* 前缀定义的属性可以访问 DBCPConnectPool 类型的控制器服务 |
RecordReader | java.util.HashMap<String,RecordReaderFactory> | 使用RecordReader.* 前缀定义的属性可以访问 RecordReader 类型的控制器服务 |
RecordWriter | java.util.HashMap<String,RecordSetWriterFactory> | 使用RecordWriter.* 前缀定义的属性可以访问 RecordSetWriterFactory 类型的控制器服务 |
Dynamic processor properties | PropertyDescriptor | 所有不以CTL. 或SQL. 开头的自定义属性都绑定到脚本变量 |
三、脚本语法
3.1 FlowFile属性
新增属性
def flowFile = session.get()
def name = "le.zw"
//写法 1
flowFile.author = name
//写法 2 (属性名中需要带特殊符号的情况)
flowFile.'author.name' = 'le.zw'
//写法 3
flowFile.putAttribute("author", name)
//写法 4
flowFile = session.putAttribute(flowFile, "author", name)
REL_SUCCESS << flowFile移除属性
//FlowFile中已有一个属性 author=le.zw
def flowFile = session.get()
//写法 1
flowFile.author = null
//写法 2
flowFile = session.removeAttribute(flowFile, "author")
REL_SUCCESS << flowFile获取属性值
//FlowFile中已有一个属性 author=le.zw
def flowFile = session.get()
//写法 1
String a = flowFile.author
//写法 2
String a = flowFile.getAttribute("author")
REL_SUCCESS << flowFile
3.2 FlowFile内容
读流文件内容
def flowFile = session.get()
if (flowFile == null) {
return
}
//写法 1:读取流文件内容为 InputStream
InputStream i = flowFile.read()
//写法 2:读取流文件内容为 JSON
def json = new groovy.json.JsonSlurper().parse(flowFile.read())
//写法 3:读取流文件内容为 String
String text = flowFile.read().getText("UTF-8")
REL_SUCCESS << flowFile写流文件内容
def flowFile = session.get()
if (flowFile == null) {
return
}
// 写法 1 直接输出 String
flowFile.write("UTF-8", "写的流文件内容")
// 写法 2 通过 java.io.Writer 来写
flowFile.write("UTF-8"){writer->
do something with java.io.Writer...
}
// 写法 3 通过 OutPutStream 来写
flowFile.write{outStream->
do something with output stream...
}
// 写法 4 通过 InputStream 读,处理完再通过 OutputStream 写
flowFile.write{inStream, outStream->
do something with input and output streams...
}
REL_SUCCESS << flowFile
3.3 输出到连线
输出连线只有2条,无法自定义
输出到 success 连线
def flowFile = session.get()
// 写法 1
REL_SUCCESS << flowFile
// 写法 2
flowFile.transfer(REL_SUCCESS)
// 写法 3
session.transfer(flowFile, REL_SUCCESS)同时输出到 success 连线和 failure 连线
def flowFile = session.get()
//流文件被输出到一条连线之后,就不能再被其他对象消费了,如果需要输出到多条连线,可以通过 clone 操作复制出多条数据流(clone 需要在原 flowFile 被消费之前进行)
def flowFile_clone = session.clone(flowFile)
flowFile.write("UTF-8", "输出到 success 连线的数据")
REL_SUCCESS << flowFile
flowFile_clone.write("UTF-8", "输出到 failure 连线的数据")
REL_FAILURE << flowFile_clone
3.4 模块变量
读取模块变量
def flowFile = session.get()
def appKey = context.newPropertyValue('${appKey}').evaluateAttributeExpressions().getValue()
flowFile.write("UTF-8", appKey)
REL_SUCCESS << flowFile
3.5 SQL.* 和 CTL.*
- SQL.*
- SQL必须大写
- 仅支持 DBCPConnectionPool 类型的 ControllerService
import groovy.sql.Sql
//定义名为 SQL.mysql 的属性连接到 DBCPConnectionPool 控制器服务
//使用准备好的语句从数据库中读取值
//并创建属性值 db.yesterday
def daysAdd = -1
def row = SQL.mysql.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE
//使用数据库中的 BLOB 和 CLOB
//使用参数转换使用 groovy.sql.Sql.BLOB(Stream) 和 groovy.sql.Sql.CLOB(Reader)
//将流文件的内容写入数据库 blob
flowFile .read{ rawIn->
def parms = [
p_id : flowFile.ID as Long, //获取名为 ID 的流文件属性作为 SQL 变量p_id
p_data: Sql.BLOB(rawIn), //使用流文件内容作为 SQL 变量p_data
]
SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
- CTL.*
- CTL必须大写
- 支持所有类型的 ControllerService
四、流程模板
参见模板文件