Skip to main content

ExecuteGroovyScript

用于在编排流程中执行 Groovy 脚本,Groovy 是一种基于 JVM(Java虚拟机)的脚本语言,能够与 Java 代码很好地结合,也能用于扩展现有代码。由于其运行在 JVM 上的特性,Groovy 可以使用其他 Java 语言编写的库

一、组件属性

1

  • 脚本内容:Groovy 脚本内容,与「脚本文件」参数互斥,只能使用一个

  • 附加 Classpath:脚本中需要引用的额外 jar 包,多个文件用分号分隔,也可使用通配符**.jar来匹配文件

  • 其他配置可以保持默认,需要时参考属性说明自行配置即可

二、Script Bindings

variabletypedescription
sessionProcessSession用于读取、修改和输出 FlowFile 的会话
contextProcessContextFlowFile上下文(几乎不会使用用)
logComponentLog组件的日志对象
REL_SUCCESSRelationshipsuccess 连线
REL_FAILURERelationshipfailure 连线
CTLjava.util.HashMap<String,ControllerService>使用CTL.*前缀定义的属性可以访问到全部类型的控制器服务
SQLjava.util.HashMap<String, [groovy.sql.Sql](http://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html)>使用 SQL.*前缀定义的属性可以访问 DBCPConnectPool 类型的控制器服务
RecordReaderjava.util.HashMap<String,RecordReaderFactory>使用RecordReader.*前缀定义的属性可以访问 RecordReader 类型的控制器服务
RecordWriterjava.util.HashMap<String,RecordSetWriterFactory>使用RecordWriter.*前缀定义的属性可以访问 RecordSetWriterFactory 类型的控制器服务
Dynamic processor propertiesPropertyDescriptor所有不以CTL.SQL.开头的自定义属性都绑定到脚本变量

三、脚本语法

3.1 FlowFile属性

  • 新增属性

    def flowFile = session.get()
    def name = "le.zw"
    //写法 1
    flowFile.author = name
    //写法 2 (属性名中需要带特殊符号的情况)
    flowFile.'author.name' = 'le.zw'
    //写法 3
    flowFile.putAttribute("author", name)
    //写法 4
    flowFile = session.putAttribute(flowFile, "author", name)
    REL_SUCCESS << flowFile
  • 移除属性

    //FlowFile中已有一个属性 author=le.zw
    def flowFile = session.get()
    //写法 1
    flowFile.author = null
    //写法 2
    flowFile = session.removeAttribute(flowFile, "author")
    REL_SUCCESS << flowFile
  • 获取属性值

    //FlowFile中已有一个属性 author=le.zw
    def flowFile = session.get()
    //写法 1
    String a = flowFile.author
    //写法 2
    String a = flowFile.getAttribute("author")
    REL_SUCCESS << flowFile

3.2 FlowFile内容

  • 读流文件内容

    def flowFile = session.get()
    if (flowFile == null) {
    return
    }
    //写法 1:读取流文件内容为 InputStream
    InputStream i = flowFile.read()
    //写法 2:读取流文件内容为 JSON
    def json = new groovy.json.JsonSlurper().parse(flowFile.read())
    //写法 3:读取流文件内容为 String
    String text = flowFile.read().getText("UTF-8")
    REL_SUCCESS << flowFile
  • 写流文件内容

    def flowFile = session.get()
    if (flowFile == null) {
    return
    }
    // 写法 1 直接输出 String
    flowFile.write("UTF-8", "写的流文件内容")
    // 写法 2 通过 java.io.Writer 来写
    flowFile.write("UTF-8"){writer->
    do something with java.io.Writer...
    }
    // 写法 3 通过 OutPutStream 来写
    flowFile.write{outStream->
    do something with output stream...
    }
    // 写法 4 通过 InputStream 读,处理完再通过 OutputStream 写
    flowFile.write{inStream, outStream->
    do something with input and output streams...
    }
    REL_SUCCESS << flowFile

3.3 输出到连线

输出连线只有2条,无法自定义

  • 输出到 success 连线

    def flowFile = session.get()
    // 写法 1
    REL_SUCCESS << flowFile
    // 写法 2
    flowFile.transfer(REL_SUCCESS)
    // 写法 3
    session.transfer(flowFile, REL_SUCCESS)
  • 同时输出到 success 连线和 failure 连线

    def flowFile = session.get()
    //流文件被输出到一条连线之后,就不能再被其他对象消费了,如果需要输出到多条连线,可以通过 clone 操作复制出多条数据流(clone 需要在原 flowFile 被消费之前进行)
    def flowFile_clone = session.clone(flowFile)
    flowFile.write("UTF-8", "输出到 success 连线的数据")
    REL_SUCCESS << flowFile
    flowFile_clone.write("UTF-8", "输出到 failure 连线的数据")
    REL_FAILURE << flowFile_clone

3.4 模块变量

  • 读取模块变量

    def flowFile = session.get()
    def appKey = context.newPropertyValue('${appKey}').evaluateAttributeExpressions().getValue()
    flowFile.write("UTF-8", appKey)
    REL_SUCCESS << flowFile

3.5 SQL.* 和 CTL.*

  • SQL.*
    • SQL必须大写
    • 仅支持 DBCPConnectionPool 类型的 ControllerService

image-20220607165129518

import groovy.sql.Sql 

//定义名为 SQL.mysql 的属性连接到 DBCPConnectionPool 控制器服务
//使用准备好的语句从数据库中读取值
//并创建属性值 db.yesterday
def daysAdd = -1
def row = SQL.mysql.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE

//使用数据库中的 BLOB 和 CLOB
//使用参数转换使用 groovy.sql.Sql.BLOB(Stream) 和 groovy.sql.Sql.CLOB(Reader)

//将流文件的内容写入数据库 blob
flowFile .read{ rawIn->
def parms = [
p_id : flowFile.ID as Long, //获取名为 ID 的流文件属性作为 SQL 变量p_id
p_data: Sql.BLOB(rawIn), //使用流文件内容作为 SQL 变量p_data
]
SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
  • CTL.*
    • CTL必须大写
    • 支持所有类型的 ControllerService

image-20220607165129518

四、流程模板

参见模板文件