
Verifying Primary/Secondary Data Consistency

Check the data consistency between the Kunpeng and x86 nodes to ensure the accuracy of data synchronization.

Before the primary/secondary switchover, the "primary database" in this section refers to the x86 primary database. After the primary/secondary switchover, the "primary database" refers to the new Kunpeng primary database.

  1. Start the PostgreSQL service on the primary database server.
    /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
  2. Optional: On the primary database, use sysbench to simulate write operations and verify real-time data synchronization.

    If the primary database is already serving external systems and receiving writes, skip this step.

    The oltp_read_write mode is used for the test.

    sysbench \
    --db-driver=pgsql \
    --pgsql-host=127.0.0.1 \
    --pgsql-port=5432 \
    --pgsql-user=postgres \
    --pgsql-password=123456 \
    --pgsql-db=sysbench \
    --table_size=1000000 \
    --tables=10 \
    --time=20 \
    --threads=6 \
    --report-interval=1 oltp_read_write run

  3. Wait until the Kunpeng secondary database synchronizes all the incremental modifications on the primary database and check the primary/secondary synchronization status.
    1. Log in to the primary database on the primary database server.
      /usr/local/pgsql-13.2/bin/psql -U postgres
    2. Run the following SQL statements to query the replication status and the current LSN of the primary database:
      select * from pg_stat_replication;
      select pg_current_wal_lsn();
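Synchronization is complete when the replay_lsn column reported by pg_stat_replication matches pg_current_wal_lsn() on the primary database. The remaining lag can also be expressed in bytes by converting the textual LSN (for example 0/3000060) to a byte offset. The helpers below are an illustrative sketch, not part of the official procedure:

```python
def lsn_to_bytes(lsn: str) -> int:
    """Convert a textual PostgreSQL LSN such as '0/3000060' to a byte offset.
    The value is two hex-encoded 32-bit halves separated by a slash."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def replication_lag_bytes(primary_lsn: str, replay_lsn: str) -> int:
    """Bytes of WAL the standby still has to replay; 0 means fully caught up."""
    return lsn_to_bytes(primary_lsn) - lsn_to_bytes(replay_lsn)
```

For example, a primary at 1/0 and a standby at 0/FFFFFFFF are exactly one byte apart, even though the textual values look very different.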

  4. Verify data consistency between the primary and Kunpeng secondary databases.

    PostgreSQL data checksums are used for data verification. The pg_checksums utility can only be run while the PostgreSQL service is stopped.

    1. Stop the PostgreSQL service on the primary and Kunpeng secondary databases.
      /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile stop
    2. Enable data checksums on the primary and Kunpeng secondary databases.
      /usr/local/pgsql-13.2/bin/pg_checksums -e -D /data/pg-13.2/data
    3. Verify the checksums of the data directory.
      /usr/local/pgsql-13.2/bin/pg_checksums -c -D /data/pg-13.2/data
    4. Optional: Enabling data checksums noticeably degrades write performance. If checksums are not needed in production, disable them before restarting the service.
      /usr/local/pgsql-13.2/bin/pg_checksums -d -D /data/pg-13.2/data

    5. After the verification is complete, restart the primary database and then the Kunpeng secondary database.
      /usr/local/pgsql-13.2/bin/pg_ctl -D /data/pg-13.2/data -l logfile start
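Note that pg_checksums verifies each node's data pages against that node's own stored checksums; it does not compare the two nodes directly. If comparable snapshots of both data directories are available (for example, base backups taken at the same LSN), a deterministic directory digest can serve as a rough cross-check. This is an illustrative sketch only; dir_digest is a hypothetical helper, and live primary and standby data directories are not byte-identical:

```python
import hashlib
import os

def dir_digest(root):
    """Hash every file under root (relative path + contents) into a single
    SHA-256 digest, walking in sorted order so the result is deterministic."""
    h = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make os.walk traversal order deterministic
        for name in sorted(filenames):
            path = os.path.join(dirpath, name)
            h.update(os.path.relpath(path, root).encode("utf-8"))
            with open(path, "rb") as f:
                h.update(f.read())
    return h.hexdigest()
```

Two snapshots are byte-identical exactly when their digests match; a single changed page in any file produces a different digest.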
  5. Verify the data consistency of a table.

    The custom Python script pg-table-check.py verifies the data consistency of a table by exporting the table from each database to a CSV file and comparing the files.

    The PostgreSQL ecosystem does not provide a table-level consistency tool comparable to pt-table-checksum in Percona Toolkit. Therefore, a simplified Python script is used to verify data consistency. The prerequisites for using the script are as follows:
    • Ensure that all write operations on the table to be checked are suspended before the script is executed.
    • Ensure that the primary/secondary replication status is normal and the LSNs of the primary and secondary databases are the same.
    • The primary and secondary databases use the same port (5432 by default) to provide services for external systems.
    • The Python 3 environment, with the psycopg2 package installed, must be configured on the host where the script is executed.
    1. Log in to the primary database server. The script is created and executed from the shell, not from within psql.
    2. Create and write the pg-table-check.py script file.
      1. Create the file.
        vi pg-table-check.py
      2. Press i to enter the insert mode and add the following content to the file:
        import argparse
        import psycopg2
        import csv
        import hashlib
        def GetArgs():
            parser = argparse.ArgumentParser()
            parser.add_argument('-H', '--host', default='127.0.0.1', help='input host address')
            parser.add_argument('-P', '--port', type=int, default=5432, help='input host port')
            parser.add_argument('-D', '--dbname', help='input database name')
            parser.add_argument('-T', '--table', help='input table name')
            parser.add_argument('-U', '--user', default='postgres', help='input user')
            parser.add_argument('-p', '--password', default='postgres', help='input password')
            parser.add_argument('-O', '--output_dir', default='/data/', help='output directory for CSV files')
            args = parser.parse_args()
            return args
        def QuerySQL(conn, sql):
            try:
                cur = conn.cursor()
                cur.execute(sql)
                rows = cur.fetchall()
                columns = [column[0] for column in cur.description]
                results_with_columns = [dict(zip(columns, row)) for row in rows]
                rst = []
                for row in results_with_columns:
                    rst.append(row)
                cur.close()
                return rst
            except psycopg2.Error as e:
                print(e)
                cur.close()
                conn.reset()
                return 1
        argInfo = GetArgs()
        # Connect to the primary database.
        conn_master = psycopg2.connect(
            dbname=argInfo.dbname,
            user=argInfo.user,
            password=argInfo.password,
            host=argInfo.host,
            port=argInfo.port
        )
        cur_master = conn_master.cursor()
        # Obtain the current LSN of the primary database.
        rst = QuerySQL(conn_master, "select pg_current_wal_lsn() as lsn")
        if (rst == 1):
            exit()
        masterLSN = rst[0]["lsn"]
        # Execute the SQL query.
        cur_master.execute(f"SELECT * FROM {argInfo.table}")
        # Fetch the query result.
        results_master = cur_master.fetchall()
        # Write the results to a CSV file.
        output_file_master = f"{argInfo.output_dir}/output_master.csv"
        with open(output_file_master, 'w', newline='') as csvfile_master:
            csvwriter_master = csv.writer(csvfile_master)
            csvwriter_master.writerow([desc[0] for desc in cur_master.description])
            csvwriter_master.writerows(results_master)
        # Calculate the MD5 value of the CSV file of the primary database.
        hash_master = hashlib.md5(open(output_file_master, 'rb').read()).hexdigest()
        # Obtain the secondary database connection information and connect to it. Check whether the value of replay_lsn is the same as the current LSN of the primary database.
        rst = QuerySQL(conn_master, "select * from pg_stat_replication")
        if (rst == 1):
            exit()
        slaveConnsInfo = []
        for rt in rst:
            slaveLSN = rt["replay_lsn"]
            if slaveLSN != masterLSN:
                print(f"LSN mismatch: standby {rt['client_addr']} replay_lsn {slaveLSN} != primary {masterLSN}")
                exit()
            connInfo = {
                "host": rt["client_addr"],
                "port": argInfo.port,
                "dbname": argInfo.dbname,
                "user": argInfo.user,
                "password": argInfo.password,
            }
            slaveConnsInfo.append(connInfo)
        # Connect to the secondary database.
        for slave_conn_info in slaveConnsInfo:
            conn_slave = psycopg2.connect(
                dbname=slave_conn_info["dbname"],
                user=slave_conn_info["user"],
                password=slave_conn_info["password"],
                host=slave_conn_info["host"],
                port=slave_conn_info["port"]
            )
            cur_slave = conn_slave.cursor()
            cur_slave.execute(f"SELECT * FROM {argInfo.table}")
            results_slave = cur_slave.fetchall()
            output_file_slave = f"{argInfo.output_dir}/output_slave_{slave_conn_info['host']}.csv"
            with open(output_file_slave, 'w', newline='') as csvfile_slave:
                csvwriter_slave = csv.writer(csvfile_slave)
                csvwriter_slave.writerow([desc[0] for desc in cur_slave.description])
                csvwriter_slave.writerows(results_slave)
            hash_slave = hashlib.md5(open(output_file_slave, 'rb').read()).hexdigest()
            if hash_slave != hash_master:
                print(f"Table data on {slave_conn_info['host']} is inconsistent with the primary database")
            else:
                print(f"Table data on {slave_conn_info['host']} is consistent with the primary database")
      3. Press Esc, type :wq!, and press Enter to save the file and exit.
    3. Check whether the table data in the primary and secondary databases is consistent.

      Execute the script.

      python3 pg-table-check.py -H <IP address of the primary database server> -P 5432 -D sysbench -T sbtest3 -U postgres -p 123456

      Modify the following parameters as required:

      • -H specifies the IP address of the primary database server.
      • -P specifies the listening port number used by the primary database service. The default value is 5432. The listening port number of the secondary database must be the same as that of the primary database. Otherwise, the script cannot connect to the secondary database for data check.
      • -D specifies the name of the database where the table to be checked is located.
      • -T specifies the name of the table to be checked.
      • -U specifies the user name for logging in to the database.
      • -p specifies the password for logging in to the database.

      If the script reports that the data is consistent, the table data in the primary database matches that in the Kunpeng secondary database at the logical layer, complementing the physical-layer verification performed earlier with pg_checksums.
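The script's core comparison can be distilled into a small helper: render the header and rows as CSV text and take an MD5 digest; two table dumps match exactly when their digests match. A minimal sketch (rows_digest is an illustrative name, not part of the script):

```python
import csv
import hashlib
import io

def rows_digest(header, rows):
    """Fingerprint a table dump the way pg-table-check.py does:
    write header + rows as CSV, then hash the rendered text with MD5."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    writer.writerows(rows)
    return hashlib.md5(buf.getvalue().encode("utf-8")).hexdigest()
```

Because the digest covers every byte of the rendering, it is sensitive to row order as well as row content; the procedure relies on physical streaming replication producing identical heap order on both nodes.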