
DataStream

  1. Enter the flink_jm_8c32g container and start the JobManager and TaskManager in cluster mode. Run source /etc/profile each time you re-enter the container.
    docker exec -it flink_jm_8c32g /bin/bash
    source /etc/profile
    cd /usr/local/flink-1.16.3/bin
    ./start-cluster.sh
  2. In the flink_tm1_8c32g and flink_tm2_8c32g containers, create the /opt/conf directory and a configuration file.
    mkdir /opt/conf
    cd /opt/conf
    1. Create the Kafka consumer configuration file kafka_consumer.conf.
      fetch.queue.backoff.ms=20
      group.id=omni
      max.poll.records=10000
    2. Create the Kafka producer configuration file kafka_producer.conf.
      queue.buffering.max.messages=2000000
      queue.buffering.max.kbytes=20971520
      queue.buffering.max.ms=5
      linger.ms=5
      batch.num.messages=200000
      batch.size=3145728
      max.push.records=10000
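Both files above are plain key=value property files. As a hedged illustration (the class and file names here are hypothetical, not part of the deployment), such a file can be read with java.util.Properties:

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ConfLoader {
    // Load a key=value configuration file such as kafka_consumer.conf.
    static Properties load(Path confFile) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(confFile)) {
            props.load(reader);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Self-contained demo: write the sample consumer settings to a
        // temporary file, then read them back.
        Path tmp = Files.createTempFile("kafka_consumer", ".conf");
        Files.writeString(tmp,
            "fetch.queue.backoff.ms=20\ngroup.id=omni\nmax.poll.records=10000\n");
        Properties consumer = load(tmp);
        System.out.println(consumer.getProperty("group.id")); // prints "omni"
    }
}
```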
  3. Check whether the JobManager and TaskManager are started.
    1. Check the flink_jm_8c32g container for the StandaloneSessionClusterEntrypoint process.
      source /etc/profile
      jps

    2. Check the flink_tm1_8c32g and flink_tm2_8c32g containers for the TaskManagerRunner process. The following uses flink_tm1_8c32g as an example.
      docker exec -it flink_tm1_8c32g /bin/bash
      source /etc/profile
      jps

  4. Start Zookeeper and Kafka on the physical machine. For details, see Kafka Deployment Guide.
  5. Use Kafka to create topics and generate data.
    1. Create topics for the source and sink.
      cd /usr/local/kafka
      bin/kafka-topics.sh --create --bootstrap-server Physical_machine_IP_address:9092 --replication-factor 1 --partitions 1 --topic source_abcd
      bin/kafka-topics.sh --create --bootstrap-server Physical_machine_IP_address:9092 --replication-factor 1 --partitions 1 --topic result
    2. Save the following content as the producer.sh script and run the script to import data to the source topic.
      #!/bin/bash
      
      # Kafka installation directory (Replace the example directory with the actual one.)
      KAFKA_HOME="/usr/local/kafka"
      TOPIC_NAME="source_abcd"  # Kafka topic name
      BROKER="Physical_machine_IP_address:9092"  # Kafka Broker address
      MESSAGE_COUNT=10             # Number of messages to send
      
      # Check whether Kafka console-producer.sh exists.
      if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
          echo "Error: kafka-console-producer.sh was not found. Check the KAFKA_HOME path."
          exit 1
      fi
      
      # Generate a random character string and send it to Kafka.
      for ((i=1; i<=$MESSAGE_COUNT; i++)); do
          # Generate four random letters from a-d, followed by a space and the digit 1.
          RAND_STR=$(tr -dc 'a-d' < /dev/urandom | fold -w 4 | head -n 1)
          MESSAGE="${RAND_STR} 1"  # Format: 4 letters + space + 1
      
          # Invoke the Kafka producer to send messages.
          echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
              --bootstrap-server "$BROKER" \
              --topic "$TOPIC_NAME"
          echo "Sent: $MESSAGE"
      done
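The message format the script produces (four random letters from a-d, a space, then 1) can be mirrored in Java for generating test data programmatically. This is a hedged sketch with hypothetical names, not part of the deployment:

```java
import java.util.Random;

public class MessageGenerator {
    private static final char[] ALPHABET = {'a', 'b', 'c', 'd'};

    // Build one message in the same "xxxx 1" format as producer.sh.
    static String nextMessage(Random random) {
        StringBuilder sb = new StringBuilder(6);
        for (int i = 0; i < 4; i++) {
            sb.append(ALPHABET[random.nextInt(ALPHABET.length)]);
        }
        return sb.append(" 1").toString();
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {   // matches MESSAGE_COUNT=10 in the script
            System.out.println(nextMessage(random));
        }
    }
}
```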
  6. Build a job JAR package.
    1. On the physical machine, create the /opt/job/src/main/java/com/huawei/boostkit directory and go to /opt/job.
      mkdir -p /opt/job/src/main/java/com/huawei/boostkit
      cd /opt/job/
    2. Create a Java file for the Flink Job.
      1. Open /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java.
        vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
      2. Press i to enter the insert mode and add the following content:
        package com.huawei.boostkit;
        
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.base.DeliveryGuarantee;
        import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
        import org.apache.flink.connector.kafka.sink.KafkaSink;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;
        
        import java.util.Properties;
        
        public class FlinkWordCount {
            public static void main(String[] args) throws Exception {
                String broker = "ip:port";
                String sourceTopic = "source_abcd";
                String targetTopic = "result";
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getCanonicalName());
                KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(broker)
                    .setTopics(sourceTopic)
                    .setGroupId("your-group-id")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setProperties(properties)
                    .build();
                
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
                properties.put(ProducerConfig.ACKS_CONFIG, "0");
                properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
                properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");
                KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers(broker)
                    .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                            .setTopic(targetTopic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setKafkaProducerConfig(properties)
                    .build();
                DataStream<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
                SingleOutputStreamOperator<String> result = source.map(line -> line);
                result.sinkTo(sink);
                result.disableChaining();
                env.execute("Wordcount");
            }
        }
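Note that the map in the job above forwards each record unchanged, so the job exercises the Kafka source-to-sink path rather than actually counting words. For reference, the aggregation a word count would perform on records shaped like "abcd 1" can be sketched in plain Java (class and method names are hypothetical; in the Flink job this role would typically be played by a flatMap/keyBy/sum chain):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Sum the counts of records shaped like "<word> <count>",
    // as the source topic's "abcd 1" messages are.
    static Map<String, Long> count(Iterable<String> records) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String record : records) {
            String[] parts = record.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip malformed records
            }
            totals.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("abcd 1", "abcd 1", "bbca 1")));
        // prints {abcd=2, bbca=1}
    }
}
```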
    3. Create a POM file.
      1. Open /opt/job/pom.xml.
        vi /opt/job/pom.xml
      2. Press i to enter the insert mode and add the following content:
        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>com.huawei.boostkit</groupId>
            <artifactId>ziliao</artifactId>
            <version>1.0-SNAPSHOT</version>
            <packaging>jar</packaging>
        
            <properties>
               <flink.version>1.16.3</flink.version>
               <maven.compiler.source>1.8</maven.compiler.source>
               <maven.compiler.target>1.8</maven.compiler.target>
            </properties>
        
            <dependencies>
               <!-- Flink dependencies -->
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka</artifactId>
                  <version>${flink.version}</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>2.5.0</version>
                  <exclusions>
                     <exclusion>
                        <groupId>org.lz4</groupId>
                        <artifactId>lz4</artifactId>
                     </exclusion>
                  </exclusions>
               </dependency>
               <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-statebackend-rocksdb</artifactId>
                  <version>${flink.version}</version>
               </dependency>
            </dependencies>
        
            <build>
               <plugins>
                  <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-assembly-plugin</artifactId>
                     <version>3.3.0</version>
                     <configuration>
                        <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                     </configuration>
                     <executions>
                        <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                              <goal>single</goal>
                           </goals>
                        </execution>
                     </executions>
                  </plugin>
               </plugins>
            </build>
        
        </project>
    4. Run the mvn clean package command. The ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory. Upload the file to the /usr/local/flink directory of the flink_jm_8c32g container.
      mvn clean package
      docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
  7. Export environment variables in the flink_jm_8c32g container.
    export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
    export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$C_INCLUDE_PATH
    export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
    export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
  8. Submit the job from the flink_jm_8c32g container.
    cd /usr/local/flink
    bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
  9. View the sink topic data.
    1. Consume Kafka data and check whether the job is running properly.
      cd /usr/local/kafka
      bin/kafka-console-consumer.sh --bootstrap-server Physical_machine_IP_address:9092 --topic result --from-beginning

  10. In the flink_jm_8c32g container, view the latest Flink client log flink-root-client-xxx.log.
    cd /usr/local/flink-1.16.3/log
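The log directory accumulates one file per client invocation. A hedged JDK-only sketch for locating the newest client log (the class name is hypothetical; file names are assumed to follow the flink-root-client-*.log pattern mentioned above):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestLog {
    // Return the most recently modified flink-root-client-*.log file, if any.
    static Optional<Path> find(Path logDir) throws IOException {
        try (Stream<Path> files = Files.list(logDir)) {
            return files
                .filter(p -> p.getFileName().toString().startsWith("flink-root-client-"))
                .filter(p -> p.getFileName().toString().endsWith(".log"))
                .max(Comparator.comparingLong((Path p) -> p.toFile().lastModified()));
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the current directory; pass the log directory as an argument.
        Path logDir = Path.of(args.length > 0 ? args[0] : ".");
        find(logDir).ifPresent(p -> System.out.println("Latest client log: " + p));
    }
}
```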