鲲鹏社区首页
EN
注册
我要评分
文档获取效率
文档正确性
内容完整性
文档易理解
在线提单
论坛求助

主从数据一致性检验

校验鲲鹏节点上的数据与x86节点上数据的一致性,确保数据同步的准确性。

在完成主备切换之前,本章节中的“主库”指x86主库;完成主备切换之后,“主库”指鲲鹏新主库。

  1. 在主库服务器上,启动PostgreSQL服务。
    /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
  2. 可选:在主库上,可以使用Sysbench工具模拟数据写入操作,以验证数据同步的实时性。

    如果在用户的实际环境中,主库已经在对外提供服务时发生数据写入操作,可忽略本步骤。

    这里使用read_write模式进行测试。

    sysbench \
    --db-driver=pgsql \
    --pgsql-host=127.0.0.1 \
    --pgsql-port=5432 \
    --pgsql-user=postgres \
    --pgsql-password=123456 \
    --pgsql-db=sysbench \
    --table_size=1000000 \
    --tables=10 \
    --time=20 \
    --threads=6 \
    --report-interval=1 oltp_read_write run

  3. 等待鲲鹏从库同步完所有主库上的增量修改,检查主备同步状态。
    1. 在主库服务器上,登录主库数据库。
      /usr/local/pgsql-13.2/bin/psql -U postgres
    2. 使用如下SQL查询鲲鹏从库状态情况和主库状态情况。
      select * from pg_stat_replication; select pg_current_wal_lsn();

  4. 校验主库和鲲鹏从库的数据一致性。

    主要使用PostgreSQL的Data Checksums进行数据校验,必须要在关闭PostgreSQL服务的条件下进行数据一致性校验。

    1. 停止主库和鲲鹏从库上的PostgreSQL服务。
      /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile stop
    2. 在主库和鲲鹏从库上启用Data Checksums。
      /usr/local/pgsql-13.2/bin/pg_checksums -e -D /data/pg-13.2/data
    3. 校验数据目录的Checksums。
      /usr/local/pgsql-13.2/bin/pg_checksums -c -D /data/pg-13.2/data
    4. 可选:开启Data Checksums对服务的性能影响较大,可以在重启服务前关闭Data Checksums。
      /usr/local/pgsql-13.2/bin/pg_checksums -d -D /data/pg-13.2/data

    5. 校验完成后,先重启主库,再重启鲲鹏从库。
      /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
  5. 单表数据一致性的校验。

    对于单表数据一致性的校验,可以使用自定义的Python脚本pg-table-check.py,该脚本通过对比CSV文件进行校验。

    针对PostgreSQL数据库环境,鉴于没有直接等同于Percona Toolkit中的pt-table-check工具来校验数据一致性,可以设计一个简化版的Python脚本来实现数据一致性校验功能。该脚本的使用前提条件为:
    • 确保在被检查的表上执行该脚本前,所有的写操作已暂停。
    • 确认数据库的主从复制状态正常,且主库与从库的当前日志序列号(LSN)完全一致。
    • 主库的数据库服务和从库的数据库服务都使用相同的端口号(默认为5432)对外提供服务。
    • 执行脚本的机器需配置有Python 3环境。
    1. 在主库服务器上,登录主库数据库。
      /usr/local/pgsql-13.2/bin/psql -U postgres
    2. 创建并写入pg-table-check.py脚本文件。
      1. 创建文件。
        vi pg-table-check.py
      2. 按“i”进入编辑模式,在文件中添加如下内容。
        import argparse
        import psycopg2
        import csv
        import hashlib
        def GetArgs():
            parser = argparse.ArgumentParser()
            parser.add_argument('-H', '--host', default='127.0.0.1', help='input host address')
            parser.add_argument('-P', '--port', type=int, default=5432, help='input host port')
            parser.add_argument('-D', '--dbname', help='input database name')
            parser.add_argument('-T', '--table', help='input table name')
            parser.add_argument('-U', '--user', default='postgres', help='input user')
            parser.add_argument('-p', '--password', default='postgres', help='input password')
            parser.add_argument('-O', '--output_dir', default='/data/', help='output directory for CSV files')
            args = parser.parse_args()
            return args
        def QuerySQL(conn, sql):
            try:
                cur = conn.cursor()
                cur.execute(sql)
                rows = cur.fetchall()
                columns = [column[0] for column in cur.description]
                results_with_columns = [dict(zip(columns, row)) for row in rows]
                rst = []
                for row in results_with_columns:
                    rst.append(row)
                cur.close()
                return rst
            except psycopg2.Error as e:
                print(e)
                cur.close()
                conn.reset()
                return 1
        argInfo = GetArgs()
        # 连接到主数据库
        conn_master = psycopg2.connect(
            dbname=argInfo.dbname,
            user=argInfo.user,
            password=argInfo.password,
            host=argInfo.host,
            port=argInfo.port
        )
        cur_master = conn_master.cursor()
        # 连接主库 并获取主库LSN
        rst = QuerySQL(conn_master, "select pg_current_wal_lsn() as lsn")
        if (rst == 1):
            exit()
        masterLSN = rst[0]["lsn"]
        # 执行 SQL 查询
        cur_master.execute(f"SELECT * FROM {argInfo.table}")
        # 提取查询结果
        results_master = cur_master.fetchall()
        # 将结果写入CSV文件
        output_file_master = f"{argInfo.output_dir}/output_master.csv"
        with open(output_file_master, 'w', newline='') as csvfile_master:
            csvwriter_master = csv.writer(csvfile_master)
            csvwriter_master.writerow([desc[0] for desc in cur_master.description])
            csvwriter_master.writerows(results_master)
        # 计算主数据库CSV文件的MD5值
        hash_master = hashlib.md5(open(output_file_master, 'rb').read()).hexdigest()
        # 获取从库连接信息并连接从库,检查replay_lsn是否与master当前LSN一致
        rst = QuerySQL(conn_master, "select * from pg_stat_replication")
        if (rst == 1):
            exit()
        slaveConnsInfo = []
        slaveConns = []
        for rt in rst:
            slaveLSN = rt["replay_lsn"]
            if (slaveLSN != masterLSN) :
                print()
                exit()
            connInfo = {
                "host": rt["client_addr"],
                "port": argInfo.port,
                "dbname": argInfo.dbname,
                "user": argInfo.user,
                "password": argInfo.password,
            }
            slaveConns.append(psycopg2.connect(**connInfo))
            slaveConnsInfo.append(connInfo)
        # 连接到从数据库
        for slave_conn_info in slaveConnsInfo:
            conn_slave = psycopg2.connect(
                dbname=slave_conn_info["dbname"],
                user=slave_conn_info["user"],
                password=slave_conn_info["password"],
                host=slave_conn_info["host"],
                port=slave_conn_info["port"]
            )
            cur_slave = conn_slave.cursor()
            cur_slave.execute(f"SELECT * FROM {argInfo.table}")
            results_slave = cur_slave.fetchall()
            output_file_slave = f"{argInfo.output_dir}/output_slave_{slave_conn_info['host']}.csv"
            with open(output_file_slave, 'w', newline='') as csvfile_slave:
                csvwriter_slave = csv.writer(csvfile_slave)
                csvwriter_slave.writerow([desc[0] for desc in cur_slave.description])
                csvwriter_slave.writerows(results_slave)
            hash_slave = hashlib.md5(open(output_file_slave, 'rb').read()).hexdigest()
            if hash_slave != hash_master:
                print(f"{slave_conn_info['host']}与主库数据不一致")
            else:
                print(f"{slave_conn_info['host']}与主库数据一致")
      3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
    3. 检验主从库某表数据是否一致。

      执行如下脚本。

      python3 .\pg-table-check.py -H 主库服务器的IP地址 -P 5432 -D sysbench -T sbtest3 -U postgres -p123456

      以下参数请根据实际情况修改:

      • -H指定主库服务器的IP地址。
      • -P指定主库数据库服务使用的侦听端口号,默认为5432。从库必须使用与主库相同的侦听端口号,否则本脚本无法连接从库进行数据检查。
      • -D指定本次检查表所在的数据库名称。
      • -T指定本次检查的表的名称。
      • -U指定本次检查使用的数据库登录用户名。
      • -p指定本次检查使用的数据库登录密码。

      执行脚本后提示数据一致,表示主库和鲲鹏从库之间的数据在逻辑和物理层面都是一致的。