Enabling OmniStream (DataStream Scenario)

This section describes, for the DataStream scenario, the detailed procedure from starting the Flink cluster to completing OmniStream enablement.

  1. Optional: If the DataStream job runs in a scenario with multiple Task Managers, add the configuration omni.batch: true to the flink-conf.yaml file to improve shuffle efficiency in this scenario and achieve better performance.
    1. Open the "/usr/local/flink/conf/flink-conf.yaml" file.
      vi /usr/local/flink/conf/flink-conf.yaml
      
    2. Press "i" to enter edit mode and add the following configuration.
      omni.batch: true
    3. Press "Esc", type :wq!, and press "Enter" to save and exit.
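    After saving, you can optionally confirm that the setting was written, for example:
      grep "omni.batch" /usr/local/flink/conf/flink-conf.yaml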
  2. Enter the flink_jm_8c32g container and start the Flink cluster.
    docker exec -it flink_jm_8c32g /bin/bash
    source /etc/profile
    cd /usr/local/flink-1.16.3/bin
    ./start-cluster.sh

    Each time you exit and re-enter the container, run the source /etc/profile command again to reload the environment variables; otherwise, jobs may fail to find dependent components.
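    Optionally, to avoid typing this every time, you can append the command to root's ~/.bashrc inside the container (a minimal convenience sketch; adjust to your own setup) so it runs in every new shell:
    # Convenience sketch: auto-load the profile for every new shell in this container
    echo "source /etc/profile" >> ~/.bashrc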

  3. Check whether the Job Manager and Task Managers have started successfully.
    1. In the flink_jm_8c32g container, check whether the StandaloneSessionClusterEntrypoint process exists.
      source /etc/profile
      jps

      If the StandaloneSessionClusterEntrypoint process exists, the Job Manager has started successfully.

    2. Enter the flink_tm1_8c32g and flink_tm2_8c32g containers respectively and check whether the TaskManagerRunner process exists. The following commands use the flink_tm1_8c32g container as an example.
      docker exec -it flink_tm1_8c32g /bin/bash
      source /etc/profile
      jps

      If the TaskManagerRunner process exists, the Task Manager has started successfully.
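      To check all three containers from the physical machine in one pass, a minimal sketch (assuming the container names used in this guide):
      # Run jps in each Flink container from the host
      for c in flink_jm_8c32g flink_tm1_8c32g flink_tm2_8c32g; do
          echo "== $c =="
          docker exec "$c" bash -lc 'source /etc/profile && jps'
      done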

  4. Create and configure the Kafka consumer and producer configuration files.
    1. Enter the flink_tm1_8c32g container.
      docker exec -it flink_tm1_8c32g /bin/bash
    2. Create the "/opt/conf" directory.
      mkdir /opt/conf
      cd /opt/conf
    3. Create the Kafka consumer configuration file kafka_consumer.conf.
      fetch.queue.backoff.ms=20
      group.id=omni
      max.poll.records=10000
    4. Create the Kafka producer configuration file kafka_producer.conf.
      queue.buffering.max.messages=2000000
      queue.buffering.max.kbytes=20971520
      queue.buffering.max.ms=5
      linger.ms=5
      batch.num.messages=200000
      batch.size=3145728
      max.push.records=10000
    5. Enter the flink_tm2_8c32g container and repeat steps 4.b to 4.d (a combined sketch for creating both files follows this step).
      docker exec -it flink_tm2_8c32g /bin/bash
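    Alternatively, a minimal sketch that writes both configuration files in one pass (assuming the /opt/conf directory from step 4.b); creating them with vi as described above works equally well:
      # Sketch: write both Kafka client configuration files listed above
      mkdir -p /opt/conf
      printf '%s\n' \
          'fetch.queue.backoff.ms=20' \
          'group.id=omni' \
          'max.poll.records=10000' > /opt/conf/kafka_consumer.conf
      printf '%s\n' \
          'queue.buffering.max.messages=2000000' \
          'queue.buffering.max.kbytes=20971520' \
          'queue.buffering.max.ms=5' \
          'linger.ms=5' \
          'batch.num.messages=200000' \
          'batch.size=3145728' \
          'max.push.records=10000' > /opt/conf/kafka_producer.conf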
  5. Start ZooKeeper and Kafka on the physical machine. For details, see the Kafka Deployment Guide.
  6. Use Kafka to create topics and generate data.

    During the actual operation, replace all physical machine IP addresses in the commands and scripts with the actual IP address of the Kafka server.

    1. Create the source and sink topics.
      cd /usr/local/kafka
      bin/kafka-topics.sh --create --bootstrap-server <Kafka-server-IP>:9092 --replication-factor 1 --partitions 1 --topic source_abcd
      bin/kafka-topics.sh --create --bootstrap-server <Kafka-server-IP>:9092 --replication-factor 1 --partitions 1 --topic result
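      To confirm that both topics exist, you can optionally list them (same IP placeholder convention as above):
      bin/kafka-topics.sh --list --bootstrap-server <Kafka-server-IP>:9092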
    2. Save the following content as the script file producer.sh.
      #!/bin/bash

      # Kafka installation directory (change to the actual path)
      KAFKA_HOME="/usr/local/kafka"
      TOPIC_NAME="source_abcd"     # Kafka topic name
      BROKER="<IP-address>:9092"   # IP address of the Kafka broker
      MESSAGE_COUNT=10             # Number of messages to send

      # Check that kafka-console-producer.sh exists
      if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
          echo "Error: kafka-console-producer.sh not found. Check the KAFKA_HOME path."
          exit 1
      fi

      # Generate random strings and send them to Kafka
      for ((i=1; i<=$MESSAGE_COUNT; i++)); do
          # Generate 4 random letters from a-d, followed by a space and the digit 1
          RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1)
          MESSAGE="${RAND_STR} 1"  # Format: 4 letters + space + 1

          # Send the message with the Kafka console producer
          echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
              --bootstrap-server "$BROKER" \
              --topic "$TOPIC_NAME"
          echo "Sent: $MESSAGE"
      done
    3. Run the script to generate test data and write it to the source topic.
      chmod +x producer.sh
      ./producer.sh
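      The script above starts a new console producer for every message, which is simple but slow. A possible variant (a sketch assuming the same paths and IP placeholder) pipes all messages through a single producer invocation:
      # Sketch: generate all messages first, then send them in one producer session
      for ((i=1; i<=10; i++)); do
          echo "$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1) 1"
      done | /usr/local/kafka/bin/kafka-console-producer.sh \
          --bootstrap-server "<IP-address>:9092" --topic source_abcd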
  7. Build the job JAR package.
    1. On the physical machine, go to the "/opt" directory and create the "/opt/job/src/main/java/com/huawei/boostkit" directory.
      mkdir -p /opt/job/src/main/java/com/huawei/boostkit
      cd /opt/job/
    2. Create the Flink job Java file.
      1. Open "/opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java".
        vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
      2. Press "i" to enter edit mode and add the following content.
        package com.huawei.boostkit;
        
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.base.DeliveryGuarantee;
        import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
        import org.apache.flink.connector.kafka.sink.KafkaSink;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        
        import java.util.Properties;
        
        public class FlinkWordCount {
            public static void main(String[] args) throws Exception {
                // Replace "ip:port" with the actual Kafka broker address
                String broker = "ip:port";
                String sourceTopic = "source_abcd";
                String targetTopic = "result";
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                // Consumer properties for the Kafka source
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(broker)
                    .setTopics(sourceTopic)
                    .setGroupId("your-group-id")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(properties)
                    .build();
                
                // Producer properties for the Kafka sink (added to the same Properties object)
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.ACKS_CONFIG, "0");
                properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
                properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");
                KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers(broker)
                    .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                            .setTopic(targetTopic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setKafkaProducerConfig(properties)
                    .build();
                // Read from the source topic; disableChaining keeps the source operator out of downstream operator chains
                DataStream<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
                // Pass each record through unchanged and write it to the sink topic
                SingleOutputStreamOperator<String> result = source.map(line -> line);
                result.sinkTo(sink);
                result.disableChaining();
                env.execute("Wordcount");
            }
        }
        
      3. Press "Esc", type :wq!, and press "Enter" to save and exit.
    3. Create the pom.xml file.
      1. Open "/opt/job/pom.xml".
        vi /opt/job/pom.xml
      2. Press "i" to enter edit mode and add the following content.
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>com.huawei.boostkit</groupId>
            <artifactId>ziliao</artifactId>
            <version>1.0-SNAPSHOT</version>
            <packaging>jar</packaging>
        
            <properties>
               <flink.version>1.16.3</flink.version>
               <maven.compiler.source>1.8</maven.compiler.source>
               <maven.compiler.target>1.8</maven.compiler.target>
            </properties>
        
            <dependencies>
               <!-- Flink dependencies -->
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>2.5.0</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-statebackend-rocksdb</artifactId>
                  <version>${flink.version}</version>
               </dependency>
            </dependencies>
        
            <build>
               <plugins>
                  <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-assembly-plugin</artifactId>
                     <version>3.3.0</version>
                     <configuration>
                        <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                     </configuration>
                     <executions>
                        <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                              <goal>single</goal>
                           </goals>
                        </execution>
                     </executions>
                  </plugin>
               </plugins>
            </build>
        
        </project>
        
      3. Press "Esc", type :wq!, and press "Enter" to save and exit.
    4. Run the mvn clean package command; the ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar package is generated in the target directory. Then copy the JAR package to the "/usr/local/flink" directory of the flink_jm_8c32g container.
      mvn clean package
      docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
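    Optionally, you can confirm that the main class was packaged into the JAR:
      jar tf /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar | grep FlinkWordCount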
  8. Export environment variables in the flink_jm_8c32g container.
    export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
    export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$C_INCLUDE_PATH
    export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
    export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
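    Optionally, confirm that the variables are set in the current shell:
    env | grep -E 'INCLUDE_PATH|LIBRARY_PATH'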
  9. Modify the UDF configuration file.
    1. Set the package name udf_package and main class name main_class of the test case.
      vim /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties
    2. Press "i" to enter edit mode and change udf_package and main_class to the following values.
      udf_package=com.huawei.boostkit
      main_class=com.huawei.boostkit.FlinkWordCount
    3. Press "Esc", type :wq!, and press "Enter" to save and exit.
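    Optionally, confirm the two entries:
      grep -E '^(udf_package|main_class)=' /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties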
  10. Translate the test case JAR package.
    sh /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink
  11. Submit the job in the flink_jm_8c32g container.
    cd /usr/local/flink
    bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
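    After submission, you can optionally check that the job is running with the Flink CLI:
    bin/flink list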
  12. View the data in the sink topic.
    1. Consume the Kafka data to check whether the job is running properly.
      cd /usr/local/kafka
      bin/kafka-console-consumer.sh --bootstrap-server <Kafka-server-IP>:9092 --topic result --from-beginning
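      To spot-check only a few records instead of streaming output continuously, the console consumer also accepts a message limit, for example:
      bin/kafka-console-consumer.sh --bootstrap-server <Kafka-server-IP>:9092 --topic result --from-beginning --max-messages 10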

  13. On the flink_jm_8c32g container, check the latest Flink client log flink-root-client-xxx.log.
    cd /usr/local/flink-1.16.3/log
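    For example, you can scan the most recent client log for errors (the exact file name varies per host, so the wildcard below is an assumption):
    # Assumption: client log names match flink-root-client-*.log; print any error or exception lines from the newest one
    ls -t flink-root-client-*.log | head -n 1 | xargs grep -iE 'error|exception'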
    

    If no error messages are found, OmniStream has been enabled successfully.