Enabling the DataStream Feature
- Enter the flink_jm_8c32g container and start the JobManager and TaskManager in cluster mode. Run `source /etc/profile` each time you re-enter the container.

```bash
docker exec -it flink_jm_8c32g /bin/bash
source /etc/profile
cd /usr/local/flink-1.16.3/bin
./start-cluster.sh
```
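If the cluster came up, the JobManager's REST endpoint answers. A quick check, assuming curl is available in the container and Flink's default REST port 8081 (adjust if flink-conf.yaml overrides it):

```bash
# Lists registered TaskManagers; an empty list means the TMs have not joined yet.
curl http://localhost:8081/taskmanagers
```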
- Enter the flink_tm1_8c32g and flink_tm2_8c32g containers, create the /opt/conf directory, and add the configuration files (a one-step sketch follows the file contents below).

```bash
mkdir /opt/conf
cd /opt/conf
```

- Add the Kafka consumer configuration file kafka_consumer.conf.

```
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
```

- Add the Kafka producer configuration file kafka_producer.conf.

```
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
```
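Both files can be written in one step from inside each TaskManager container; a minimal sketch (run in flink_tm1_8c32g, then repeat in flink_tm2_8c32g):

```bash
mkdir -p /opt/conf
# Consumer options from the step above
cat > /opt/conf/kafka_consumer.conf << 'EOF'
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
EOF
# Producer options from the step above
cat > /opt/conf/kafka_producer.conf << 'EOF'
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
EOF
```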
- Check whether the JobManager and TaskManager have started successfully (a combined check is sketched below).
- In the flink_jm_8c32g container, check whether the StandaloneSessionClusterEntrypoint process exists.

```bash
source /etc/profile
jps
```

- Enter the flink_tm1_8c32g and flink_tm2_8c32g containers and check whether the TaskManagerRunner process exists. The following uses flink_tm1_8c32g as an example.

```bash
docker exec -it flink_tm1_8c32g /bin/bash
source /etc/profile
jps
```
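To check both TaskManager containers without opening an interactive shell in each, a sketch run from the physical machine:

```bash
# Prints a confirmation line for each container whose JVM list contains TaskManagerRunner.
for c in flink_tm1_8c32g flink_tm2_8c32g; do
    docker exec "$c" bash -c 'source /etc/profile; jps' | grep -q TaskManagerRunner \
        && echo "$c: TaskManagerRunner is running"
done
```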
- Start ZooKeeper and Kafka on the physical machine. For details, see the *Kafka Deployment Guide*.
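For reference, with the bundled scripts of a standard Kafka distribution under /usr/local/kafka, the two services are typically started as follows; defer to the deployment guide if your setup differs:

```bash
cd /usr/local/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
```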
- Use Kafka to create the topics and generate data.
- Create the Source and Sink topics.
```bash
cd /usr/local/kafka
bin/kafka-topics.sh --create --bootstrap-server <physical-machine-ip>:9092 --replication-factor 1 --partitions 1 --topic source_abcd
bin/kafka-topics.sh --create --bootstrap-server <physical-machine-ip>:9092 --replication-factor 1 --partitions 1 --topic result
```
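To confirm that both topics were created:

```bash
bin/kafka-topics.sh --list --bootstrap-server <physical-machine-ip>:9092
bin/kafka-topics.sh --describe --bootstrap-server <physical-machine-ip>:9092 --topic source_abcd
```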
- Save the following content as a script file producer.sh and run it to import data into the source topic (see the usage note after the script).
```bash
#!/bin/bash
KAFKA_HOME="/usr/local/kafka"       # Kafka installation directory (modify to match the actual path)
TOPIC_NAME="source_abcd"            # Kafka topic name
BROKER="<physical-machine-ip>:9092" # Kafka broker address
MESSAGE_COUNT=10                    # Number of messages to send

# Check that kafka-console-producer.sh exists
if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
    echo "Error: kafka-console-producer.sh not found; check the KAFKA_HOME path"
    exit 1
fi

# Generate random strings and send them to Kafka
for ((i=1; i<=$MESSAGE_COUNT; i++)); do
    # Generate 4 random letters (a-d) + space + 1
    RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1)
    MESSAGE="${RAND_STR} 1"  # Format: 4 letters + space + 1

    # Call the Kafka console producer to send the message
    echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "$BROKER" \
        --topic "$TOPIC_NAME"
    echo "Sent: $MESSAGE"
done
```
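Make the script executable and run it; it sends MESSAGE_COUNT records of the form `<four letters> 1` to source_abcd:

```bash
chmod +x producer.sh
./producer.sh
```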
- Build the job JAR package.
- Go to the /opt directory on the physical machine and create the /opt/job/src/main/java/com/huawei/boostkit directory.

```bash
mkdir -p /opt/job/src/main/java/com/huawei/boostkit
cd /opt/job/
```

- Create the Flink job Java file.
- Open /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java.

```bash
vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
```

- Press "i" to enter edit mode and add the following content.
```java
package com.huawei.boostkit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        String broker = "ip:port"; // replace with the actual Kafka broker address
        String sourceTopic = "source_abcd";
        String targetTopic = "result";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Consumer properties for the Kafka source
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getCanonicalName());

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(sourceTopic)
                .setGroupId("your-group-id")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(properties)
                .build();

        // Producer properties for the Kafka sink (reuses the same Properties object)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(targetTopic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();

        // Pipeline: Kafka source -> identity map -> Kafka sink.
        // disableChaining() keeps the source and map as separate tasks.
        DataStream<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .disableChaining();

        SingleOutputStreamOperator<String> result = source.map(line -> line);
        result.sinkTo(sink);
        result.disableChaining();

        env.execute("Wordcount");
    }
}
```
- Create the pom file.
- Open /opt/job/pom.xml.

```bash
vi /opt/job/pom.xml
```

- Press "i" to enter edit mode and add the following content.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.boostkit</groupId>
    <artifactId>ziliao</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.16.3</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.16.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
- Run `mvn clean package`. The file ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar is generated in the target directory; copy it to /usr/local/flink in the flink_jm_8c32g container.

```bash
mvn clean package
docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
```
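To confirm the JAR landed in the container:

```bash
docker exec flink_jm_8c32g ls -lh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
```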
- Export environment variables in the flink_jm_8c32g container.

```bash
export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/usr/local/ksl/include:$C_INCLUDE_PATH
export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
```
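A quick sanity check that the referenced library directories exist before submitting the job (paths as exported above):

```bash
ls /opt/udf-trans-opt/libbasictypes/lib /usr/local/ksl/lib
echo "$LD_LIBRARY_PATH" | tr ':' '\n'
```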
- Submit the job in the flink_jm_8c32g container.

```bash
cd /usr/local/flink
bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
```
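Once submitted, the job should show up as RUNNING:

```bash
# Lists scheduled and running jobs; the job is named "Wordcount" in env.execute().
bin/flink list
```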
- View the data in the Sink topic.
- Consume the Kafka data to check whether the job is running properly.

```bash
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server <physical-machine-ip>:9092 --topic result --from-beginning
```
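Each consumed record should match the format produced by producer.sh: four letters from a to d, a space, and `1` (for example, `dcba 1`). Press Ctrl+C to stop the consumer.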
- In the flink_jm_8c32g container, check the latest Flink client log flink-root-client-xxx.log.
```bash
cd /usr/local/flink-1.16.3/log
```
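The xxx part of the file name varies per host and run; a sketch to locate and open the newest client log:

```bash
ls -t flink-root-client-*.log | head -n 1            # newest client log
less "$(ls -t flink-root-client-*.log | head -n 1)"
```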
Parent topic: Using Features