主从数据一致性检验
校验鲲鹏节点上的数据与x86节点上数据的一致性,确保数据同步的准确性。

在完成主备切换之前,本章节中的“主库”指x86主库;完成主备切换之后,“主库”指鲲鹏新主库。
- 在主库服务器上,启动PostgreSQL服务。
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
- 可选:在主库上,可以使用Sysbench工具模拟数据写入操作,以验证数据同步的实时性。
如果在用户的实际环境中,主库已经在对外提供服务时发生数据写入操作,可忽略本步骤。
这里使用read_write模式进行测试。
sysbench \ --db-driver=pgsql \ --pgsql-host=127.0.0.1 \ --pgsql-port=5432 \ --pgsql-user=postgres \ --pgsql-password=123456 \ --pgsql-db=sysbench \ --table_size=1000000 \ --tables=10 \ --time=20 \ --threads=6 \ --report-interval=1 oltp_read_write run
- 等待鲲鹏从库同步完所有主库上的增量修改,检查主备同步状态。
- 在主库服务器上,登录主库数据库。
/usr/local/pgsql-13.2/bin/psql -U postgres
- 使用如下SQL查询鲲鹏从库状态情况和主库状态情况。
select * from pg_stat_replication; select pg_current_wal_lsn();
- 在主库服务器上,登录主库数据库。
- 校验主库和鲲鹏从库的数据一致性。
主要使用PostgreSQL的Data Checksums进行数据校验,必须要在关闭PostgreSQL服务的条件下进行数据一致性校验。
- 停止主库和鲲鹏从库上的PostgreSQL服务。
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile stop
- 在主库和鲲鹏从库上启用Data Checksums。
/usr/local/pgsql-13.2/bin/pg_checksums -e -D /data/pg-13.2/data
- 校验数据目录的Checksums。
/usr/local/pgsql-13.2/bin/pg_checksums -c -D /data/pg-13.2/data
- 可选:开启Data Checksums对服务的性能影响较大,可以在重启服务前关闭Data Checksums。
/usr/local/pgsql-13.2/bin/pg_checksums -d -D /data/pg-13.2/data
- 校验完成后,先重启主库,再重启鲲鹏从库。
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
- 停止主库和鲲鹏从库上的PostgreSQL服务。
- 单表数据一致性的校验。
对于单表数据一致性的校验,可以使用自定义的Python脚本pg-table-check.py,该脚本通过对比CSV文件进行校验。
针对PostgreSQL数据库环境,鉴于没有直接等同于Percona Toolkit中的pt-table-check工具来校验数据一致性,可以设计一个简化版的Python脚本来实现数据一致性校验功能。该脚本的使用前提条件为:- 确保在被检查的表上执行该脚本前,所有的写操作已暂停。
- 确认数据库的主从复制状态正常,且主库与从库的当前日志序列号(LSN)完全一致。
- 主库的数据库服务和从库的数据库服务都使用相同的端口号(默认为5432)对外提供服务。
- 执行脚本的机器需配置有Python 3环境。
- 在主库服务器上,登录主库数据库。
/usr/local/pgsql-13.2/bin/psql -U postgres
- 创建并写入pg-table-check.py脚本文件。
- 创建文件。
vi pg-table-check.py
- 按“i”进入编辑模式,在文件中添加如下内容。
import argparse import psycopg2 import csv import hashlib def GetArgs(): parser = argparse.ArgumentParser() parser.add_argument('-H', '--host', default='127.0.0.1', help='input host address') parser.add_argument('-P', '--port', type=int, default=5432, help='input host port') parser.add_argument('-D', '--dbname', help='input database name') parser.add_argument('-T', '--table', help='input table name') parser.add_argument('-U', '--user', default='postgres', help='input user') parser.add_argument('-p', '--password', default='postgres', help='input password') parser.add_argument('-O', '--output_dir', default='/data/', help='output directory for CSV files') args = parser.parse_args() return args def QuerySQL(conn, sql): try: cur = conn.cursor() cur.execute(sql) rows = cur.fetchall() columns = [column[0] for column in cur.description] results_with_columns = [dict(zip(columns, row)) for row in rows] rst = [] for row in results_with_columns: rst.append(row) cur.close() return rst except psycopg2.Error as e: print(e) cur.close() conn.reset() return 1 argInfo = GetArgs() # 连接到主数据库 conn_master = psycopg2.connect( dbname=argInfo.dbname, user=argInfo.user, password=argInfo.password, host=argInfo.host, port=argInfo.port ) cur_master = conn_master.cursor() # 连接主库 并获取主库LSN rst = QuerySQL(conn_master, "select pg_current_wal_lsn() as lsn") if (rst == 1): exit() masterLSN = rst[0]["lsn"] # 执行 SQL 查询 cur_master.execute(f"SELECT * FROM {argInfo.table}") # 提取查询结果 results_master = cur_master.fetchall() # 将结果写入CSV文件 output_file_master = f"{argInfo.output_dir}/output_master.csv" with open(output_file_master, 'w', newline='') as csvfile_master: csvwriter_master = csv.writer(csvfile_master) csvwriter_master.writerow([desc[0] for desc in cur_master.description]) csvwriter_master.writerows(results_master) # 计算主数据库CSV文件的MD5值 hash_master = hashlib.md5(open(output_file_master, 'rb').read()).hexdigest() # 获取从库连接信息并连接从库,检查replay_lsn是否与master当前LSN一致 rst = QuerySQL(conn_master, "select * from pg_stat_replication") if (rst == 1): exit() slaveConnsInfo = [] slaveConns = [] for rt in rst: slaveLSN = rt["replay_lsn"] if (slaveLSN != masterLSN) : print() exit() connInfo = { "host": rt["client_addr"], "port": argInfo.port, "dbname": argInfo.dbname, "user": argInfo.user, "password": argInfo.password, } slaveConns.append(psycopg2.connect(**connInfo)) slaveConnsInfo.append(connInfo) # 连接到从数据库 for slave_conn_info in slaveConnsInfo: conn_slave = psycopg2.connect( dbname=slave_conn_info["dbname"], user=slave_conn_info["user"], password=slave_conn_info["password"], host=slave_conn_info["host"], port=slave_conn_info["port"] ) cur_slave = conn_slave.cursor() cur_slave.execute(f"SELECT * FROM {argInfo.table}") results_slave = cur_slave.fetchall() output_file_slave = f"{argInfo.output_dir}/output_slave_{slave_conn_info['host']}.csv" with open(output_file_slave, 'w', newline='') as csvfile_slave: csvwriter_slave = csv.writer(csvfile_slave) csvwriter_slave.writerow([desc[0] for desc in cur_slave.description]) csvwriter_slave.writerows(results_slave) hash_slave = hashlib.md5(open(output_file_slave, 'rb').read()).hexdigest() if hash_slave != hash_master: print(f"{slave_conn_info['host']}与主库数据不一致") else: print(f"{slave_conn_info['host']}与主库数据一致")
- 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 创建文件。
- 检验主从库某表数据是否一致。
python3 .\pg-table-check.py -H 主库服务器的IP地址 -P 5432 -D sysbench -T sbtest3 -U postgres -p123456
以下参数请根据实际情况修改:
- -H指定主库服务器的IP地址。
- -P指定主库数据库服务使用的侦听端口号,默认为5432。从库必须使用与主库相同的侦听端口号,否则本脚本无法连接从库进行数据检查。
- -D指定本次检查表所在的数据库名称。
- -T指定本次检查的表的名称。
- -U指定本次检查使用的数据库登录用户名。
- -p指定本次检查使用的数据库登录密码。
执行脚本后提示数据一致,表示主库和鲲鹏从库之间的数据在逻辑和物理层面都是一致的。
父主题: 将鲲鹏服务器加入主从部署