开发者
资源
大数据日志分析工具OmniHelper的实践案例

大数据日志分析工具OmniHelper的实践案例

原生开发大数据

发表于 2026/06/12

0

1 背景

随着某公司实时计算平台升级,公司计划将线上的 Flink 作业逐步迁移至自研的 OmniStream 引擎。OmniStream 在性能、资源弹性和运维体验上均有显著提升,但与原生 Flink 存在一定的语法和语义差异。然而,当前在实际运维中面临以下突出挑战:

第一,传统的评估方式存在明显短板只能通过静态扫描或人工review sql语句去识别不兼容的算子、表达式和内置函数。

第二,我们不仅需要知道“用了哪些不兼容的算子/函数”,更需要知道“这些不兼容元素在真实负载下的影响有多大” —— 一个低频调用的小函数,和每秒执行百万次的热点算子,改造紧迫性完全不同。

于是我们开发了Omnihelper支持对用户Flink上的历史任务进行分析并且支持输出含有Omni不支持的算子/表达式/内置函数的报告。能够清晰的识别哪些算子/表达式/内置函数需要改造。明确哪些算子/表达式/内置函数出现频次高优先级高。

2 工具介绍及获取方式

工具简介:

OmniHelper是专为大数据平台设计的日志分析工具,旨在帮助开发者和运维人员高效分析执行日志中的算子、表达式和函数使用情况。该工具作为Omni生态的配套组件,能够识别原生算子与Omni算子的混合执行情况,分析相关参数类型,识别不支持的算子/表达式/函数并生成结构化的分析报告,为性能优化提供数据支持。

工具获取:

https://gitcode.com/openeuler/OmniAdaptor

使用文档:

https://gitcode.com/openeuler/OmniAdaptor/blob/master/omnihelper/README.md

3 使用效果

3.1 环境部署

python环境要求:

  • Python >= 3.9
  • 依赖项:tqdm、numpy、pandas、openpyxl、requests
  • 可选依赖:requests-kerberos(启用Kerberos认证时需要)

Flink日志分析环境要求:

  • 确保Flink集群已启动并可访问REST API。
  • 确保Flink Dashboard URL可访问(默认:http://localhost:8081)。
  • 如果使用Kerberos认证,确保已安装并配置Kerberos相关依赖:pip install requests-kerberos

3.2 工具使用方法

  • 解压BoostKit-omniruntime-omnihelper-*.zip文件:unzip BoostKit-omniruntime-omnihelper-*.zip
  • 进入解压后的文件夹,解压对应压缩包。
  • Arm架构:tar -zxvf omnihelper_release_arm.tar.gz       x86架构:tar -zxvf omnihelper_release_x86.tar.gz

我们以Arm架构为例最终会在目录汇总生成omnihelper相关的内容:


为了提升类型识别的准确性,需导出Flink表结构至resources/flink_table_schema.csv,文件需包含三列:table_name(表名)、field_name(列名)、field_type(数据类型)。含逗号的类型需用双引号包裹。

表结构导出示例: 示例中数据表、字段,以及字段类型是nexmark的部分数据,根据实际的SQL内容填充。

table_name,field_name,field_type
datagen,event_type,int
datagen,person,"ROW<id BIGINT, name VARCHAR, emailAddress VARCHAR, city VARCHAR>"
datagen,auction,"ROW<id BIGINT, itemName VARCHAR, initialBid BIGINT, dateTime TIMESTAMP(3)>" 
datagen,bid,"ROW<auction BIGINT,bidder BIGINT,price BIGINT,dateTime TIMESTAMP(3)>"

字段说明: table_name为表名;field_name为列名;field_type为数据类型,支持基础类型和复杂类型。

复杂类型说明:

ROW 类型表示嵌套结构(子表)。 例如:

datagen,auction,"ROW<id BIGINT,itemName VARCHAR>"

完成resources/flink_table_schema.csv的编写后执行flink命令:

omnihelper flink [-h] --url URL [--jobid JOBID [JOBID ...]] [--input_data INPUT_DATA]
                 [--interval INTERVAL] [--timeout TIMEOUT] [--output_dir OUTPUT_DIR]
                 [--show-op-details] 

Flink分析模式参数说明

表1 Flink基础参数

参数简写是否必选说明
--help-h查看帮助信息.
--url-uFlink Dashboard URL,例如:http://127.0.0.1:8081https://127.0.0.1:8081
--jobid-jFlink作业ID。支持多个作业ID,用空格分隔。如果不提供,将尝试从API获取。
--input_data输入字段和类型的映射文件路径,CSV格式。CSV文件应包含table_name、field_name和field_type列。
--output_dir-o输出目录。

   默认值:./output
--show-op-details-s隐藏算子文件大小、输出行数信息。
--interval-iAPI调用间隔(毫秒)。

   默认值:100。
--timeout-tAPI调用超时时间(秒)。

   默认值:30。

打开生成的日志报告会有如下属性:

通过高效分析执行日志中的算子、表达式、函数使用情况,识别不支持的算子、表达式、函数并生成结构化的分析报告,为性能优化提供数据支持

4 总结

通过分析 Flink EventLog 中的实际执行指标,OmniHelper 能够精准识别不兼容的算子、表达式、函数和数据类型;关联执行频次、耗时、数据量,量化改造优先级;输出结构化 Excel 报告,支撑迁移决策与排期

本页内容