Starting OmniStateStore
- Configure the installation information.
Set the necessary OmniStateStore configuration items in the flink-conf.yaml file in the conf subdirectory of Flink based on the service usage and deployment environment. For details about the configuration items, see Table 1.
Add or modify the following configuration items in ${FLINK_HOME}/conf/flink-conf.yaml to enable OmniStateStore. Update the configuration files of the JobManager and all TaskManagers.
Table 1 Configuration items

| Configuration Item | Description | Example | Remarks |
| --- | --- | --- | --- |
| state.backend | Open-source Flink parameter that configures the state backend. | com.huawei.ock.bss.OckDBStateBackendFactory | This configuration item determines the state backend type. The value is case-sensitive; verify that it is spelled correctly. |
| state.backend.ockdb.localdir | Local path for OmniStateStore state data. | /usr/local/flink/ockdb | Check that the path exists and that the Flink run user has read and write permissions on it. |
| state.backend.ockdb.jni.logfile | OmniStateStore log path. | /usr/local/flink/log/kv.log | Using the Flink log path is recommended. |
Configuration item format: ${Configuration item name} + ${Colon} + ${Space} + ${Configuration item value}. For an example, see Figure 1.
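As a quick sanity check of the format described above, the following shell sketch tests whether a configuration item appears in a file as name, colon, space, value. The function name has_conf_item is a hypothetical helper, not part of Flink or OmniStateStore. It only checks presence and format at the start of a line, not whether the value is valid.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink or OmniStateStore):
# succeeds if FILE contains a line of the form "KEY: VALUE".
# Usage: has_conf_item FILE KEY
has_conf_item() {
    conf_file=$1
    conf_key=$2
    # The line must start with the literal key, a colon, and a space,
    # and must have at least one more character after them (the value).
    awk -v key="$conf_key" '
        index($0, key ": ") == 1 && length($0) > length(key) + 2 { found = 1 }
        END { exit found ? 0 : 1 }
    ' "$conf_file"
}

# Example run against the items in Table 1, only if the file is present.
if [ -n "${FLINK_HOME:-}" ] && [ -f "${FLINK_HOME}/conf/flink-conf.yaml" ]; then
    for key in state.backend state.backend.ockdb.localdir state.backend.ockdb.jni.logfile; do
        has_conf_item "${FLINK_HOME}/conf/flink-conf.yaml" "$key" \
            || echo "missing or malformed: $key"
    done
fi
```

Because the same check would have to pass on the JobManager and on every TaskManager, a script like this could be run on each node after the files are updated.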
- Enable the MapState iterator optimization feature.
This feature optimizes how the Flink Join operator (JoinRecordStateViews) and TopN operator (AppendOnlyTopNFunction) access state. Instead of traversing the MapState iterator, these operators directly invoke the Get API of ValueState and perform exact-key queries on MapState. The state access path thus changes from "full traversal" to "precise location", which reduces the state management overhead.
- Enabling the MapState iterator optimization feature changes the stored state type, so state data cannot be migrated between OmniStateStore and other state backends using savepoints.
- If the Flink service does not contain the Join or TopN operator logic described above, or if you use customized operators instead, this optimization feature cannot be enabled.
The JAR package name is flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-${flink.version}.jar. In this example, the version is 1.16.3.
- Add the following configuration to the ${FLINK_HOME}/conf/flink-conf.yaml configuration file of the TaskManagers:
env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
- Add the following configuration to the ${FLINK_HOME}/conf/flink-conf.yaml configuration file of the JobManager:
env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
- Add the following startup parameters to the ${FLINK_HOME}/bin/sql-client.sh script of the JobManager:
-javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
Add this parameter to the exec command line in the elif branch of the script. See Figure 2.
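To confirm that the -javaagent parameter has been added to each file named in the preceding steps, a small check such as the following sketch may help. The function name has_javaagent is a hypothetical helper introduced for illustration. It searches for the literal text shown above, including the unexpanded $FLINK_LIB_DIR, and does not verify that the JAR itself exists.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink or OmniStateStore):
# succeeds if FILE references the agent JAR for the given Flink version.
# Usage: has_javaagent FILE FLINK_VERSION
has_javaagent() {
    target_file=$1
    flink_version=$2
    jar="flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-${flink_version}.jar"
    # -F matches the pattern as a fixed string; \$FLINK_LIB_DIR stays literal,
    # exactly as it is written in the configuration and script.
    grep -qF -e "-javaagent:\$FLINK_LIB_DIR/${jar}" "$target_file"
}

# Example run for Flink 1.16.3, only if FLINK_HOME is set.
if [ -n "${FLINK_HOME:-}" ]; then
    for f in "${FLINK_HOME}/conf/flink-conf.yaml" "${FLINK_HOME}/bin/sql-client.sh"; do
        if [ -f "$f" ] && has_javaagent "$f" 1.16.3; then
            echo "javaagent configured in $f"
        else
            echo "javaagent missing in $f"
        fi
    done
fi
```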
- Start a Flink task and view the configuration items in the log to check whether the configuration is successful.
- Run the ${FLINK_HOME}/examples/streaming/WordCount.jar sample program. If "OmniStateStore service start success" is displayed in the TaskManager log, OmniStateStore is started successfully.
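The log check in the last step can be scripted. The sketch below searches one or more log files for the success message quoted above. The function name omnistatestore_started is hypothetical, and the log file pattern assumes Flink's default standalone log naming under ${FLINK_HOME}/log, so adjust it if your deployment writes TaskManager logs elsewhere.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink or OmniStateStore):
# succeeds if any of the given log files contains the success message.
# Usage: omnistatestore_started LOGFILE...
omnistatestore_started() {
    # -F: fixed string, -q: quiet, -s: suppress errors for unreadable files.
    grep -qsF "OmniStateStore service start success" "$@"
}

# Example run, only if FLINK_HOME is set.
if [ -n "${FLINK_HOME:-}" ]; then
    # Assumption: TaskManager logs follow Flink's default standalone naming.
    if omnistatestore_started "${FLINK_HOME}"/log/flink-*-taskexecutor-*.log; then
        echo "OmniStateStore started"
    else
        echo "success message not found"
    fi
fi
```

Run the check after the sample job has been submitted, for example with `${FLINK_HOME}/bin/flink run ${FLINK_HOME}/examples/streaming/WordCount.jar`.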

