Enabling OmniStream (DataStream Scenario)
This section describes, for the DataStream scenario, the detailed procedure from starting the Flink cluster to enabling OmniStream.
- Optional: If you run DataStream jobs with multiple Task Managers, add the configuration omni.batch: true to the flink-conf.yaml file to improve shuffle efficiency in that scenario and achieve better performance.
- Open the /usr/local/flink/conf/flink-conf.yaml file.
vi /usr/local/flink/conf/flink-conf.yaml
- Press "i" to enter edit mode and add the following configuration.
omni.batch: true
- Press "Esc", type :wq!, and press "Enter" to save the file and exit.
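To confirm that the configuration has been written, you can optionally check the file before starting the cluster (a verification step not part of the original procedure):
grep -n "omni.batch" /usr/local/flink/conf/flink-conf.yaml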
- Enter the flink_jm_8c32g container and start the Flink cluster.
docker exec -it flink_jm_8c32g /bin/bash
source /etc/profile
cd /usr/local/flink-1.16.3/bin
./start-cluster.sh
Each time you exit and re-enter the container, run the source /etc/profile command again to re-inject the environment variables; otherwise, jobs may fail to find their dependent components.
- Check whether the Job Manager and Task Manager have started successfully.
- In the flink_jm_8c32g container, check whether the StandaloneSessionClusterEntrypoint process exists.
source /etc/profile
jps
If the StandaloneSessionClusterEntrypoint process exists, the Job Manager has started successfully.

- Enter the flink_tm1_8c32g and flink_tm2_8c32g containers and check whether the TaskManagerRunner process exists in each. The following commands use the flink_tm1_8c32g container as an example.
docker exec -it flink_tm1_8c32g /bin/bash
source /etc/profile
jps
If the TaskManagerRunner process exists, the Task Manager has started successfully.
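To check all three containers from the physical machine in one pass, a minimal sketch (not part of the original procedure; it assumes the three containers are running) is:
docker exec flink_jm_8c32g bash -c 'source /etc/profile; jps' | grep StandaloneSessionClusterEntrypoint
docker exec flink_tm1_8c32g bash -c 'source /etc/profile; jps' | grep TaskManagerRunner
docker exec flink_tm2_8c32g bash -c 'source /etc/profile; jps' | grep TaskManagerRunner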

- Create and configure the Kafka consumer and producer configuration files.
- Create the Kafka consumer configuration file kafka_consumer.conf.
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
- Create the Kafka producer configuration file kafka_producer.conf.
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
- Enter the flink_tm2_8c32g container and perform steps 4.b to 4.d again.
docker exec -it flink_tm2_8c32g /bin/bash
- Start ZooKeeper and Kafka on the physical machine. For details, see the Kafka Deployment Guide.
- Use Kafka to create topics and generate data.
During the actual operation, replace all physical machine IP addresses in the commands and scripts with the actual IP address of the Kafka server.
- Create the Source and Sink topics.
cd /usr/local/kafka
bin/kafka-topics.sh --create --bootstrap-server <Kafka server IP address>:9092 --replication-factor 1 --partitions 1 --topic source_abcd
bin/kafka-topics.sh --create --bootstrap-server <Kafka server IP address>:9092 --replication-factor 1 --partitions 1 --topic result
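Optionally, you can confirm that both topics exist before producing data (a verification step not in the original procedure):
bin/kafka-topics.sh --list --bootstrap-server <Kafka server IP address>:9092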
- Save the following content as the script file producer.sh.
#!/bin/bash
# Kafka installation directory (modify according to the actual path)
KAFKA_HOME="/usr/local/kafka"
TOPIC_NAME="source_abcd"   # Kafka topic name
BROKER="IP address:9092"   # IP address of the Kafka broker
MESSAGE_COUNT=10           # Number of messages to send

# Check whether kafka-console-producer.sh exists
if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
    echo "Error: kafka-console-producer.sh not found. Check the KAFKA_HOME path."
    exit 1
fi

# Generate random strings and send them to Kafka
for ((i=1; i<=$MESSAGE_COUNT; i++)); do
    # Generate 4 random letters (a-d) + space + 1
    RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1)
    MESSAGE="${RAND_STR} 1"   # Format: 4 letters + space + 1
    # Invoke the Kafka console producer to send the message
    echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "$BROKER" \
        --topic "$TOPIC_NAME"
    echo "Sent: $MESSAGE"
done
- Run the script file to generate test data and write it to the Source topic.
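If the script was just created, make sure it is executable before running it (implied by the procedure but not shown explicitly):
chmod +x producer.sh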
./producer.sh
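To verify that the test data reached the Source topic, you can optionally consume a few messages (a verification step not in the original procedure):
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server <Kafka server IP address>:9092 --topic source_abcd --from-beginning --max-messages 10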
- Build the job JAR package.
- Go to the /opt directory on the physical machine and create the /opt/job/src/main/java/com/huawei/boostkit directory.
mkdir -p /opt/job/src/main/java/com/huawei/boostkit
cd /opt/job/
- Create the Flink job Java file.
- Open the /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java file.
vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
- Press "i" to enter edit mode and add the following content.
package com.huawei.boostkit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        String broker = "ip:port";
        String sourceTopic = "source_abcd";
        String targetTopic = "result";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(sourceTopic)
                .setGroupId("your-group-id")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(properties)
                .build();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(targetTopic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();

        DataStream<String> source;
        source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();

        SingleOutputStreamOperator<String> result = source.map(line -> line);
        result.sinkTo(sink);
        result.disableChaining();

        env.execute("Wordcount");
    }
}
- Press "Esc", type :wq!, and press "Enter" to save the file and exit.
- Create the pom.xml file.
- Open the /opt/job/pom.xml file.
vi /opt/job/pom.xml
- Press "i" to enter edit mode and add the following content.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.boostkit</groupId>
    <artifactId>ziliao</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.16.3</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.16.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Press "Esc", type :wq!, and press "Enter" to save the file and exit.
- Run the mvn clean package command. The ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory. Then copy the JAR package to the /usr/local/flink directory of the flink_jm_8c32g container.
mvn clean package
docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
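You can optionally confirm that the JAR package is now inside the container (a verification step not in the original procedure):
docker exec flink_jm_8c32g ls -lh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar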
- Export the environment variables in the flink_jm_8c32g container.
export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$C_INCLUDE_PATH
export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
- Modify the UDF configuration file.
- Set the package name (udf_package) and main class name (main_class) of the test case to run.
vim /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties
- Press "i" to enter edit mode and change udf_package and main_class to the following values.
udf_package=com.huawei.boostkit
main_class=com.huawei.boostkit.FlinkWordCount
- Press "Esc", type :wq!, and press "Enter" to save the file and exit.
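Optionally, you can confirm that the two values were updated (a verification step not in the original procedure):
grep -E "^udf_package|^main_class" /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties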
- Translate the test case JAR package.
sh /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink
- Submit the job in the flink_jm_8c32g container.
cd /usr/local/flink
bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
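After submission, you can optionally confirm that the job is running (a verification step not in the original procedure; the Flink CLI lists scheduled and running jobs):
bin/flink list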
- View the data in the Sink topic.
- Consume the Kafka data to check whether the job is running properly.
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server <Kafka server IP address>:9092 --topic result --from-beginning

- In the flink_jm_8c32g container, check the latest Flink client log flink-root-client-xxx.log.
cd /usr/local/flink-1.16.3/log
If the log contains no error messages, OmniStream has been enabled successfully.
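A quick way to scan the most recent client log for errors (an optional sketch, not part of the original procedure; it assumes the log directory from the previous step):
grep -iE "error|exception" $(ls -t flink-root-client-*.log | head -n 1)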

Parent topic: Using Features