网站首页 > 教程文章 正文
时光闹钟app开发者,请关注我,后续分享更精彩!
坚持原创,共同进步!
概述
企业系统应用中,有时需要抽取MongoDB中数据到kafka,对数据二次加工再同步到es、hive等其他存储介质中,以便满足不同的业务场景需求。本文将向大家介绍,MongoDB到kafka的数据同步方案。该方案基于MongoDB官方提供的Kafka Source Connector,通过MongoDB的change stream监听机制实现数据的抽取。希望本文对有类似场景需求的朋友,能有所帮助和参考。
前提要求
Kafka Source Connector 使用MongoDB的change stream机制。要求MongoDB和kafka必须满足以下版本要求。
- MongoDB必须为3.6或以后版本
- Kafka Connect版本2.1.0或以后版本,对应Kafka 2.3或以后版本
本文测试版本:MongoDB版本3.6.8,kafka_2.12-2.7.1。
其中:
MongoDB的安装可以参考文章:MongoDB安装
kafka connector框架介绍可以参考文章:kafka connectors框架入门浅析
特性介绍
上图展示了,使用MongoDB kafka connect从一个MongoDB集群(源集群)抽取数据,写入kafka,另一个MongoDB kafka connect消费kafka再写入MongoDB集群(目标集群)的数据流向。
从数据流向可知,MongoDB kafka connect有两种类型:
- MongoDB Kafka source connector
MongoDB作为数据来源,connector从MongoDB提取数据写入kafka。
- MongoDB Kafka sink connector
MongoDB作为数据集目标,connector从kafka消费数据写入MongoDB。
data format数据格式
MongoDB kafka connector 以及他的pipeline支持的数据格式。
官方文档:https://www.mongodb.com/docs/kafka-connector/current/introduction/data-formats/
1.Json
connector中可能存在以下和Json相关的数据格式处理
- Raw JSON:Json对象String格式
- BSON:Json对象的binary serialization系列化编码格式
- JSON Schema:Json的Schema定义格式
2.Avro
Apache Avro 开源的一个序列号和数据传输框架。connector中定义了以下相关格式
- Avro schema:类似json-base定义的Schema格式
- Avro binary encoding:binary serialization编码的json Schema格式
3.Byte Arrays
byte array编码格式
Converters转换器
官方文档:https://www.mongodb.com/docs/kafka-connector/current/introduction/converters/
上图是converter在MongoDB kafka connector中的使用。灰色模块为converter对数据格式的转换处理。Source和Sink Connector中Converter的处理位置有所不同。
注意!!!
如果使用了同一类型的converter,在source和sink connector中必须使用相同类型的converter。
例:source中使用Protobuf写入kafka topic,sink connector必须使用Protobuf converter消费kafka的topic。
其中connector支持converter类型:
- Avro Converter
- Protobuf Converter
- JSON Schema Converter
- JSON Converter
- String Converter
安装
这里只介绍MongoDB Kafka source connector的安装使用,MongoDB Kafka sink connector的安装使用请参考官方文档,本文不做详细介绍。
下载kafka版本
#下载kafka,解压到/home/kafka/kafka_2.12-2.7.1目录
wget -c https://archive.apache.org/dist/kafka/2.7.1/kafka_2.12-2.7.1.tgz --no-check-certificate
下载mongo-kafka-connect
# 下载页面选择对应版本,这里以最新版本1.7.0下载
https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
# 选择1.7.0版本,下载all包
https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar
将下载的mongo-kafka-connect-1.7.0-all.jar放到{kafka安装目录}/libs下
MongoDB安装,选择Replication方式
MongoDB的安装请参考MongoDB安装文章。
MongoDB新增shopping库,新增customers collection
新建的customers集合中新增以下测试文档数据
{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}
启动kafka connector集群
kafka connector以distribute模式启动
集群节点(dev2,dev3,dev4)分别新增connector配置
# connect集群所有节点创建connector通用配置文件,这里以mongo-connect-config.properties为例
# 切换到 {kafka安装目录}/config,这里对应目录为
cd /home/kafka/kafka_2.12-2.7.1/config
# 当前目录新建mongo-connect-config.properties,或copy默认配置文件connect-distributed.properties做修改
cat >mongo-connect-config.properties<<eof
#kafka connect依赖的kafka集群的主机和端口配置,多个用,分隔
bootstrap.servers=dev2:9092,dev3:9092,dev4:9092
#相同组的Kafka Connect配置同一id,与consumer group一样概念
group.id=mongo-connect-group
#source connector和sink connector消息key,value序列化方式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#不做json schema格式转换,减少性能开销
key.converter.schemas.enable=false
value.converter.schemas.enable=false
// kafka connect内部需要用到的三个topic
config.storage.topic=mongo-connect-configs
offset.storage.topic=mongo-connect-offsets
status.storage.topic=mongo-connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
#connect对外服务端口暴露,默认8083
#rest.port=8083
# 插件存放目录,默认{kafka安装目录}/libs下
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
eof
集群节点(dev2,dev3,dev4)分别启动connector
cd /home/kafka/kafka_2.12-2.7.1
nohup bin/connect-distributed.sh ./config/mongo-connect-config.properties > ./mongo.log 2>&1 &
注:connect-distributed.sh脚本默认jvm最大堆内存2G,测试环境资源有限时可自行调整KAFKA_HEAP_OPTS值。
提交MongoDB source connector到connectors集群
终端执行以下提交rest api指令
curl 'http://{connector集群任意一台ip}:8083/connectors' -X POST -i -H "Content-Type:application/json" -d \
'{ "name":"mongo-source",
"config":{"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max":2,
"connection.uri":"mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0",
"database":"shopping",
"collection":"customers",
"copy.existing":true,
"poll.max.batch.size":100,
"poll.await.time.ms":5000
}
}'
上诉接口提交属性说明:
属性参考地址:https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/
#MongoDB source connector名称
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
#执行connector任务数,可指定多个任务。connector框架自动调度并行执行
tasks.max=1
# Connection and source MongoDB数据库配置
connection.uri="mongodb://dev2:27017,dev3:27017,dev4:27017/?replicaSet=rs0"
database=shopping
collection=customers
# 处理MongoDB存在历史数据,默认false
copy.existing=true
topic.prefix=
topic.suffix=
#指定topic映射名。以json key-val 映射。其中key对应MongoDB的"{database}.{collection}"名称
#topic.namespace.map={"myDb.myColl": "topicOne", "myDb": "topicTwo"}
poll.max.batch.size=100
poll.await.time.ms=5000
api提交结果查看
curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source' -X GET
提交报错时,可通过api查看
curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source/status' -X GET
api删除connector
curl 'http://{connector集群任意一台ip}:8083/connectors/mongo-source' -X DELETE
connectors api接口列表
#api中name为配置文件中定义的name值,这里为mongo-source
GET /connectors – 返回所有正在运行的connector名。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
猜你喜欢
- 2024-12-27 2021年,作为一名合格的Java后端开发程序员,必须掌握哪些框架?
- 2024-12-27 关系数据库和文档型数据库有什么区别?
- 2024-12-27 「文档数据库之争」MongoDB和CouchDB的比较
- 2024-12-27 mongodb的优缺点及应用场景 mongodb有哪些应用领域
- 2024-12-27 libbson:C语言中的BSON数据处理利器
- 2024-12-27 BSON使用和快速入门 bsn怎么样
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- md5 sha1 (32)
- mybatis plus page (35)
- semaphore 使用详解 (32)
- update from 语句 (32)
- vue @scroll (38)
- 堆栈区别 (33)
- 在线子域名爆破 (32)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)