Enabling OmniStream for DataStream
This section describes how to start a Flink cluster and enable OmniStream in DataStream scenarios.
- Optional: If DataStream tasks are running in a multi-Task Manager environment, add omni.batch: true to the flink-conf.yaml file to improve shuffle efficiency and achieve better performance.
- Open the /usr/local/flink/conf/flink-conf.yaml file.
vi /usr/local/flink/conf/flink-conf.yaml
- Press i to enter the insert mode and add the following content to the file:
omni.batch: true
- Press Esc, type :wq!, and press Enter to save the file and exit.
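To confirm the setting was saved, a quick check:
grep omni.batch /usr/local/flink/conf/flink-conf.yaml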
- Access the flink_jm_8c32g container and start the Flink cluster.
docker exec -it flink_jm_8c32g /bin/bash
source /etc/profile
cd /usr/local/flink-1.16.3/bin
./start-cluster.sh
Each time you exit and access the container again, you need to run the source /etc/profile command to reload the environment variables. This ensures that the dependencies are properly detected when running tasks.
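If you prefer not to keep an interactive shell open, the same sequence can be collapsed into a single call from the physical machine; a convenience sketch, assuming bash is available in the image:
docker exec flink_jm_8c32g bash -lc 'source /etc/profile && cd /usr/local/flink-1.16.3/bin && ./start-cluster.sh'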
- Check whether the Job Manager and Task Manager are started successfully.
- Check the flink_jm_8c32g container for the StandaloneSessionClusterEntrypoint process.
source /etc/profile
jps
If the StandaloneSessionClusterEntrypoint process exists, the Job Manager is started successfully.

- Access the flink_tm1_8c32g and flink_tm2_8c32g containers and check for the TaskManagerRunner process. The following commands use the flink_tm1_8c32g container as an example:
docker exec -it flink_tm1_8c32g /bin/bash
source /etc/profile
jps
If the TaskManagerRunner process exists, the Task Manager is started successfully.

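To check all three containers in one pass from the physical machine, a minimal sketch, assuming the container names used throughout this guide:
for c in flink_jm_8c32g flink_tm1_8c32g flink_tm2_8c32g; do
  echo "== $c =="
  docker exec "$c" bash -lc 'source /etc/profile && jps'
done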
- Create and configure the Kafka consumer and producer configuration files.
- Create the Kafka consumer configuration file kafka_consumer.conf.
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
- Create the Kafka producer configuration file kafka_producer.conf.
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
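As an optional shortcut, both files can be created in one pass with heredocs; a minimal sketch, assuming the files belong in the current working directory (adjust the path to wherever your deployment expects them):
cat > kafka_consumer.conf <<'EOF'
fetch.queue.backoff.ms=20
group.id=omni
max.poll.records=10000
EOF
cat > kafka_producer.conf <<'EOF'
queue.buffering.max.messages=2000000
queue.buffering.max.kbytes=20971520
queue.buffering.max.ms=5
linger.ms=5
batch.num.messages=200000
batch.size=3145728
max.push.records=10000
EOF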
- Access the flink_tm2_8c32g container and perform steps 4.b to 4.d.
docker exec -it flink_tm2_8c32g /bin/bash
- Start ZooKeeper and Kafka on the physical machine. For details, see Kafka Deployment Guide.
- Use Kafka to create topics and generate data.
Replace all the example IP addresses of physical machines in the commands or scripts with the actual IP addresses of the Kafka servers.
- Create topics for the source and sink.
cd /usr/local/kafka
bin/kafka-topics.sh --create --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --replication-factor 1 --partitions 1 --topic source_abcd
bin/kafka-topics.sh --create --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --replication-factor 1 --partitions 1 --topic result
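To confirm that both topics exist, a quick check using the same placeholder address:
bin/kafka-topics.sh --list --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 | grep -E 'source_abcd|result'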
- Save the following content as the script file producer.sh.
#!/bin/bash
# Kafka installation directory (Replace the example directory with the actual one.)
KAFKA_HOME="/usr/local/kafka"
TOPIC_NAME="source_abcd"   # Kafka topic name
BROKER="IP_address:9092"   # IP address of the Kafka broker server
MESSAGE_COUNT=10           # Number of messages to send

# Check whether kafka-console-producer.sh exists.
if [ ! -f "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
    echo "Error: kafka-console-producer.sh was not found. Check the KAFKA_HOME path."
    exit 1
fi

# Generate random strings and send them to Kafka.
for ((i=1; i<=$MESSAGE_COUNT; i++)); do
    # Generate four random letters from a to d, followed by a space and 1.
    RAND_STR=$(cat /dev/urandom | tr -dc 'a-d' | fold -w 4 | head -n 1)
    MESSAGE="${RAND_STR} 1"   # Format: four letters + space + 1
    # Invoke the Kafka console producer to send the message.
    echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
        --bootstrap-server "$BROKER" \
        --topic "$TOPIC_NAME"
    echo "Sent: $MESSAGE"
done
- Run the script to generate test data and write it to the source topic.
chmod +x producer.sh
./producer.sh
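Optionally, verify that the messages reached the source topic before continuing (ten messages, per the MESSAGE_COUNT default above):
bin/kafka-console-consumer.sh --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --topic source_abcd --from-beginning --max-messages 10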
- Build a job JAR package.
- Go to the /opt directory of the physical machine and create the /opt/job/src/main/java/com/huawei/boostkit directory.
mkdir -p /opt/job/src/main/java/com/huawei/boostkit
cd /opt/job/
- Create a Java file for the Flink Job.
- Open /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java.
vi /opt/job/src/main/java/com/huawei/boostkit/FlinkWordCount.java
- Press i to enter the insert mode and add the following content:
package com.huawei.boostkit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        // Replace ip:port with the actual address of the Kafka broker.
        String broker = "ip:port";
        String sourceTopic = "source_abcd";
        String targetTopic = "result";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Consumer properties for the Kafka source.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(sourceTopic)
                .setGroupId("your-group-id")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(properties)
                .build();

        // The same Properties object is reused for the producer settings.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "DataGenerator");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic(targetTopic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(properties)
                .build();

        DataStream<String> source =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").disableChaining();
        // Pass each record through unchanged and write it to the sink topic.
        SingleOutputStreamOperator<String> result = source.map(line -> line);
        result.sinkTo(sink);
        result.disableChaining();
        env.execute("Wordcount");
    }
}
- Press Esc, type :wq!, and press Enter to save the file and exit.
- Create a pom.xml file.
- Open /opt/job/pom.xml.
vi /opt/job/pom.xml
- Press i to enter the insert mode and add the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.boostkit</groupId>
    <artifactId>ziliao</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.16.3</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.16.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Press Esc, type :wq!, and press Enter to save the file and exit.
- Run the mvn clean package command to build the project; the ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar file is generated in the target directory. Then upload the JAR package to the /usr/local/flink directory in the flink_jm_8c32g container.
mvn clean package
docker cp /opt/job/target/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink_jm_8c32g:/usr/local/flink
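To confirm the upload, a quick check from the physical machine:
docker exec flink_jm_8c32g ls -l /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar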
- Export environment variables from the flink_jm_8c32g container.
export CPLUS_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$CPLUS_INCLUDE_PATH
export C_INCLUDE_PATH=${JAVA_HOME}/include/:${JAVA_HOME}/include/linux:/opt/udf-trans-opt/libbasictypes/include:/opt/udf-trans-opt/libbasictypes/OmniStream/include:/opt/udf-trans-opt/libbasictypes/include/libboundscheck:/opt/udf-trans-opt/libbasictypes/OmniStream/core/include:/usr/local/ksl/include:$C_INCLUDE_PATH
export LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/aarch64:${JAVA_HOME}/jre/lib/aarch64/server:/opt/udf-trans-opt/libbasictypes/lib:/usr/local/ksl/lib:$LD_LIBRARY_PATH
- Modify the UDF configuration file.
- Set the test case package name (udf_package) and main class name (main_class).
vim /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties
- Press i to enter the insert mode and modify udf_package and main_class as follows:
udf_package=com.huawei.boostkit
main_class=com.huawei.boostkit.FlinkWordCount
- Press Esc, type :wq!, and press Enter to save the file and exit.
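A quick check that both values were saved:
grep -E '^(udf_package|main_class)' /opt/udf-trans-opt/udf-translator/conf/udf_tune.properties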
- Translate the test case JAR package.
sh /opt/udf-trans-opt/udf-translator/bin/udf_translate.sh /usr/local/flink/ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar flink
- Submit the job from the flink_jm_8c32g container.
cd /usr/local/flink
bin/flink run -c com.huawei.boostkit.FlinkWordCount ziliao-1.0-SNAPSHOT-jar-with-dependencies.jar
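To confirm that the job was accepted and is running, the standard Flink CLI listing can be used:
bin/flink list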
- View the sink topic data.
- Consume Kafka data and check whether the job is running properly.
cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server IP_address_of_Kafka_server's_physical_machine:9092 --topic result --from-beginning

- In the flink_jm_8c32g container, view the latest Flink client log flink-root-client-xxx.log.
cd /usr/local/flink-1.16.3/log
If no error information is displayed, OmniStream is enabled successfully.
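A minimal way to scan the newest client log for errors (file name pattern as in the step above):
cd /usr/local/flink-1.16.3/log
ls -t flink-root-client-*.log | head -n 1 | xargs -r grep -i error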
