启动OmniStateStore
- 配置安装信息。
根据业务使用情况和待安装部署的环境设置Flink的conf子目录下flink-conf.yaml中的相关配置项,OmniStateStore相关配置项说明参考表1 OmniStateStore配置项。
以下为使能OmniStateStore特性必须要在${FLINK_HOME}/conf/flink-conf.yaml中新增或修改的配置项,需要在JobManager和所有TaskManager的配置文件中同时修改。
表1 同步配置项说明 配置项名称
简要描述
配置示例
注意事项
state.backend
Flink开源参数,用于配置state.backend状态后端。
com.huawei.ock.bss.OckDBStateBackendFactory
此配置用于切换状态后端的类型,需要保证字符完全正确并区分大小写。
state.backend.ockdb.localdir
OmniStateStore状态数据本地存储路径。
“/usr/local/flink/ockdb”
保证路径已存在且对Flink运行用户有读写权限。
state.backend.ockdb.jni.logfile
OmniStateStore日志路径。
“/usr/local/flink/log/kv.log”
建议配置为Flink的日志目录。
配置项格式:${配置项名称} + ${英文冒号} + ${空格} + ${配置项值},如图1所示。
- 启用MapState迭代器优化特性(可选)。
Flink框架针对Join算子(JoinRecordStateViews)和TOPN算子(AppendOnlyTopNFunction)的状态访问机制进行优化,通过将业务算子对MapState迭代器的遍历访问重构为直接调用ValueState的get接口及MapState的精准键值查询,实现状态访问路径从“全量遍历”到“精准定位”的升级,有效降低状态管理开销。
- 由于存储的状态类型发生了变化,因此在启用MapState迭代器优化特性场景下,不支持通过Savepoint在OmniStateStore和其它类型的状态后端之间迁移状态数据。
- 如果运行的Flink业务不包含相关算子逻辑或者用户使用自定义的算子,也不支持使能此优化特性。
JAR包名称为flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-${flink.version}.jar 。示例中以1.16.3版本为例。- 在TaskManager的“${FLINK_HOME}/conf/flink-conf.yaml”配置文件中增加以下配置。
env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
- 在JobManager的“${FLINK_HOME}/conf/flink-conf.yaml”配置文件中增加以下配置。
env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
- 在JobManager的“${FLINK_HOME}/bin/sql-client.sh”脚本中增加以下启动参数。
-javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
参考图2,将该参数添加到脚本中elif分支中的exec命令行。
- 启动Flink任务,查看日志中的配置项,检查配置是否成功。
- 执行“${FLINK_HOME}/examples/streaming/WordCount.jar”示例程序,观察到Task Manager日志中打印“OmniStateStore service start success.”,说明OmniStateStore启动成功。