鲲鹏社区首页
中文
注册
开发者
基于鲲鹏KAE的MinIO压缩加速参考实践

基于鲲鹏KAE的MinIO压缩加速参考实践

数据存储KAE

发表于 2025/12/11

0

1 非商用声明

本实践中提供的代码和调优方法仅供用户参考使用,提供的demo代码仅用于用户验证方案可行性以及性能效果对比,用户可参考构建自己的软件。

2 方案介绍

2.1 背景

MinIO 作为一款高性能的开源对象存储系统,凭借其对 Amazon S3 协议的全面兼容、分布式架构设计以及轻量化的部署特性,已成为云原生时代主流的存储解决方案。其高吞吐、低延迟的性能优势,能够高效支撑日志处理、AI 数据湖等多种应用场景;尤其在大语言模型(LLM)领域,结合检索增强生成(RAG)技术,可实现高效的数据检索与响应。通过对 MinIO 进行针对性调优,可充分释放鲲鹏服务器的硬件潜能,提升应用性能。

2.2 方案简介

本参考实践推荐使用以下方案在鲲鹏服务器上对MinIO进行性能优化:

  1. 系统性能调优:通过配置服务器性能模式,应用绑核等方法,提升MinIO在鲲鹏服务器上的性能;
  2. 压缩传输方案:通过将原始文件使用KAE进行高性能压缩后再传输,优化网络带宽和磁盘空间的占用,该方案对文本日志类文件压缩效果较好,图片视频类等已压缩过的文件则没有效果,不建议使用本方案;

本参考实践的优化方案架构图如图所示,在MinIO Server服务器通过系统性能调优提升MinIO程序性能,在存储服务层通过KAE高性能压缩方案优化网络传输和空间占用。

2.3 部署方案

典型硬件配置

节点类型

服务器配置

数量

部署内容

MinIO Server节点

CPU:鲲鹏处理器(本实践采用7270Z)

内存:256GB或以上

存储:NVME盘

1台或以上

部署MinIO Server,用于存储数据。

MinIO Client节点

CPU:鲲鹏处理器(本实践采用7260)

内存:256GB或以上

存储:NVME盘

以实际业务需求为准

部署MinIO Client,对内与MinIO Server通信,对外为User APP提供封装后的服务接口。

软件版本

本参考实践使用的软件配套版本如下:

软件

版本

说明

MinIO

2025-09-07T16-13-09Z

MinIO Server

MinIO Python SDK

7.2.18

MinIO Client组件

KAE

2.0

硬件加速模块

Python

3.9


OS

openEuler release 22.03 (LTS-SP4)

物理机OS

3 部署指导

3.1 软件安装

MinIO安装指导(Server服务器)

本参考实践介绍使用源码编译的方式安装MinIO Server,属于示例Demo,使用其它方式(例如容器部署,二进制部署)也可以正常安装部署MinIO,且同样适用本实践的优化方案。如果需要商用部署,请参考官方MinIO部署文档(https://github.com/minio/minio)。

部署编译MinIO所需的go环境:

# 安装基础编译工具链
sudo dnf groupinstall "Development Tools" -y
sudo dnf install git wget curl gcc make pkgconfig -y

# 从镜像站下载go的包
wget -O go1.24.2.linux-arm64.tar.gz 'https://mirrors.aliyun.com/golang/go1.24.2.linux-arm64.tar.gz'

# 解压安装go
tar -C /usr/local -xzf go1.24.2.linux-arm64.tar.gz

# 配置环境变量(对当前用户生效)
echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.bashrc
echo 'export GOPATH=$HOME/go' >> ~/.bashrc
echo 'export PATH=$PATH:$GOPATH/bin' >> ~/.bashrc

# 加载环境变量
source ~/.bashrc

# 验证安装
go version
# 应输出:
# go version go1.24.2 linux/arm64

拉取MinIO源码,并编译二进制文件(MinIO Server安装):

# 克隆 MinIO 源码(官方仓库)
git clone https://github.com/minio/minio.git
cd minio

# 可选:切换到稳定版本分支(推荐)
# 可以选择较新的稳定版,本实践使用的RELEASE.2024-09-10T17-32-36Z
git checkout RELEASE.2025-09-07T16-13-09Z

# 编译 MinIO
make build

# 验证生成的可执行文件
./minio --version

KAE安装指导(Client服务器)

本实践基于7260和7270Z的鲲鹏 CPU+openEuler22.03-LTS-SP1操作系统环境进行。如果使用其它鲲鹏CPU和其他操作系统,需要申请License并安装配套版本的KAE,详情可参考KAE GitCode开源地址:https://gitcode.com/boostkit/KAE

采用KAE压缩方案时,需在Client服务器上安装KAE。注意,根据不同的操作系统版本,可能需要选择不同的KAE安装版本。

源码安装KAE2.0:

# 安装前检查
lspci | grep -i engine
# 如果有ZIP Engine则可正常进行后续安装动作,如果没有可能为设备不支持或者需要申请license


# 安装依赖
yum install -y make kernel-devel-`uname -r` libtool numactl-devel openssl-devel lz4-devel libzstd-devel chrpath cmake libunwind-devel

# 配置环境变量
export OPENSSL_ENGINES=/usr/local/lib/engines-1.1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib

# 安装KAE
git clone https://gitcode.com/boostkit/KAE.git -b kae2
cd KAE
sh build.sh all

# 安装后检验kaezlib压缩
# 功能测试
cd KAEZlib/test/gtest/
sh build.sh
./kaezlibtest --gtest_filter=*Case*
# 预期结果,SmallCase与LargeCase两个用例的执行结果均为passed

# 编译性能测试工具
cd KAEZlib/test/perftest
make

# 使用系统zip压缩
./zip_perf -m 8 -l 10240 -n 1000  

# 使用kaezip压缩
./kaezip_perf -m 8 -l 10240 -n 1000 
# 预期结果,压缩速度大幅提升,如果没有大幅提升,则系统使用的CPU压缩,没有使能KAE硬件加速

3.2 MinIO Server部署

MinIO Server的服务拉起命令如下:

# 通过环境变量,指定登陆MinIO登陆信息
export MINIO_ROOT_USER=minioadmin
export MINIO_ROOT_PASSWORD=********
# 示例中2个值都配置为minioadmin

# 单机部署场景,根据存储位置修改命令,拉起MinIO服务
./minio server /data/minio/data00 /data/minio/data01 /data/minio/data02 --console-address :9001

# 可通过该写法简化部署命令
./minio server /data/minio/data0{0,1,2} --console-address :9001

# 分布式多机部署场景,需要根据每台机器的IP和存储位置修改命令,示例命令中为示意IP
# 需要在每台服务上,都执行该命令,最终拉起分布式MinIO服务
# 命令中的xx.xx.xx.1/2/3分别代指3台服务器IP,根据实际使用情况修改
 ./minio server http://xx.xx.xx.1/data/minio/data0{0,1}  http://xx.xx.xx.2/data/minio/data0{0,1}  http://xx.xx.xx.3/data/minio/data0{0,1} --console-address :9001
# 拉起成功后,会有以下格式打屏,其中包含API/WebUI的登陆信息和登陆地址
MinIO Object Storage Server
Copyright: 2015-2025 MinIO, Inc.
License: GNU AGPLv3 - https://www.gnu.org/licenses/agpl-3.0.html
Version: DEVELOPMENT.2025-09-07T16-13-09Z (go1.24.2 linux/arm64)

API: http://xx.xx.xx.xx:9000   
   RootUser: minioadmin 
   RootPass: minioadmin 

WebUI: http://xx.xx.xx.xx:9001                       
   RootUser: minioadmin 
   RootPass: minioadmin 

CLI: https://docs.min.io/community/minio-object-store/reference/minio-mc.html#quickstart
   $ mc alias set 'myminio' 'http://xx.xx.xx.xx:9000' 'minioadmin' 'minioadmin'

Docs: https://docs.min.io

可以根据打屏信息,登录到http://your_host_ip:9001/访问MinIO的web页面。

考虑到部分用户在部署 MinIO 服务时,并未独占整台服务器,而是与其他业务共用同一主机,建议采用 CPU 绑核的方式运行 MinIO,以限制其占用的 CPU 资源。这样不仅可以实现资源的精细化分配,还能有效降低不同程序之间的相互干扰,提升系统整体稳定性与可预测性,绑核的基础参考命令如下:

# 使用0到15核,拉起MinIO服务
taskset -c 0-15 ./minio server /data/minio/data00 /data/minio/data01 /data/minio/data02 --console-address :9001

3.3 MinIO Client部署

MinIO Python SDK应用于存储服务,本实践介绍如何通过 MinIO Python SDK 部署 MinIO 客户端,实现文件的上传与下载功能。使用其他兼容 Amazon S3 协议的客户端(例如 boto3)或直接通过 MinIO Web 控制台进行操作,同样能够完成所介绍的功能。

MinIO-Python-SDK的安装方式:

# 使用pip安装MinIO-Pytyon-SDK,本实践使用的Python版本为py3.9
pip install minio

# 查看MinIO-Python-SDK版本
pip list | grep minio
# 本实践使用的版本为7.2.18

参考的python脚本内容如下,具体操作流程为,导入MinIO库,通过服务地址和登陆信息,实例化一个客户端,然后使用fput_object和fget_object方法来进行文件的上传和下载:

import time
from minio import Minio

# 配置基础信息:
# MINIO_ENDPOINT为已经拉起的MinIO服务,ACCESS_KEY和SECRET_KEY为登陆信息(拉起服务时配置的MINIO_ROOT_USER和MINIO_ROOT_PASSWORD),BUCKET_NAME为存储桶的名称(可以类比为传统文件系统中的“文件夹”)
# 其中OBJECT_NAME为上传后的保存名称,LOCAL_FILE_PATH为本地文件名称,2个值可以配置相同,a.txt可以自行创建
MINIO_ENDPOINT = "xx.xx.xx.xx:9000"
ACCESS_KEY = "minioadmin"
SECRET_KEY = "********" # 此处*为示例,实际使用时需填入真实值,后续都使用*进行表示
BUCKET_NAME = "test-bucket"
OBJECT_NAME = "a.txt"
LOCAL_FILE_PATH="a.txt"

# 初始化 MinIO 客户端(不使用 TLS)
client = Minio(
    endpoint=MINIO_ENDPOINT,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=False,  # 如果使用 HTTPS,则设为 True
)

# 如果bucket不存在,则创建一个bucket
if not client.bucket_exists(bucket_name=BUCKET_NAME):
    client.make_bucket(bucket_name=BUCKET_NAME)

# ===== FPUT 操作(文件上传)=====
print("Uploading file...")
start_time = time.perf_counter()
print(start_time)
try:
    result = client.fput_object(
        bucket_name=BUCKET_NAME,
        object_name=OBJECT_NAME,
        file_path=LOCAL_FILE_PATH
            )
    print("fput object_name is: ", result.object_name)
    OBJECT_NAME_DOWNLOAD = result.object_name
    put_time =  time.perf_counter() - start_time
    print(f"✅ Upload completed in {put_time:.4f} seconds")
except S3Error as e:
    print("❌ Upload failed:", e)

# ===== FGET 操作(文件下载)=====
downloaded_file = "./downloaded_a.txt"
print("Downloading file...")
start_time = time.perf_counter()
try:
    client.fget_object(
        bucket_name=BUCKET_NAME,
        object_name=OBJECT_NAME_DOWNLOAD,
        file_path=downloaded_file
    )
    get_time = time.perf_counter() - start_time
    print(f"✅ Download completed in {get_time:.4f} seconds")
except S3Error as e:
    print("❌ Download failed:", e)

可通过该脚本,验证MinIO Client的上传下载能力。

上述操作,也可以通过WebUI界面来完成,拉起服务后,登陆到MinIO Serer的IP,端口使用9001,即可访问MinIO的Web页面(拉起服务时可配,或者查看服务拉起打屏的内容),可以进行文件查阅,也可以上传和下载(如果未显示文件,可以尝试点击刷新按钮,即可看到目标文件)。


4 MinIO性能测试和优化实践

4.1 系统优化配置

不同机型的BIOS配置方式有差异,具体操作按实际机器BIOS配置项为准,本实践采用7270Z的机器作为Server进行介绍,以下系统优化均在Server节点进行配置。

4.1.1 配置服务器为高性能模式

调优策略:

配置服务器为高性能模式,让CPU运行在标称频率,不动态调频。


操作指导:

步骤1:

进入BIOS,依次选择“BIOS -> Advanced -> Power And Performance Configuration -> Power Policy”

步骤2:

设置“Power Policy”选项为“Performance”,然后保存配置。

4.1.2 开启服务器超线程

调优策略:

使能超线程,提高处理器的并行处理能力和资源利用率,在多任务或多线程工作负载下提升整体性能。


操作指导:

步骤1:

进入BIOS,依次选择“BIOS -> Advanced -> Power And Performance Configuration -> CPU PM Control -> SMT2”

步骤2:

设置“SMT2”选型为“Enable”,然后保存配置


开启超线程后,一个物理CPU核心会变成两个逻辑核心。在进行应用程序绑核时要注意,如果把不同的应用分别绑到同一个物理核心对应的两个逻辑核心上,它们之间可能会互相干扰,影响性能。

为了实现绑核不跨NUMA,可以参考以下操作:

# 使用命令,查看CPU物理核与逻辑核的对应关系
lscpu -e

# 示例输出如下
CPU NODE SOCKET CORE L1d:L1i:L2:L3 ONLINE    MAXMHZ   MINMHZ MHZ
  0    0      0    0 0:0:0:0          yes 2900.0000 400.0000   -
  1    0      0    0 0:0:0:0          yes 2900.0000 400.0000   -
  2    0      0    1 1:1:1:0          yes 2900.0000 400.0000   -
  3    0      0    1 1:1:1:0          yes 2900.0000 400.0000   -
# CPU列,代表的逻辑核,也就是绑核时指定的核
# CORE列,代表的物理核,示例中每个物理核出现2次,代表对应2个逻辑核
# 示例中,逻辑核0/1对应的物理核0,建议应用绑核运行时,同时包含0/1核

4.1.3 网卡亲和的NUMA绑核

调优目的:

MinIO Server进程所使用的CPU核,与MinIO Server所使用的网卡,建议处于同一个NUMA节点下,提高程序性能。


操作指导:

# 使用ifconfig,查询IP所对应的网卡
ifconfig

# 查看该网卡的基本信息(可选)
ethtool enp65s0f1np1

# 查询该网卡所属的numa节点
cat /sys/class/net/enp65s0f1np1/device/numa_node
# 得到非负值则继续
# 例如得到1,代表处于node1的NUMA节点

# 使用lscpu,查看该NUMA对应的cpu核
lscpu |grep node1

# 例如如下输出,再启动MinIO服务时,将其绑在对应核上
NUMA node1 CPU(s):                  64-127

# 例如,使用64到79核,拉起MinIO服务
taskset -c 64-79 ./minio server /data/minio/data00 /data/minio/data01 /data/minio/data02 --console-address :9001

4.1.4 使用tuned进行系统参数配置

调优目的:

使用tuned进行系统参数配置,提高系统性能。配置文件可参考MinIO官方提供的配置文件(参考链接:https://github.com/minio/minio/tree/master/docs/tuning)。


操作指导:

# 安装tuned,本实践采用的tuned版本为2.19.0
yum install tuned

# 获取tuned.conf文件 
wget https://raw.githubusercontent.com/minio/minio/master/docs/tuning/tuned.conf

# 创建目录,将配置文件移动到指定目录
sudo mkdir -p /usr/lib/tuned/minio/
# 注意,不同tuned版本,配置文件存放路径可能不同
# 例如2.24.0的tuned需要在/usr/lib/tuned/profiles下创建路径并放置conf文件

# 将配置文件移动到指定目录
sudo mv tuned.conf /usr/lib/tuned/minio

# 使能tuned为minio模式
sudo tuned-adm profile minio

4.2 基于KAE的压缩加速方案

可以使用将原文件进行KAE压缩后存储的方案,实现MinIO性能的整体优化。具体实现方式为:

  1. Client端将原文件使用KAE压缩后再写入MinIO Server,MinIO Server将压缩文件进行存储,这样可同时减少存储空间和对网络传输带宽的占用。
  2. 从MinIO Server读文件时,将压缩文件进行返回,Client端将接收到的文件使用KAE解压后返回给应用,也可实现减少网络带宽占用。

该方案的加速效果和文件的压缩率呈现正相关,压缩率60%时的性能数据如表所示:


每秒传输文件数(obj/s)

提升比

优化前

280


KAE压缩加速后

466

66%

测试条件如下,测试方式为并发摸高最大吞吐测试,相关测试脚本见附录:

  1. 测试使用10MB的文本文件(https://github.com/zlib-ng/corpora/blob/master/silesia/dickens),使用KAE的zlib压缩后约为6MB,压缩率为60%。
  2. 测试环境为25Gbps网络带宽,使用4GB/s写速率的NVME磁盘(单机三盘),CPU采用20核绑核(7270Z)。


使用该方案,需要提前按照软件安装章节中的内容,在Client端完成KAE的安装,压缩和解压过程都在Client端进行实现。

为了提高易用性,本实践将MinIO-Python-SDK的API进行封装,在MinIO原有的put_object/get_object/fput_object/fget_object接口中,增加了自动调用KAE进行压缩/解压缩的操作,原API的参数不变,新增参数compress_mode/uncompress_mode来使能KAE压缩/解压缩功能。

方案的整体使用步骤如下:

步骤1:部署KAE:

先按照参考实践中的KAE安装章节,进行KAE部署,如果已部署完成则忽略此步骤。

步骤2:导入KAE环境变量:

在业务应用的运行环境中,导入KAE环境变量。

步骤3:部署压缩方案脚本:

将附录中示例脚本(minio_kae_extension.py),放到和应用脚本同一路径下,或者通过其它方式实现导入示例脚本。

步骤4:存储服务使能KAE压缩:

从示例脚本中导入MinIO使用,替代原有的从MInIO-Python-SDK中导入的MinIO类。导入后,正常初始化Client,然后在写文件/读文件调用时,增加使能压缩/解压的参数,实现快速调用KAE进行加速(注意,压缩文件会自动补充压缩后缀,读文件时时需要使用带压缩后缀的文件名)。


具体使用方式的参考如下:

从示例脚本中导入MInIO,并正常实例化Client:

# 将从MinIO-Python-SDK中导入的类,改为从示例脚本中导入
# 将原内容进行替换
# from minio import Minio
# 修改后(拓展脚本名称为minio_kae_extension.py,放在和client脚本相同路径):
from minio_kae_extension import Minio

# 实例化client的用法保持不变
client = Minio(
    endpoint=MINIO_ENDPOINT,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=False,  # 如果使用 HTTPS,则设为 True
)

写文件时,接口方法保持不变,通过新增参数使能压缩功能:

# 使用写文件功能时,增加compress_mode="on"使能KAE压缩后上传,若不配置该参数则行为和原minio相同
# 使能压缩后,会对原始文件进行KAE压缩,将存储文件自动增加压缩后缀,并将压缩信息写入metadata
result = client.fput_object(
        bucket_name=BUCKET_NAME,
        object_name=OBJECT_NAME,
        file_path=LOCAL_FILE_PATH,
        compress_mode="on",
    )

# 可以通过返回值中的object_name得到真实存储的文件名,用于查询和下载
# 若不希望自动添加压缩后缀,可自行对拓展脚本进行修改,但是注意对文件是否压缩做好管理和识别
real_save_name = result.object_name

读文件时,注意使用带压缩后缀的文件名,接口方法保持不变,通过新增参数使能解压缩:

# 使用读文件功能时,增加uncompress_mode="on"使能下载后使用KAE解压,若不配置该参数则行为和原minio相同
# 使能解压后,会对对象文件的metadata获取压缩信息(是否压缩,压缩格式),然后进行对应解压操作(若无压缩信息,则不进行解压操作)
 client.fget_object(
        bucket_name=BUCKET_NAME,
        object_name=OBJECT_NAME_DOWNLOAD,
        file_path=downloaded_file,
        uncompress_mode="on",
    )


5 附录

5.1 MinIO官方性能测试工具使用指导

可以使用MinIO官方提供的warp工具,进行MinIO的性能测试。可以直接下载二进制文件,解压即用。

下载地址:https://github.com/minio/warp/releases

下载tar.gz压缩包,解压出warp文件即可,本实践用的warp版本为v1.3.1

# 查看整体功能
 ./warp --help

# 查看具体针对某一操作的使用方法,例如查看上传功能(put)
./warp put --help

# 测试put操作,并发10,文件大小10MB,持续时间1分钟,进行性能测试:
./warp put --host=xx.xx.xx.xx:9000 --access-key=minioadmin --secret-key=********   --concurrent 10 --obj.size 10MiB --duration 60s

# 测试get操作,会先上传一定的文件(通过objects参数指定,默认2500),然后从中开始随机选取文件进行get测试
./warp get --host=xx.xx.xx.xx:9000 --access-key=minioadmin --secret-key=********   --concurrent 10 --obj.size 10MiB --objects 200 --duration 60s

执行成功后,会出现如下图所示的性能打屏


5.2 负载均衡参考

在分布式部署 MinIO 服务时,每个客户端通常只与一个服务节点建立连接并进行数据传输。如果大量客户端同时访问同一个节点,会导致节点间负载不均,使该节点成为性能瓶颈,进而拖累整个系统的处理效率。

根据实际应用场景,用户可自行决定负载均衡的实现方式,例如MinIO官方提供的Sidekick负载均衡工具(https://github.com/minio/sidekick),或者自行使用Nginx搭建,搭建方式可以参考MinIO官方提供的文档(https://docs.min.io/enterprise/aistor-object-store/installation/linux/load-balancing/),从而实现维持各节点间请求的均匀分布,提升系统整体性能与稳定性。


5.3 KAE压缩加速方案参考实现与性能测试脚本

5.3.1 KAE压缩加速方案参考实现脚本

完整的KAE压缩加速方案脚本(minio_kae_extension.py)参考内容为:

from minio import Minio as _OriginalMinio
from minio.error import S3Error
import os
import zlib
from pathlib import Path
from typing import Optional, Union, List, Any, Generator
from io import BytesIO

class Minio(_OriginalMinio):
    """
    继承 Minio 客户端,增加自动压缩/解压功能。
    """
    def __init__(
        self,
        # 新增压缩相关参数
        compress_format: str = "zlib",         # 支持 zlib格式
        compress_level: int = 6,               # zlib 压缩等级
        *args,
        **kwargs
    ):
        super().__init__(
            *args,
            **kwargs
        )
        self.compress_format = compress_format.lower()
        self.compress_level = compress_level

        # 支持的压缩格式,可根据实际情况拓展
        self.compress_format_support = {'zlib': '.zlib' }

        if self.compress_format not in (self.compress_format_support):
            raise ValueError("compress format is supported currently.")

    def fput_object(
        self,
        bucket_name: str,
        object_name: str,
        file_path: Union[str, bytes, Path],
        metadata: Optional[dict] = None,
        compress_mode: str = "off",        
        *args,
        **kwargs
    ):
        """
        注意:当 compress_mode="on" 时,需提供 `length`(原始未压缩大小),
        用于 metadata 记录。fput_object 会自动提供此值。
        """
        file_path = Path(file_path)
        original_metadata = metadata or {}

        if compress_mode.lower() == "on":
            # 获取原始文件大小(用于 put_object 的 length 参数)
            original_size = file_path.stat().st_size

            # 以流方式打开文件
            with open(file_path, "rb") as file_stream:
                return self.put_object(
                    bucket_name=bucket_name,
                    object_name=object_name,
                    data=file_stream,
                    length=original_size,
                    metadata=original_metadata,
                    compress_mode="on",  # 触发压缩逻辑
                    *args,
                    **kwargs
                )
        else:
            # 不压缩,直接调用父类方法(原生流式上传)
            return super().fput_object(
                bucket_name=bucket_name,
                object_name=object_name,
                file_path=file_path,
                metadata=original_metadata,
                *args,
                **kwargs
            )

    def fget_object(
        self,
        bucket_name: str,
        object_name: str,
        file_path: Union[str, Path],
        uncompress_mode: str = "off",
        *args,
        **kwargs,
    ) -> None:
        """
        通过 get_object 同步检查 metadata,决定是否解压。
        - 若对象含 X-Amz-Meta-Kae-Compressed: true → 通过 get_object 获取流,若需解压则自动流式解压,全程不加载全文件到内存。
        """
        output_path = Path(file_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # 调用自定义 get_object(可能返回原始流或解压流)
        response = self.get_object(
            bucket_name=bucket_name,
            object_name=object_name,
            uncompress_mode=uncompress_mode,
            *args,
            **kwargs
        )
        try:
            with open(output_path, "wb") as f:
                # 流式写入,chunk size 可配置(这里用 10MB)
                for chunk in response.stream(amt=10 * 1024 * 1024):
                    f.write(chunk)
        finally:
            response.close()
            response.release_conn()

    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: Any,
        length: int,
        metadata: Optional[dict] = None,
        compress_mode: str = "off", 
        *args,
        **kwargs,
    ):
        """
        覆盖原 put_object:若满足条件,则先压缩再上传,并添加压缩后缀(例如.zlib)和 metadata 标记。
        使用流式压缩(ZlibCompressStream),避免全量加载到内存。
        """
        original_metadata = metadata or {}

        if compress_mode.lower() == "on":
            should_compress = True
        else:
            should_compress = False

        if should_compress and self.compress_format=="zlib":
            # 使用流式压缩包装器
            compress_stream = ZlibCompressStream(data, compress_level=self.compress_level)

            # 修改 object_name 添加后缀
            suffix = str(self.compress_format_support.get(self.compress_format, ".zlib"))
            new_object_name = object_name + suffix

            # 添加 KAE metadata 标记
            kae_metadata = {
                "X-Amz-Meta-Kae-Compressed": "true",
                "X-Amz-Meta-Kae-Format": self.compress_format,
                "X-Amz-Meta-Kae-Level": str(self.compress_level),
                "X-Amz-Meta-Kae-Original-Size": str(length),
            }
            merged_metadata = {**original_metadata, **kae_metadata}

            # 注意:流式压缩无法预知最终长度,因此必须设 length=-1 并指定 part_size
            # 如果调用方未提供 part_size,使用默认值(如 10MB)
            if "part_size" not in kwargs:
                kwargs["part_size"] = 10 * 1024 * 1024  # MinIO 要求 ≥5MB,可配

            return super().put_object(
                bucket_name=bucket_name,
                object_name=new_object_name,
                data=compress_stream,
                length=-1,  # 流式压缩长度未知
                metadata=merged_metadata,
                *args,
                **kwargs
            )
        else:
            # 不压缩,直接调用父类方法
            return super().put_object(
                bucket_name=bucket_name,
                object_name=object_name,
                data=data,
                length=length,
                metadata=original_metadata,
                *args,
                **kwargs
            )

    def get_object(
        self,
        bucket_name: str,
        object_name: str,
        uncompress_mode: str = "off",
        *args,
        **kwargs,
    ):
        """
        若对象含压缩标记,则返回流式解压后的对象(兼容 HTTPResponse 接口);
        否则返回原生响应。
        全程流式处理,不将完整解压内容加载到内存。
        """
        response = super().get_object(bucket_name, object_name, *args, **kwargs)

        try:
            is_kae_compressed = (uncompress_mode.lower() == "on" and response.headers.get("x-amz-meta-kae-compressed") == "true")
            if is_kae_compressed and response.headers.get("x-amz-meta-kae-format") == "zlib":
                # 返回流式解压包装器,不聚合数据
                return ZlibDecompressStream(response)
            else:
                return response
        except Exception as e:
            # 出错时确保释放原始连接
            try:
                response.close()
                response.release_conn()
            except Exception:
                pass
            raise RuntimeError(f"Failed to handle object {object_name}: {e}") from e

class ZlibCompressStream:
    def __init__(self, source_data, chunk_size=1 * 1024 * 1024, compress_level=6):
        """
        :param source_data: 字节数据或可读流(需具备 .read() 方法)
        :param chunk_size: 每次从源数据中读取的字节数
        """
        self.source = source_data if hasattr(source_data, 'read') else BytesIO(source_data)
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj(level=compress_level)
        self.buffer = b""
        self.finished = False

    def read(self, size: int) -> bytes:
        """为 put_object() 实现类似文件的 .read(size) 方法"""
        while len(self.buffer) < size and not self.finished:
            raw_chunk = self.source.read(self.chunk_size)
            if not raw_chunk:
                # 输入结束,flush compressor
                self.buffer += self.compressor.flush()
                self.finished = True
                break
            compressed = self.compressor.compress(raw_chunk)
            if compressed:
                self.buffer += compressed

        result = self.buffer[:size]
        self.buffer = self.buffer[size:]
        return result

class ZlibDecompressStream:
    """
    流式 zlib 解压包装器,模拟 urllib3.HTTPResponse 行为。
    支持 .read(), .stream(), .close(), .release_conn()。
    """
    def __init__(self, compressed_stream, chunk_size: int = 1 * 1024 * 1024):
        """
        :param compressed_stream: 原始压缩数据流(如 MinIO get_object() 返回的响应)
        :param chunk_size: 每次从压缩流读取的字节数
        """
        self._compressed_stream = compressed_stream
        self._chunk_size = chunk_size
        self._decompressor = zlib.decompressobj()
        self._buffer = b""
        self._finished = False
        self.closed = False

    def read(self, size: int = -1) -> bytes:
        if self.closed:
            raise ValueError("I/O operation on closed file.")

        if size == -1:
            # 读取剩余全部内容(仍流式进行)
            parts = []
            while True:
                chunk = self.read(1 * 1024 * 1024)
                if not chunk:
                    break
                parts.append(chunk)
            return b"".join(parts)

        # 按需填充缓冲区
        while len(self._buffer) < size and not self._finished:
            compressed_chunk = self._compressed_stream.read(self._chunk_size)
            if not compressed_chunk:
                # 压缩流结束,flush 解压器
                leftover = self._decompressor.flush()
                if leftover:
                    self._buffer += leftover
                self._finished = True
                break

            decompressed = self._decompressor.decompress(compressed_chunk)
            if decompressed:
                self._buffer += decompressed

        # 返回请求的字节数
        result = self._buffer[:size]
        self._buffer = self._buffer[size:]
        return result

    def stream(self, amt=1 * 1024 * 1024, decode_content=None):
        """兼容 urllib3.HTTPResponse.stream()"""
        while True:
            chunk = self.read(amt)
            if not chunk:
                break
            yield chunk

    def close(self):
        if not self.closed:
            try:
                self._compressed_stream.close()
                self._compressed_stream.release_conn()
            except Exception:
                pass  # 忽略关闭异常
            self.closed = True

    def release_conn(self):
        """兼容接口,实际由 close() 处理"""
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


5.3.2 KAE压缩加速方案测试参考脚本

若想使用该方案进行并发性能测试,本实践提供参考的并发性能测试脚本,其中吞吐量的计算采用原文件大小进行统计。

使用示例:

python minio_perf_test.py \
  --endpoint xx.xx.xx.xx:9000 \
  --access-key minioadmin \
  --secret-key ******** \
  --bucket perf-test \
  --file dickens.txt \
  --duration 30 \
  --concurrency 10 \
  --test both

注意事项:

1. get测试时会先落盘到tem路径后删除,如果tem空间不足,可修改指定落盘路径。

2. get测试时,会先使用1s上传文件,如果测试大文件时1s不足,则可修改脚本延长时间。

3. put测试完后,不会自动删除上传的测试文件,需要手动删除。


KAE压缩加速方案测速示例脚本minio_perf_test.py:

import os
import time
import argparse
import tempfile
import multiprocessing as mp
from pathlib import Path
from minio_kae_extension import Minio
from minio.error import S3Error

def upload_worker(args, worker_id, result_queue):
    client = Minio(
    endpoint=args.endpoint,
    access_key=args.access_key,
    secret_key=args.secret_key,
    secure=False,  # 如果使用 HTTPS,则设为 True
    )

    if not client.bucket_exists(args.bucket):
        client.make_bucket(args.bucket)

    local_file = Path(args.file)
    file_size_bytes = local_file.stat().st_size
    file_name = local_file.name

    start_time = time.time()
    op_count = 0
    total_latency = 0.0
    errors = 0

    while time.time() - start_time < args.duration:
        round_id = op_count + 1
        remote_name = f"work_{worker_id:04d}_times_{round_id:08d}_{file_name}"

        try:
            t0 = time.perf_counter()
            client.fput_object(args.bucket, remote_name, str(local_file),part_size=10*1024*1024,compress_mode="on")
            latency = time.perf_counter() - t0
            total_latency += latency
            op_count += 1
        except Exception as e:
            errors += 1
            print(f"[Upload Worker {worker_id}] Error: {e}")

    result_queue.put(("upload", worker_id, op_count, total_latency, errors, file_size_bytes))


def download_worker(args, worker_id, result_queue):
    client = Minio(
    endpoint=args.endpoint,
    access_key=args.access_key,
    secret_key=args.secret_key,
    secure=False,  # 如果使用 HTTPS,则设为 True
    )

    local_file = Path(args.file)
    original_file_name = local_file.name
    file_size_bytes = local_file.stat().st_size

    # 先上传一个基准文件用于下载(由 worker 0 负责)
    if worker_id == 1:
        remote_name = f"baseline_{original_file_name}"
        client.fput_object(args.bucket, remote_name, str(local_file),compress_mode="on")
    else:
        # 等待 baseline 文件就绪(简单 sleep,实际可加锁或信号,如果为大文件也可延长时间)
        time.sleep(1)

    start_time = time.time()
    op_count = 0
    total_latency = 0.0
    errors = 0

    with tempfile.TemporaryDirectory() as tmpdir:
        while time.time() - start_time < args.duration:
            round_id = op_count + 1
            remote_name = f"baseline_{original_file_name}"
            local_path = Path(tmpdir) / f"dl_{worker_id:02d}_{round_id:03d}_{original_file_name}" # 如果tem空间有限,可在此处指定Path,配置到空间充足的路径

            try:
                t0 = time.perf_counter()
                client.fget_object(args.bucket, remote_name+".zlib", str(local_path),uncompress_mode="on")
                latency = time.perf_counter() - t0

                # 重新读取实际下载的文件大小(不依赖 metadata)
                actual_size = local_path.stat().st_size
                if actual_size != file_size_bytes:
                    raise ValueError(f"Size mismatch: expected {file_size_bytes}, got {actual_size}")

                total_latency += latency
                op_count += 1
            except Exception as e:
                errors += 1
                print(f"[Download Worker {worker_id}] Error: {e}")

    result_queue.put(("download", worker_id, op_count, total_latency, errors, file_size_bytes))


def main():
    parser = argparse.ArgumentParser(description="MinIO Throughput Benchmark using fput/fget")
    parser.add_argument("--endpoint", required=True, help="MinIO endpoint (e.g., localhost:9000)")
    parser.add_argument("--access-key", required=True)
    parser.add_argument("--secret-key", required=True)
    parser.add_argument("--bucket", default="perf-test-bucket")
    parser.add_argument("--file", required=True, help="Local file to upload/download")
    parser.add_argument("--duration", type=int, default=60, help="Test duration in seconds")
    parser.add_argument("--concurrency", type=int, default=4, help="Number of concurrent processes")
    parser.add_argument("--secure", default=False, help="Use HTTP instead of HTTPS")
    parser.add_argument("--test", choices=["upload", "download", "both"], default="both")

    args = parser.parse_args()

    if not os.path.isfile(args.file):
        print(f"Error: File {args.file} does not exist.")
        return

    result_queue = mp.Queue()
    processes = []

    if args.test in ("upload", "both"):
        for i in range(args.concurrency):
            p = mp.Process(target=upload_worker, args=(args, i + 1, result_queue))
            processes.append(p)
            p.start()

    if args.test in ("download", "both"):
        for i in range(args.concurrency):
            p = mp.Process(target=download_worker, args=(args, i + 1, result_queue))
            processes.append(p)
            p.start()

    for p in processes:
        p.join()

    # Collect results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    # Aggregate by type
    upload_ops = 0
    upload_total_lat = 0.0
    upload_errors = 0
    upload_file_size = 0

    download_ops = 0
    download_total_lat = 0.0
    download_errors = 0
    download_file_size = 0

    for typ, wid, ops, lat, err, size in results:
        if typ == "upload":
            upload_ops += ops
            upload_total_lat += lat
            upload_errors += err
            upload_file_size = size
        else:
            download_ops += ops
            download_total_lat += lat
            download_errors += err
            download_file_size = size

    print("\n" + "="*60)
    if args.test in ("upload", "both"):
        if upload_ops > 0:
            total_data_mb = (upload_ops * upload_file_size) / (1024 * 1024)
            avg_latency = upload_total_lat / upload_ops
            throughput = total_data_mb / args.duration
            print(f"[UPLOAD] Duration: {args.duration}s | Ops: {upload_ops} | Errors: {upload_errors}")
            print(f"[UPLOAD] Total Data: {total_data_mb:.2f} MB | Throughput: {throughput:.2f} MB/s")
            print(f"[UPLOAD] Avg Latency: {avg_latency*1000:.2f} ms")
        else:
            print("[UPLOAD] No successful operations.")

    if args.test in ("download", "both"):
        if download_ops > 0:
            total_data_mb = (download_ops * download_file_size) / (1024 * 1024)
            avg_latency = download_total_lat / download_ops
            throughput = total_data_mb / args.duration
            print(f"[DOWNLOAD] Duration: {args.duration}s | Ops: {download_ops} | Errors: {download_errors}")
            print(f"[DOWNLOAD] Total Data: {total_data_mb:.2f} MB | Throughput: {throughput:.2f} MB/s")
            print(f"[DOWNLOAD] Avg Latency: {avg_latency*1000:.2f} ms")
        else:
            print("[DOWNLOAD] No successful operations.")

    print("="*60)


if __name__ == "__main__":
    main()


本页内容