DataStream
- Access the flink_jm_8c32g container and start the JobManager and TaskManager in cluster mode. Run source /etc/profile each time you re-enter the container.
docker exec -it flink_jm_8c32g /bin/bash
source /etc/profile
cd /usr/local/flink-1.16.3/bin
./start-cluster.sh
- In the flink_tm1_8c32g and flink_tm2_8c32g containers, create the /opt/conf directory to hold the Kafka client configuration files.
mkdir /opt/conf
cd /opt/conf
- Create the Kafka consumer configuration file kafka_consumer.conf.
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
- Create the Kafka producer configuration file kafka_producer.conf.
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
- Check whether the JobManager and TaskManager are started.
- Check the flink_jm_8c32g container for the StandaloneSessionClusterEntrypoint process.
source /etc/profile
jps

- Check the flink_tm1_8c32g and flink_tm2_8c32g containers for the TaskManagerRunner process. The following uses flink_tm1_8c32g as an example.
docker exec -it flink_tm1_8c32g /bin/bash
source /etc/profile
jps

- Start Zookeeper and Kafka on the physical machine. For details, see Kafka Deployment Guide.
- Use Kafka to create topics and generate data.
- Create topics for the source and sink.
cd /usr/local/kafka
bin/kafka-topics.sh --create --bootstrap-server Physical_machine_IP_address:9092 --replication-factor 1 --partitions 1 --topic source_abcd
bin/kafka-topics.sh --create --bootstrap-server Physical_machine_IP_address:9092 --replication-factor 1 --partitions 1 --topic result
- Save the following content as the producer.sh script and run the script to import data to the source topic.
#!/bin/bash
# Kafka installation directory (replace the example directory with the actual one).
KAFKA_HOME="/usr/local/kafka"
TOPIC_NAME="source_abcd"                    # Kafka topic name
BROKER="Physical_machine_IP_address:9092"   # Kafka broker address
MESSAGE_COUNT=10                            # Number of messages to send

# Check whether kafka-console-producer.sh exists.
if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
    echo "Error: kafka-console-producer.sh was not found. Check the KAFKA_HOME path."
    exit 1
fi

# Generate random strings and send them to Kafka.
for ((i=1; i<=$MESSAGE_COUNT; i++)); do
    # Generate four random letters from a-d, followed by a space and 1.
    RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1)
    MESSAGE="${RAND_STR} 1"    # Format: 4 letters + space + 1
    # Invoke the Kafka producer to send the message.
    echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "$BROKER" \
        --topic "$TOPIC_NAME"
    echo "Sent: $MESSAGE"
done
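The loop above starts a new kafka-console-producer process for every message, which becomes slow for a large MESSAGE_COUNT. One alternative (a sketch, not part of the original guide) is to generate all messages first and pipe them through a single producer invocation. The generation step alone looks like this:

```shell
#!/bin/bash
# Emit MESSAGE_COUNT lines in the same "xxxx 1" format used by producer.sh
# (four random letters from a-d, a space, then 1).
MESSAGE_COUNT=10
for ((i=1; i<=MESSAGE_COUNT; i++)); do
    echo "$(tr -dc 'a-d' </dev/urandom | head -c 4) 1"
done
```

The output can then be piped once through the same producer command, e.g. `./generate.sh | "$KAFKA_HOME/bin/kafka-console-producer.sh" --bootstrap-server "$BROKER" --topic "$TOPIC_NAME"`, so only one producer process is created.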
- Build a job JAR package.
- Go to the /opt directory of the physical machine and create the /opt/job/src/main/java/com/huawei/boostkit directory.
mkdir -p /opt/job/src/main/java/com/huawei/boostkit
cd /opt/job/
- Create a Java file for the Flink Job.
- Open /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java.
vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
- Press i to enter the insert mode and add the following content:
package com.huawei.boostkit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        String broker = "ip:port";
        String sourceTopic = "source_abcd";
        String targetTopic = "result";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(sourceTopic)
                .setGroupId("your-group-id")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(properties)
                .build();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(targetTopic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();

        DataStream<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
        SingleOutputStreamOperator<String> result = source.map(line -> line);
        result.sinkTo(sink);
        result.disableChaining();

        env.execute("Wordcount");
    }
}
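Note that the sample job's map is an identity pass-through: each record is forwarded to the sink unchanged. If you want the job to perform a real word count on the "word count" lines generated earlier (for example "abcd 1"), a plain-Java parsing helper such as the following could back a flatMap/keyBy/sum pipeline. This is a sketch; the class and method names are hypothetical and not part of the original guide.

```java
// Hypothetical helper: split a "word count" line such as "abcd 1"
// into its word and count parts, as a word-count pipeline would need.
public class WordCountParser {

    // Returns {word, count} for a line in "word count" format,
    // or null if the line does not match that format.
    public static String[] parse(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return null;
        }
        try {
            Integer.parseInt(parts[1]); // validate the count field
        } catch (NumberFormatException e) {
            return null;
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] parsed = parse("abcd 1");
        System.out.println(parsed[0] + " -> " + parsed[1]); // prints "abcd -> 1"
    }
}
```

Inside the job, such a helper would replace the identity map, e.g. keying the parsed word and summing the counts before sinking the aggregated result.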
- Create a POM file.
- Open /opt/job/pom.xml.
vi /opt/job/pom.xml
- Press i to enter the insert mode and add the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.boostkit</groupId>
    <artifactId>ziliao</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.16.3</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.16.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Run the mvn clean package command. The ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory. Upload the file to the /usr/local/flink directory of the flink_jm_8c32g container.
mvn clean package
docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
- Export environment variables from the flink_jm_8c32g container.
export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$C_INCLUDE_PATH
export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
- Submit the job from the flink_jm_8c32g container.
cd /usr/local/flink
bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
- View the sink topic data.
- Consume Kafka data and check whether the job is running properly.
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server Physical_machine_IP_address:9092 --topic result --from-beginning

- In the flink_jm_8c32g container, view the latest Flink client log flink-root-client-xxx.log.
cd /usr/local/flink-1.16.3/log

Parent topic: Using the Feature