Kafka使能CRC32加速算法

目的

本章以Kafka 2.5.0版本使能鲲鹏BoostKit分布式存储算法加速库CRC32加速算法,其他版本可参考本章节使能加速算法。使能该加速算法用于加速Kafka网络传输的CRC32算法计算性能,提升端到端性能。

方法

基于开源版本Kafka应用功能代码Patch(对于闭源深度定制Kafka查看Patch并对应修改),并修改Kafka编译脚本。Kafka组件通过CRC32C决定CRC32C的工厂类,在其静态代码块中,新增分支针对Arm架构的机器,通过JNI调用KSAL算法加速库来使能。

软件要求

表1 软件要求

类别

支持版本

Java

OpenJDK

操作系统

openEuler 22.03 LTS SP3

Kafka

1.0.0~3.8.0

Gradle

6.1.1

软件获取

Kafka使能鲲鹏BoostKit分布式存储算法加速库CRC32加速算法所需软件包及其获取方式如表2所示。

表2 Kafka使能鲲鹏加速库CRC32加速算法软件获取列表

名称

软件包名称

发布类型

说明

获取地址

KSAL(Kunpeng Storage Acceleration Library

BoostKit-KSAL_1.6.0.zip

闭源

华为自研的存储算法加速库,采用鲲鹏优化的算法代替主流开源算法,提升存储性能。当前包括EC(Erasure Code)算法、CRC16 T10DIF(Cyclic Redundancy Check 16 T10 Data Integrity Field)算法和CRC32(Cyclic Redundancy Check 32)算法

Link

KSAL的Kafka Patch

ksal-crc32-code.patch

开源

集成KSAL加速库的Patch,编译Kafka的时候应用。

Link

配置编译环境

  1. 安装OpenJDK。

    1. 下载安装包。
      1
      wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u252-b09/OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gz --no-check-certificate
      
    2. 解压到指定目录“/opt/tools/installed”
      1
      2
      3
      tar -zxf OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gz
      mkdir -p /opt/tools/installed/
      mv jdk8u252-b09 /opt/tools/installed/
      
    3. 配置环境变量。
      1. 打开“profile”文件。
        1
        vi /etc/profile
        
      2. “i”进入编辑模式,在文件中配置以下内容。
        1
        2
        export JAVA_HOME=/opt/tools/installed/jdk8u252-b09
        export PATH=$JAVA_HOME/bin:$PATH
        
      3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
    4. 生效环境变量。
      1
      source /etc/profile
      
    5. 查看是否生效。
      JDK版本显示是1.8.0_252表示安装已成功。
      1
      java -version
      

  2. 安装Gradle。

    1. 下载Gradle安装包。
      1
      wget https://services.gradle.org/distributions/gradle-6.1.1-bin.zip --no-check-certificate
      
    2. 解压到指定目录“/opt/tools/installed”
      1
      2
      unzip gradle-6.1.1-bin.zip
      mv gradle-6.1.1 /opt/tools/installed/
      
    3. 配置Gradle环境变量。
      1. 打开“profile”文件。
        1
        vi /etc/profile
        
      2. “i”进入编辑模式,在文件中配置以下内容。
        1
        2
        export GRADLE_HOME=/opt/tools/installed/gradle-6.1.1
        export PATH=$GRADLE_HOME/bin:$PATH
        
      3. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
    4. 生效环境变量。
      1
      source /etc/profile
      
    5. 查看是否生效。
      1
      gradle -version
      

  3. 安装KSAL。

    1. 下载KSAL二进制包。
      1
      wget https://kunpeng-repo.obs.cn-north-4.myhuaweicloud.com/Kunpeng%20BoostKit/Kunpeng%20BoostKit%2024.0.RC2/BoostKit-KSAL_1.6.0.zip --no-check-certificate
      
    2. 安装KSAL。
      1
      2
      unzip BoostKit-KSAL_1.6.0.zip
      rpm -ivh libksal-release-1.6.0.oe1.aarch64.rpm
      
    3. 查看是否安装成功。
      如下图所示,目录中存在KSAL的so包,即安装成功。
      1
      ll /lib64/*ksal*
      

编译Kafka

  1. 下载Kafka使能鲲鹏加速库加速CRC32加速算法Patch。

    1. 创建安装目录。
      1
      mkdir -p /opt/crc32
      
    2. 下载鲲鹏加速库CRC32加速算法并解压压缩包。
      1
      2
      3
      cd /opt/crc32 
      wget https://gitee.com/kunpengcompute/boostkit-bigdata/repository/archive/main.zip --no-check-certificate 
      unzip main.zip
      

  2. 下载Kafka,应用Patch并编译。

    1. 下载并解压Kafka。
      1
      2
      3
      cd /opt/crc32 
      wget https://github.com/apache/kafka/archive/refs/tags/2.5.0.zip --no-check-certificate 
      unzip 2.5.0.zip
      
    2. 应用集成KSAL的Patch。
      1
      2
      cd kafka-2.5.0
      git apply /opt/crc32/boostkit-bigdata-main/kafka/ksal-crc32-code.patch
      
    3. 修改Gradle编译脚本。
      开源Kafka 2.5.0版本应用Patch修改build.gradle。
      1
      git apply /opt/crc32/boostkit-bigdata-main/kafka/ksal-crc32-build-code.patch
      
      • Kafka其它版本修改编译脚本文件。
        1. 打开build.gradle。
          1
          vi build.gradle
          
        2. 通过/project(':clients') {定位到相应修改处。
        3. “i”进入编辑模式,在文件project(':clients')模块中新增以下内容。
          1
          2
          3
          4
          5
          6
          7
          task compileCrcJNI(type:Exec) {
              def javaHome = System.getenv('JAVA_HOME')
              commandLine 'gcc', '-fPIC', '-I', javaHome + '/include', '-I', javaHome + '/include/linux',
                      '-shared', '-o', 'src/main/resources/libNativeCrc32C.so',
                      'src/main/native/org/apache/kafka/common/utils/KsalCrc32C.c', '-lksal'
          } 
          compileJava.dependsOn 'compileCrcJNI'
          
        4. “Esc”键,输入:wq!,按“Enter”保存并退出编辑。
      • Kafka 2.7.2版本,配置的Scala版本编译时报错代码不兼容,需修改gradle.properties文件中为低版本的Scala,建议2.12.10。
    4. 执行编译。
      1
      gradle -g /$UserHome/gradleRepository releaseTarGz -info
      

    5. 获取Kafka二进制包kafka_2.12-2.5.0.tgz。
      1
      ll core/build/distributions/
      

使能CRC32加速算法

  1. 解压Kafka二进制包。

    1
    tar -zxvf core/build/distributions/kafka_2.12-2.5.0.tgz -C /usr/local
    

  2. 启动ZooKeeper和Kafka。

    1
    2
    3
    cd /usr/local/kafka_2.12-2.5.0
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties
    

  3. 创建topic。

    1
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 70 --topic test
    

  4. 发送消息。

    1
    2
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
    randomtest
    

    该脚本为Kafka自带脚本,用于往指定Kafka的topic发送消息。

    • bootstrap-server表示kafka broker的IP地址加端口。
    • topic表示写入的topic。

    该操作在执行后,可一直输入任意消息,按“Enter”键发送。若想中止,可按“ctrl+c”退出。

  5. 中止发送并查看Kafka日志,出现KSAL相关日志即生效。

    1
    cat logs/server.log | grep KSAL