云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

MongoDB 数据同步kafka mongodb数据迁移到mysql

jxf315 2024-12-27 14:14:31 教程文章 31 ℃

时光闹钟app开发者,请关注我,后续分享更精彩!

坚持原创,共同进步!

概述

企业系统应用中,有时需要抽取MongoDB中数据到kafka,对数据二次加工再同步到es、hive等其他存储介质中,以便满足不同的业务场景需求。本文将向大家介绍,MongoDB到kafka的数据同步方案。该方案基于MongoDB官方提供的Kafka Source Connector,通过MongoDB的change stream监听机制实现数据的抽取。希望本文对有类似场景需求的朋友,能有所帮助和参考。

前提要求

Kafka Source Connector 使用MongoDB的change stream机制。要求MongoDB和kafka必须满足以下版本要求。

  • MongoDB必须为3.6或以后版本
  • Kafka Connect版本2.1.0或以后版本,对应Kafka 2.3或以后版本

本文测试版本:MongoDB版本3.6.8,kafka_2.12-2.7.1

其中:

MongoDB的安装可以参考文章:MongoDB安装

kafka connector框架介绍可以参考文章:kafka connectors框架入门浅析

特性介绍

上图展示了,使用MongoDB kafka connect从一个MongoDB集群(源集群)抽取数据,写入kafka,另一个MongoDB kafka connect消费kafka再写入MongoDB集群(目标集群)的数据流向。

从数据流向可知,MongoDB kafka connect有两种类型:

  • MongoDB Kafka source connector

MongoDB作为数据来源,connector从MongoDB提取数据写入kafka。

  • MongoDB Kafka sink connector

MongoDB作为数据集目标,connector从kafka消费数据写入MongoDB。

data format数据格式

MongoDB kafka connector 以及他的pipeline支持的数据格式。

官方文档:https://www.mongodb.com/docs/kafka-connector/current/introduction/data-formats/

1.Json

connector中可能存在以下和Json相关的数据格式处理

  • Raw JSON:Json对象String格式
  • BSON:Json对象的binary serialization系列化编码格式
  • JSON Schema:Json的Schema定义格式

2.Avro

Apache Avro 开源的一个序列号和数据传输框架。connector中定义了以下相关格式

  • Avro schema:类似json-base定义的Schema格式
  • Avro binary encoding:binary serialization编码的json Schema格式

3.Byte Arrays

byte array编码格式

Converters转换器

官方文档:https://www.mongodb.com/docs/kafka-connector/current/introduction/converters/

上图是converter在MongoDB kafka connector中的使用。灰色模块为converter对数据格式的转换处理。Source和Sink Connector中Converter的处理位置有所不同。

注意!!!

如果使用了同一类型的converter,在source和sink connector中必须使用相同类型的converter。
例:source中使用Protobuf写入kafka topic,sink connector必须使用Protobuf converter消费kafka的topic。

其中connector支持converter类型:
- Avro Converter
- Protobuf Converter
- JSON Schema Converter
- JSON Converter
- String Converter

安装

这里只介绍MongoDB Kafka source connector的安装使用,MongoDB Kafka sink connector的安装使用请参考官方文档,本文不做详细介绍。

下载kafka版本

#下载kafka,解压到/home/kafka/kafka_2.12-2.7.1目录
wget -c https://archive.apache.org/dist/kafka/2.7.1/kafka_2.12-2.7.1.tgz --no-check-certificate

下载mongo-kafka-connect

# 下载页面选择对应版本,这里以最新版本1.7.0下载
https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
# 选择1.7.0版本,下载all包
https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar

将下载的mongo-kafka-connect-1.7.0-all.jar放到{kafka安装目录}/libs下

MongoDB安装,选择Replication方式

MongoDB的安装请参考MongoDB安装文章。

MongoDB新增shopping库,新增customers collection

新建的customers集合中新增以下测试文档数据

{
  "_id": 1,
  "country": "Mexico",
  "purchases": 2,
  "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
  "_id": 2,
  "country": "Iceland",
  "purchases": 8,
  "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

启动kafka connector集群

kafka connector以distribute模式启动

集群节点(dev2,dev3,dev4)分别新增connector配置

# connect集群所有节点创建connector通用配置文件,这里以mongo-connect-config.properties为例
# 切换到 {kafka安装目录}/config,这里对应目录为
cd /home/kafka/kafka_2.12-2.7.1/config
# 当前目录新建mongo-connect-config.properties,或copy默认配置文件connect-distributed.properties做修改
cat >mongo-connect-config.properties<<eof
#kafka connect依赖的kafka集群的主机和端口配置,多个用,分隔
bootstrap.servers=dev2:9092,dev3:9092,dev4:9092

#相同组的Kafka Connect配置同一id,与consumer group一样概念
group.id=mongo-connect-group

#source connector和sink connector消息key,value序列化方式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#不做json schema格式转换,减少性能开销
key.converter.schemas.enable=false
value.converter.schemas.enable=false

// kafka connect内部需要用到的三个topic
config.storage.topic=mongo-connect-configs
offset.storage.topic=mongo-connect-offsets
status.storage.topic=mongo-connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

#connect对外服务端口暴露,默认8083
#rest.port=8083

# 插件存放目录,默认{kafka安装目录}/libs下
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
eof

集群节点(dev2,dev3,dev4)分别启动connector

cd /home/kafka/kafka_2.12-2.7.1
nohup bin/connect-distributed.sh ./config/mongo-connect-config.properties > ./mongo.log 2>&1 &

注:connect-distributed.sh脚本默认jvm最大堆内存2G,测试环境资源有限时可自行调整KAFKA_HEAP_OPTS值。

提交MongoDB source connector到connectors集群

终端执行以下提交rest api指令

curl 'http://{connector集群任意一台ip}:8083/connectors' -X POST -i -H "Content-Type:application/json" -d   \
    '{ "name":"mongo-source",  
       "config":{"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",  
                "tasks.max":2,  
                "connection.uri":"mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0",  
                "database":"shopping",
                "collection":"customers",
                "copy.existing":true,
                "poll.max.batch.size":100,
                "poll.await.time.ms":5000
                }  
    }' 

上诉接口提交属性说明

属性参考地址:https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/

#MongoDB source connector名称
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

#执行connector任务数,可指定多个任务。connector框架自动调度并行执行
tasks.max=1

# Connection and source MongoDB数据库配置
connection.uri="mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0"
database=shopping
collection=customers

# 处理MongoDB存在历史数据,默认false
copy.existing=true

topic.prefix=
topic.suffix=
#指定topic映射名。以json key-val 映射。其中key对应MongoDB的"{database}.{collection}"名称
#topic.namespace.map={"myDb.myColl": "topicOne", "myDb": "topicTwo"}

poll.max.batch.size=100
poll.await.time.ms=5000

api提交结果查看

curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source' -X GET

提交报错时,可通过api查看

curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source/status' -X GET

api删除connector

curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source' -X DELETE

connectors api接口列表

#api中name为配置文件中定义的name值,这里为mongo-source

GET /connectors – 返回所有正在运行的connector名。

GET /connectors/{name} – 获取指定connetor的信息。

GET /connectors/{name}/config – 获取指定connector的配置信息。

GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。

GET /connectors/{name}/tasks – 获取指定connector正在运行的task。

GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。

PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理。

PUT /connectors/{name}/resume – 恢复一个被暂停的connector。

POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用

POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。

DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

Tags:

最近发表
标签列表