
Enabling OmniStream for DataStream

This section describes how to start a Flink cluster and enable OmniStream in DataStream scenarios.

  1. Optional: If DataStream tasks are running in a multi-Task Manager environment, add omni.batch: true to the flink-conf.yaml file to improve shuffle efficiency and achieve better performance.
    1. Open the /usr/local/flink/conf/flink-conf.yaml file.
      vi /usr/local/flink/conf/flink-conf.yaml
      
    2. Press i to enter the insert mode and add the following content to the file:
      omni.batch: true
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
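The edit above can also be made non-interactively. The following is a minimal sketch that sets `omni.batch: true` idempotently; it demonstrates the logic on a temporary file, so point `CONF_FILE` at the real `/usr/local/flink/conf/flink-conf.yaml` when running inside the container.

```shell
#!/bin/bash
# Sketch: idempotently set "omni.batch: true" in a flink-conf.yaml.
# A temp file is used for illustration; replace CONF_FILE with the real path.
set -eu

enable_omni_batch() {
    local conf="$1"
    if grep -q '^omni.batch:' "$conf"; then
        # Key already present: normalize its value.
        sed -i 's/^omni.batch:.*/omni.batch: true/' "$conf"
    else
        # Key absent: append it.
        echo 'omni.batch: true' >> "$conf"
    fi
}

CONF_FILE="$(mktemp)"
echo 'taskmanager.numberOfTaskSlots: 1' > "$CONF_FILE"
enable_omni_batch "$CONF_FILE"
enable_omni_batch "$CONF_FILE"   # running twice must not duplicate the key
grep -c '^omni.batch: true' "$CONF_FILE"   # prints 1
```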
  2. Access the flink_jm_8c32g container and start the Flink cluster.
    docker exec -it flink_jm_8c32g /bin/bash
    source /etc/profile
    cd /usr/local/flink-1.16.3/bin
    ./start-cluster.sh

    Each time you exit and re-enter the container, run the source /etc/profile command again to reload the environment variables. This ensures that the dependencies are properly detected when running tasks.

  3. Check whether the Job Manager and Task Manager are started successfully.
    1. Check the flink_jm_8c32g container for the StandaloneSessionClusterEntrypoint process.
      source /etc/profile
      jps

      If the StandaloneSessionClusterEntrypoint process exists, the Job Manager is started successfully.

    2. Access the flink_tm1_8c32g and flink_tm2_8c32g containers and check for the TaskManagerRunner process. The following commands use the flink_tm1_8c32g container as an example:
      docker exec -it flink_tm1_8c32g /bin/bash
      source /etc/profile
      jps

      If the TaskManagerRunner process exists, the Task Manager is started successfully.

  4. Create and configure the Kafka consumer and producer configuration files.
    1. Access the flink_tm1_8c32g container.
      docker exec -it flink_tm1_8c32g /bin/bash
    2. Create an /opt/conf directory.
      mkdir /opt/conf
      cd /opt/conf
    3. Create the Kafka consumer configuration file kafka_consumer.conf.
      fetch.queue.backoff.ms=20
      group.id=omni
      max.poll.records=10000
    4. Create the Kafka producer configuration file kafka_producer.conf.
      queue.buffering.max.messages=2000000
      queue.buffering.max.kbytes=20971520
      queue.buffering.max.ms=5
      linger.ms=5
      batch.num.messages=200000
      batch.size=3145728
      max.push.records=10000
    5. Access the flink_tm2_8c32g container and perform steps 4.b to 4.d.
      docker exec -it flink_tm2_8c32g /bin/bash
  5. Start ZooKeeper and Kafka on the physical machine. For details, see Kafka Deployment Guide.
  6. Use Kafka to create topics and generate data.

    Replace all example physical-machine IP addresses in the commands and scripts with the actual IP addresses of the Kafka servers.

    1. Create topics for the source and sink.
      cd /usr/local/kafka
      bin/kafka-topics.sh --create --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --replication-factor 1 --partitions 1 --topic source_abcd
      bin/kafka-topics.sh --create --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --replication-factor 1 --partitions 1 --topic result
    2. Save the following content as the script file producer.sh.
      #!/bin/bash
      
      # Kafka installation directory (Replace the example directory with the actual one.)
      KAFKA_HOME="/usr/local/kafka"
      TOPIC_NAME="source_abcd"  # Kafka topic name
      BROKER="IP_address:9092"  # IP address of the Kafka broker server
      MESSAGE_COUNT=10          # Number of messages to send
      
      # Check whether Kafka console-producer.sh exists.
      if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
          echo "Error: kafka-console-producer.sh was not found. Check the KAFKA_HOME path."
          exit 1
      fi
      
      # Generate random character strings and send them to Kafka.
      for ((i=1; i<=MESSAGE_COUNT; i++)); do
          # Generate four random lowercase letters (a-d), then a space and 1.
          RAND_STR=$(tr -dc 'a-d' < /dev/urandom | fold -w 4 | head -n 1)
          MESSAGE="${RAND_STR} 1"  # Format: 4 letters + space + 1
      
          # Invoke the Kafka console producer to send the message.
          echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
              --bootstrap-server "$BROKER" \
              --topic "$TOPIC_NAME"
          echo "Sent: $MESSAGE"
      done
    3. Run the script to generate test data and write it to the source topic.
      ./producer.sh
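Before involving a live broker, the message format produced by producer.sh can be sanity-checked offline. This sketch reuses the same generation logic; every line should be four lowercase letters from a to d, a space, and the number 1.

```shell
#!/bin/bash
# Generate a few sample messages offline, using the same logic as producer.sh,
# so the expected source-topic format can be verified without a running broker.
set -eu

for i in 1 2 3; do
    RAND_STR=$(tr -dc 'a-d' < /dev/urandom | fold -w 4 | head -n 1)
    echo "${RAND_STR} 1"
done
```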
  7. Build a job JAR package.
    1. Go to the /opt directory of the physical machine and create the /opt/job/src/main/java/com/huawei/boostkit directory.
      mkdir -p /opt/job/src/main/java/com/huawei/boostkit
      cd /opt/job/
    2. Create a Java file for the Flink Job.
      1. Open /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java.
        vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
      2. Press i to enter the insert mode and add the following content:
        package com.huawei.boostkit;
        
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.base.DeliveryGuarantee;
        import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
        import org.apache.flink.connector.kafka.sink.KafkaSink;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        
        import java.util.Properties;
        
        public class FlinkWordCount {
            public static void main(String[] args) throws Exception {
                String broker = "ip:port";
                String sourceTopic = "source_abcd";
                String targetTopic = "result";
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(broker)
                    .setTopics(sourceTopic)
                    .setGroupId("your-group-id")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(properties)
                    .build();
                
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.ACKS_CONFIG, "0");
                properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
                properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");
                KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers(broker)
                    .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                            .setTopic(targetTopic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setKafkaProducerConfig(properties)
                    .build();
                DataStream<String> source;
                source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
                SingleOutputStreamOperator<String> result = source.map(line ->
                    line
                );
                result.sinkTo(sink);
                result.disableChaining();
                env.execute("Wordcount");
            }
        }
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
    3. Create a pom.xml file.
      1. Open /opt/job/pom.xml.
        vi /opt/job/pom.xml
      2. Press i to enter the insert mode and add the following content:
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>com.huawei.boostkit</groupId>
            <artifactId>ziliao</artifactId>
            <version>1.0-SNAPSHOT</version>
            <packaging>jar</packaging>
        
            <properties>
               <flink.version>1.16.3</flink.version>
               <maven.compiler.source>1.8</maven.compiler.source>
               <maven.compiler.target>1.8</maven.compiler.target>
            </properties>
        
            <dependencies>
               <!-- Flink dependencies -->
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>2.5.0</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-statebackend-rocksdb</artifactId>
                   <version>${flink.version}</version>
               </dependency>
            </dependencies>
        
            <build>
               <plugins>
                  <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-assembly-plugin</artifactId>
                     <version>3.3.0</version>
                     <configuration>
                        <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                     </configuration>
                     <executions>
                        <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                              <goal>single</goal>
                           </goals>
                        </execution>
                     </executions>
                  </plugin>
               </plugins>
            </build>
        
        </project>
        
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
    4. Run the mvn clean package command to build the job. The ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory. Copy the JAR package to the /usr/local/flink directory in the flink_jm_8c32g container.
      mvn clean package
      docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
  8. Export environment variables from the flink_jm_8c32g container.
    export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
    export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$C_INCLUDE_PATH
    export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
    export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
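A typo in these long colon-separated lists usually surfaces only later, as a compile or load failure. A small helper (a sketch; the directories in the demo call are placeholders, not the real container paths) can flag nonexistent entries up front, e.g. `check_paths "$LD_LIBRARY_PATH"` after exporting.

```shell
#!/bin/bash
# Report entries of a colon-separated path list that do not exist on disk.
# Usage: check_paths "$LD_LIBRARY_PATH"   (or any of the exported variables)
check_paths() {
    local IFS=':'      # split the argument on colons
    local missing=0
    for dir in $1; do
        [ -z "$dir" ] && continue
        if [ ! -d "$dir" ]; then
            echo "missing: $dir"
            missing=1
        fi
    done
    return $missing
}

# Demo with one existing and one bogus directory:
check_paths "/tmp:/no/such/dir" || echo "some entries are missing"
```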
  9. Modify the UDF configuration file.
    1. Set the test case package name (udf_package) and main class name (main_class).
      vim /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties
    2. Press i to enter the insert mode and modify udf_package and main_class as follows:
      udf_package=com.huawei.boostkit
      main_class=com.huawei.boostkit.FlinkWordCount
    3. Press Esc, type :wq!, and press Enter to save the file and exit.
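The two keys can also be set without an interactive editor. This sketch assumes both keys already exist in the properties file; it is demonstrated on a temporary copy, so point `PROPS` at the real `/opt/udf-trans-opt/udf-translator/conf/udf_tune.properties` when running in place.

```shell
#!/bin/bash
# Sketch: set udf_package and main_class in udf_tune.properties non-interactively.
# Assumes both keys already exist in the file; a temp copy is used here.
set -eu

PROPS="$(mktemp)"
printf 'udf_package=old.pkg\nmain_class=old.Main\n' > "$PROPS"

sed -i \
    -e 's|^udf_package=.*|udf_package=com.huawei.boostkit|' \
    -e 's|^main_class=.*|main_class=com.huawei.boostkit.FlinkWordCount|' \
    "$PROPS"

cat "$PROPS"
```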
  10. Translate the test case JAR package.
    sh /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink
  11. Submit the job from the flink_jm_8c32g container.
    cd /usr/local/flink
    bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
  12. View the sink topic data.
    1. Consume Kafka data and check whether the job is running properly.
      cd /usr/local/kafka
      bin/kafka-console-consumer.sh --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --topic result --from-beginning

  13. In the flink_jm_8c32g container, view the latest Flink client log flink-root-client-xxx.log.
    cd /usr/local/flink-1.16.3/log
    

    If no error information is displayed, OmniStream is enabled successfully.