校验鲲鹏节点上的数据与x86节点上数据的一致性,确保数据同步的准确性。
在完成主备切换之前,本章节中的“主库”指x86主库;完成主备切换之后,“主库”指鲲鹏新主库。
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
如果在用户的实际环境中,主库已经在对外提供服务时发生数据写入操作,可忽略本步骤。
这里使用read_write模式进行测试。
sysbench \ --db-driver=pgsql \ --pgsql-host=127.0.0.1 \ --pgsql-port=5432 \ --pgsql-user=postgres \ --pgsql-password=123456 \ --pgsql-db=sysbench \ --table_size=1000000 \ --tables=10 \ --time=20 \ --threads=6 \ --report-interval=1 oltp_read_write run
/usr/local/pgsql-13.2/bin/psql -U postgres
select * from pg_stat_replication; select pg_current_wal_lsn();
主要使用PostgreSQL的Data Checksums进行数据校验,必须要在关闭PostgreSQL服务的条件下进行数据一致性校验。
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile stop
/usr/local/pgsql-13.2/bin/pg_checksums -e -D /data/pg-13.2/data
/usr/local/pgsql-13.2/bin/pg_checksums -c -D /data/pg-13.2/data
/usr/local/pgsql-13.2/bin/pg_checksums -d -D /data/pg-13.2/data
/usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
对于单表数据一致性的校验,可以使用自定义的Python脚本pg-table-check.py,该脚本通过对比CSV文件进行校验。
/usr/local/pgsql-13.2/bin/psql -U postgres
vi pg-table-check.py
import argparse import psycopg2 import csv import hashlib def GetArgs(): parser = argparse.ArgumentParser() parser.add_argument('-H', '--host', default='127.0.0.1', help='input host address') parser.add_argument('-P', '--port', type=int, default=5432, help='input host port') parser.add_argument('-D', '--dbname', help='input database name') parser.add_argument('-T', '--table', help='input table name') parser.add_argument('-U', '--user', default='postgres', help='input user') parser.add_argument('-p', '--password', default='postgres', help='input password') parser.add_argument('-O', '--output_dir', default='/data/', help='output directory for CSV files') args = parser.parse_args() return args def QuerySQL(conn, sql): try: cur = conn.cursor() cur.execute(sql) rows = cur.fetchall() columns = [column[0] for column in cur.description] results_with_columns = [dict(zip(columns, row)) for row in rows] rst = [] for row in results_with_columns: rst.append(row) cur.close() return rst except psycopg2.Error as e: print(e) cur.close() conn.reset() return 1 argInfo = GetArgs() # 连接到主数据库 conn_master = psycopg2.connect( dbname=argInfo.dbname, user=argInfo.user, password=argInfo.password, host=argInfo.host, port=argInfo.port ) cur_master = conn_master.cursor() # 连接主库 并获取主库LSN rst = QuerySQL(conn_master, "select pg_current_wal_lsn() as lsn") if (rst == 1): exit() masterLSN = rst[0]["lsn"] # 执行 SQL 查询 cur_master.execute(f"SELECT * FROM {argInfo.table}") # 提取查询结果 results_master = cur_master.fetchall() # 将结果写入CSV文件 output_file_master = f"{argInfo.output_dir}/output_master.csv" with open(output_file_master, 'w', newline='') as csvfile_master: csvwriter_master = csv.writer(csvfile_master) csvwriter_master.writerow([desc[0] for desc in cur_master.description]) csvwriter_master.writerows(results_master) # 计算主数据库CSV文件的MD5值 hash_master = hashlib.md5(open(output_file_master, 'rb').read()).hexdigest() # 获取从库连接信息并连接从库,检查replay_lsn是否与master当前LSN一致 rst = QuerySQL(conn_master, "select * from pg_stat_replication") if (rst == 1): exit() slaveConnsInfo = [] slaveConns = [] for rt in rst: slaveLSN = rt["replay_lsn"] if (slaveLSN != masterLSN) : print() exit() connInfo = { "host": rt["client_addr"], "port": argInfo.port, "dbname": argInfo.dbname, "user": argInfo.user, "password": argInfo.password, } slaveConns.append(psycopg2.connect(**connInfo)) slaveConnsInfo.append(connInfo) # 连接到从数据库 for slave_conn_info in slaveConnsInfo: conn_slave = psycopg2.connect( dbname=slave_conn_info["dbname"], user=slave_conn_info["user"], password=slave_conn_info["password"], host=slave_conn_info["host"], port=slave_conn_info["port"] ) cur_slave = conn_slave.cursor() cur_slave.execute(f"SELECT * FROM {argInfo.table}") results_slave = cur_slave.fetchall() output_file_slave = f"{argInfo.output_dir}/output_slave_{slave_conn_info['host']}.csv" with open(output_file_slave, 'w', newline='') as csvfile_slave: csvwriter_slave = csv.writer(csvfile_slave) csvwriter_slave.writerow([desc[0] for desc in cur_slave.description]) csvwriter_slave.writerows(results_slave) hash_slave = hashlib.md5(open(output_file_slave, 'rb').read()).hexdigest() if hash_slave != hash_master: print(f"{slave_conn_info['host']}与主库数据不一致") else: print(f"{slave_conn_info['host']}与主库数据一致")
python3 .\pg-table-check.py -H 主库服务器的IP地址 -P 5432 -D sysbench -T sbtest3 -U postgres -p123456
以下参数请根据实际情况修改:
执行脚本后提示数据一致,表示主库和鲲鹏从库之间的数据在逻辑和物理层面都是一致的。