Rate This Document
Findability
Accuracy
Completeness
Readability

Starting OmniStateStore

  1. Configure the installation information.

    Set the necessary OmniStateStore configuration items in the flink-conf.yaml file in the conf subdirectory of Flink based on the service usage and deployment environment. For details about the configuration items, see 1.

    Add or modify the following configuration items in ${FLINK_HOME}/conf/flink-conf.yaml to enable OmniStateStore. Update the configuration files of the JobManager and all TaskManagers.

    Table 1 Configuration items

    Configuration Item

    Description

    Example

    Remarks

    state.backend

    Open-source Flink parameter, which is used to configure the state backend.

    com.huawei.ock.bss.OckDBStateBackendFactory

    This configuration item determines the state backend type. Verify that the value is case-sensitive and correctly spelled.

    state.backend.ockdb.localdir

    Local OmniStateStore state data path.

    /usr/local/flink/ockdb

    Check that the path exists and the Flink run user has the read and write permissions on the path.

    state.backend.ockdb.jni.logfile

    OmniStateStore log path.

    /usr/local/flink/log/kv.log

    The Flink log path is recommended for this path.

    Configuration item format: ${Configuration item name} + ${Colon} + ${Space} + ${Configuration item value}, for example, Figure 1.

    Figure 1 Example configuration item
  2. Enable the MapState iterator optimization feature.

    Optimizations for Flink include optimizing the mechanism of accessing the states of the Join operator (JoinRecordStateViews) and TopN operator (AppendOnlyTopNFunction). The traversal access of service operators to the MapState iterator is changed to directly invoke the Get API of ValueState and the precise key value query of MapState. In this way, the status access path is updated from "full traversal" to "precise location", dramatically reducing the state management overhead.

    • When the MapState iterator optimization feature is enabled, changes to the storage state type prevent state data from being migrated between OmniStateStore and other state backends using savepoints.
    • If the Flink service does not contain the operator logic or you choose to use a customized operator, this optimization feature cannot be enabled.
    The JAR package name is flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-${flink.version}.jar. In this example, the version is 1.16.3.
    1. Add the following configuration to the ${FLINK_HOME}/conf/flink-conf.yaml configuration file of the TaskManagers:
      env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
    2. Add the following configuration to the ${FLINK_HOME}/conf/flink-conf.yaml configuration file of the JobManager:
      env.java.opts: -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar
    3. Add the following startup parameters to the ${FLINK_HOME}/bin/sql-client.sh script of the JobManager:
      -javaagent:$FLINK_LIB_DIR/flink-boost-statebackend-1.0.0-SNAPSHOT-for-flink-1.16.3.jar

      Add this parameter to the exec command line in the elif branch of the script. See Figure 2.

      Figure 2 exec command line example
  3. Start a Flink task and view the configuration items in the log to check whether the configuration is successful.
  4. Run the ${FLINK_HOME}/examples/streaming/WordCount.jar sample program. If "OmniStateStore service start success" is displayed in the TaskManager log, OmniStateStore is started successfully.