云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

使用Python来实现MySQL与PostgerSQL之间的数据实时同步?

jxf315 2025-06-15 17:12:13 教程文章 4 ℃

实现MySQL与PostgreSQL之间的数据实时同步是一项复杂的任务涉及到数据库的更改捕获(Change Data Capture, CDC)、消息队列以及数据写入机制。下面我们就通过一个简单的步骤来演示如何使用Python来实现这种数据同步。

实现数据同步逻辑

  • 第一、使用CDC工具(如Debezium)监控MySQL数据库的更改,并将这些更改捕获到消息队列(如Kafka)。
  • 第二、编写Python脚本来读取消息队列中的更改数据,并将其应用到PostgreSQL数据库。

设置数据库连接

安装所需的库

pip install mysql-connector-python psycopg2 pandas

实现同步脚本

以下是一个基本的Python脚本,用于从MySQL读取数据并插入到PostgreSQL中。

import mysql.connector
import psycopg2
import pandas as pd

# MySQL数据库配置
mysql_config = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

# PostgreSQL数据库配置
postgres_config = {
    'user': 'postgres_user',
    'password': 'postgres_password',
    'host': 'postgres_host',
    'database': 'postgres_database'
}

# 连接到MySQL数据库
mysql_conn = mysql.connector.connect(**mysql_config)
mysql_cursor = mysql_conn.cursor(dictionary=True)

# 连接到PostgreSQL数据库
postgres_conn = psycopg2.connect(**postgres_config)
postgres_cursor = postgres_conn.cursor()

# 从MySQL读取数据
mysql_cursor.execute("SELECT * FROM your_table")
rows = mysql_cursor.fetchall()

# 将数据转换为Pandas DataFrame
df = pd.DataFrame(rows)

# 将数据插入到PostgreSQL
for index, row in df.iterrows():
    insert_query = """
    INSERT INTO your_table (column1, column2, column3)
    VALUES (%s, %s, %s)
    ON CONFLICT (primary_key_column)
    DO UPDATE SET
        column1 = EXCLUDED.column1,
        column2 = EXCLUDED.column2,
        column3 = EXCLUDED.column3;
    """
    postgres_cursor.execute(insert_query, (row['column1'], row['column2'], row['column3']))
    postgres_conn.commit()

# 关闭数据库连接
mysql_cursor.close()
mysql_conn.close()
postgres_cursor.close()
postgres_conn.close()

这个示例并不包括CDC和消息队列的实现,但展示了基本的数据库操作。

实现更改捕获和消息队列

为了实现实时同步,需要设置Debezium,配置Kafka消费者。Debezium是一个CDC工具,可以捕获MySQL的更改并将其发送到Kafka等消息队列,编写一个Kafka消费者脚本,读取Kafka中的消息并将更改应用到PostgreSQL。

Kafka消费者示例(Python脚本)

from kafka import KafkaConsumer
import json
import psycopg2

# Kafka配置
kafka_topic = 'your_topic'
kafka_bootstrap_servers = 'localhost:9092'

# PostgreSQL数据库配置
postgres_config = {
    'user': 'postgres_user',
    'password': 'postgres_password',
    'host': 'postgres_host',
    'database': 'postgres_database'
}

# 连接到PostgreSQL数据库
postgres_conn = psycopg2.connect(**postgres_config)
postgres_cursor = postgres_conn.cursor()

# Kafka消费者配置
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=[kafka_bootstrap_servers],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='your_group_id',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# 处理Kafka消息
for message in consumer:
    record = message.value
    # 提取并处理MySQL的更改记录
    if 'payload' in record:
        payload = record['payload']
        if payload['op'] == 'c' or payload['op'] == 'u':  # Create or Update operation
            new_data = payload['after']
            insert_query = """
            INSERT INTO your_table (column1, column2, column3)
            VALUES (%s, %s, %s)
            ON CONFLICT (primary_key_column)
            DO UPDATE SET
                column1 = EXCLUDED.column1,
                column2 = EXCLUDED.column2,
                column3 = EXCLUDED.column3;
            """
            postgres_cursor.execute(insert_query, (new_data['column1'], new_data['column2'], new_data['column3']))
            postgres_conn.commit()

# 关闭数据库连接
postgres_cursor.close()
postgres_conn.close()

结论

上述代码仅仅提供了一个基本框架用于实现MySQL与PostgreSQL之间的数据实时同步,实际应用中,需结合Debezium、Kafka等工具来实现更改捕获和消息传递。进一步的优化和错误处理也需要根据实际情况进行调整。

最近发表
标签列表