Enabling the DataStream Feature

  1. Enter the flink_jm_8c32g container and start the JobManager and TaskManager in cluster mode. Run the source /etc/profile command each time you re-enter the container.
    docker exec -it flink_jm_8c32g /bin/bash
    source /etc/profile
    cd /usr/local/flink-1.16.3/bin
    ./start-cluster.sh
  2. Enter the flink_tm1_8c32g and flink_tm2_8c32g containers, create the "/opt/conf" directory, and add the configuration files below (a one-pass sketch for both containers follows at the end of this step).
    mkdir /opt/conf
    cd /opt/conf
    1. Create the Kafka consumer configuration file kafka_consumer.conf.
      fetch.queue.backoff.ms=20
      group.id=omni
      max.poll.records=10000
    2. Create the Kafka producer configuration file kafka_producer.conf.
      queue.buffering.max.messages=2000000
      queue.buffering.max.kbytes=20971520
      queue.buffering.max.ms=5
      linger.ms=5
      batch.num.messages=200000
      batch.size=3145728
      max.push.records=10000
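    Because the same two files are needed in both TaskManager containers, you can optionally write them in one pass from the physical machine. The following is a minimal sketch, assuming both containers are running; it uses docker exec with tee as a convenience and is not a required step.
      for c in flink_tm1_8c32g flink_tm2_8c32g; do
          # Create the directory, then stream each config file into the container
          docker exec "$c" mkdir -p /opt/conf
          printf 'fetch.queue.backoff.ms=20\ngroup.id=omni\nmax.poll.records=10000\n' \
              | docker exec -i "$c" tee /opt/conf/kafka_consumer.conf > /dev/null
          printf 'queue.buffering.max.messages=2000000\nqueue.buffering.max.kbytes=20971520\nqueue.buffering.max.ms=5\nlinger.ms=5\nbatch.num.messages=200000\nbatch.size=3145728\nmax.push.records=10000\n' \
              | docker exec -i "$c" tee /opt/conf/kafka_producer.conf > /dev/null
      done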
  3. Check whether the JobManager and TaskManager have started successfully.
    1. In the flink_jm_8c32g container, check whether the StandaloneSessionClusterEntrypoint process exists.
      source /etc/profile
      jps

    2. Enter the flink_tm1_8c32g and flink_tm2_8c32g containers respectively and check whether the TaskManagerRunner process exists. The following uses the flink_tm1_8c32g container as an example.
      docker exec -it flink_tm1_8c32g /bin/bash
      source /etc/profile
      jps

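    If you prefer a scripted check over reading the jps output by hand, a sketch like the following (run from the physical machine, assuming all three containers are up) greps for the expected process names; grep returns a non-zero exit code if a process is missing.
      docker exec flink_jm_8c32g bash -c 'source /etc/profile; jps' | grep StandaloneSessionClusterEntrypoint
      for c in flink_tm1_8c32g flink_tm2_8c32g; do
          docker exec "$c" bash -c 'source /etc/profile; jps' | grep TaskManagerRunner
      done
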
  4. Start Zookeeper and Kafka on the physical machine. For details, see the Kafka Deployment Guide.
  5. Use Kafka to create the topics and generate data (a verification sketch follows this step).
    1. Create the Source and Sink topics.
      cd /usr/local/kafka
      bin/kafka-topics.sh --create --bootstrap-server <physical machine IP>:9092 --replication-factor 1 --partitions 1 --topic source_abcd
      bin/kafka-topics.sh --create --bootstrap-server <physical machine IP>:9092 --replication-factor 1 --partitions 1 --topic result
    2. Save the following content as a script file named producer.sh and run it to import data into the source topic.
      #!/bin/bash
      
      # Kafka installation directory (adjust to the actual path)
      KAFKA_HOME="/usr/local/kafka"
      TOPIC_NAME="source_abcd"  # Kafka topic name
      BROKER="<physical machine IP>:9092"  # Kafka broker address
      MESSAGE_COUNT=10             # Number of messages to send
      
      # Check that kafka-console-producer.sh exists
      if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
          echo "Error: kafka-console-producer.sh not found. Check the KAFKA_HOME path."
          exit 1
      fi
      
      # Generate random strings and send them to Kafka
      for ((i=1; i<=MESSAGE_COUNT; i++)); do
          # Generate 4 random characters from the set a-d, followed by a space and the digit 1
          RAND_STR=$(tr -dc 'a-d' < /dev/urandom | fold -w 4 | head -n 1)
          MESSAGE="${RAND_STR} 1"  # Format: 4 letters + space + 1
      
          # Invoke the Kafka console producer to send the message
          echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
              --bootstrap-server "$BROKER" \
              --topic "$TOPIC_NAME"
          echo "Sent: $MESSAGE"
      done
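    To verify this step end to end, you can list the topics, run the script, and peek at the first few records. The sketch below assumes producer.sh was saved under /usr/local/kafka; kafka-topics.sh --list and the console consumer's --max-messages option are standard Kafka CLI flags.
      cd /usr/local/kafka
      # Both source_abcd and result should appear in the listing
      bin/kafka-topics.sh --list --bootstrap-server <physical machine IP>:9092
      # Run the producer script, then consume the first five records and exit
      chmod +x producer.sh && ./producer.sh
      bin/kafka-console-consumer.sh --bootstrap-server <physical machine IP>:9092 --topic source_abcd --from-beginning --max-messages 5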
  6. Build the job JAR package.
    1. On the physical machine, go to the "/opt" path and create the "/opt/job/src/main/java/com/huawei/boostkit" directory.
      mkdir -p /opt/job/src/main/java/com/huawei/boostkit
      cd /opt/job/
    2. Create the Flink job Java file.
      1. Open "/opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java".
        vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
      2. Press i to enter insert mode and add the following content.
        package com.huawei.boostkit;
        
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.base.DeliveryGuarantee;
        import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
        import org.apache.flink.connector.kafka.sink.KafkaSink;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        
        import java.util.Properties;
        
        public class FlinkWordCount {
            public static void main(String[] args) throws Exception {
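                // ip:port is a placeholder; set it to the physical machine IP and Kafka port (9092 in the earlier steps)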
                String broker = "ip:port";
                String sourceTopic = "source_abcd";
                String targetTopic = "result";
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(broker)
                    .setTopics(sourceTopic)
                    .setGroupId("your-group-id")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(properties)
                    .build();
                
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.ACKS_CONFIG, "0");
                properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
                properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");
                KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers(broker)
                    .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                            .setTopic(targetTopic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setKafkaProducerConfig(properties)
                    .build();
                DataStream<String> source;
                source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
                SingleOutputStreamOperator<String> result = source.map(line ->
                    line
                );
                result.sinkTo(sink);
                result.disableChaining();
                env.execute("Wordcount");
            }
        }
    3. Create the POM file.
      1. Open "/opt/job/pom.xml".
        vi /opt/job/pom.xml
      2. Press i to enter insert mode and add the following content.
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>com.huawei.boostkit</groupId>
            <artifactId>ziliao</artifactId>
            <version>1.0-SNAPSHOT</version>
            <packaging>jar</packaging>
        
            <properties>
               <flink.version>1.16.3</flink.version>
               <maven.compiler.source>1.8</maven.compiler.source>
               <maven.compiler.target>1.8</maven.compiler.target>
            </properties>
        
            <dependencies>
               <!-- Flink dependencies -->
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>2.5.0</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-statebackend-rocksdb</artifactId>
                  <version>${flink.version}</version>
               </dependency>
            </dependencies>
        
            <build>
               <plugins>
                  <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-assembly-plugin</artifactId>
                     <version>3.3.0</version>
                     <configuration>
                        <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                     </configuration>
                     <executions>
                        <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                              <goal>single</goal>
                           </goals>
                        </execution>
                     </executions>
                  </plugin>
               </plugins>
            </build>
        
        </project>
    4. Run mvn clean package. The ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory; upload it to "/usr/local/flink" in the flink_jm_8c32g container.
      mvn clean package
      docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
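      Optionally, confirm that the JAR actually arrived in the container before submitting the job.
      docker exec flink_jm_8c32g ls -l /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar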
  7. Export the environment variables in the flink_jm_8c32g container.
    export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
    export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$C_INCLUDE_PATH
    export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
    export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
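    Note that these exports only apply to the current shell session. If you want them to survive re-entering the container, one option (a convenience suggestion, not part of the original steps) is to append the four export lines above to /etc/profile, which the source /etc/profile command from earlier steps will then restore.
    vi /etc/profile   # paste the four export lines at the end, then save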
  8. Submit the job in the flink_jm_8c32g container.
    cd /usr/local/flink
    bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
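    After submission, the standard Flink CLI listing command can confirm that the job is in the RUNNING state; the Flink web UI shows the same information.
    bin/flink list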
  9. View the data in the Sink topic.
    1. Consume the Kafka data to check whether the job is running properly.
      cd /usr/local/kafka
      bin/kafka-console-consumer.sh --bootstrap-server <physical machine IP>:9092 --topic result --from-beginning

  10. In the flink_jm_8c32g container, view the latest Flink client log, flink-root-client-xxx.log.
    cd /usr/local/flink-1.16.3/log
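    The actual log file name carries a user- and host-specific suffix in place of xxx, so a sketch like the following (assuming Flink's standard flink-<user>-client-<host>.log naming) finds and tails the newest client log.
    ls -t flink-*-client-*.log | head -n 1 | xargs tail -n 50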