Linux: Canal 数据库日志订阅
- TAGS: Middleware
参考:
- 官方文档: https://github.com/alibaba/canal/wiki/AdminGuide
- https://www.cnblogs.com/throwable/p/12483983.html
mysql 要求
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row.
[mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replication需要定义,不能和canal的slaveId重复
检查
mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+
canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限
CREATE USER canal IDENTIFIED BY 'Canal!123'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
检查权限
show grants for 'canal'
部署
- canal-admin 相当于管理角色,统一canal.properties/instance.properties配置
- canal-server(自动加入)
- 加载canal.properties就绪服务
- 加载instance.properties配置,定义从哪个mysql订阅日志,并推送到哪里如kafka,rocketmq等
canal-admin
需要一个canal-admin管理的库
CREATE USER canal IDENTIFIED BY 'Canal!123'; GRANT ALL ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
部署文件
[root@proxy canal-admin]# cat deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: canal-admin
  name: canal-admin
  namespace: dev1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: canal-admin
  template:
    metadata:
      labels:
        app: canal-admin
    spec:
      containers:
        # canal-admin web console; JAVA_OPTS points it at its management
        # MySQL database (canal_manager).
        - env:
            # FIX: inside a ">-" folded block scalar, "#" is literal text,
            # not a YAML comment. The original value ended with "#ui登录密码",
            # which was folded into JAVA_OPTS and passed to the JVM as a
            # bogus argument. The note lives up here instead:
            # canal.adminPasswd is the admin UI login password.
            - name: JAVA_OPTS
              value: >-
                -Dspring.datasource.address=canal-manager-mysql.com
                -Dspring.datasource.database=canal_manager
                -Dspring.datasource.username=canal
                -Dspring.datasource.password=Canal!123
                -Dcanal.adminPasswd=ZaIjVvX5q8F7LyKu
          image: 'canal/canal-admin:v1.1.7'
          imagePullPolicy: IfNotPresent
          # TCP liveness check on the admin console port.
          livenessProbe:
            failureThreshold: 3
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            tcpSocket:
              port: 8089
            timeoutSeconds: 1
          name: canal-admin
          resources:
            requests:
              cpu: 250m
              memory: 512Mi
          # Startup probe mirrors the liveness probe on port 8089.
          startupProbe:
            failureThreshold: 3
            initialDelaySeconds: 15
            periodSeconds: 10
            successThreshold: 1
            tcpSocket:
              port: 8089
            timeoutSeconds: 1
      restartPolicy: Always
[root@proxy canal-admin]# cat service.yaml
apiVersion: v1
kind: Service
metadata:
  name: canal-admin
  namespace: dev1
spec:
  ports:
    # Admin console HTTP port (matches the Deployment's probe port and
    # the Ingress backend below).
    - port: 8089
      name: http
      protocol: TCP
      targetPort: 8089
  selector:
    app: canal-admin
  # ClusterIP only: external access goes through the Ingress.
  type: ClusterIP
[root@proxy dev1]# cat ingress/ingress-cms-dev.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-cms-dev
  namespace: dev1
spec:
  ingressClassName: nginx-dev
  rules:
    # Exposes the canal-admin console at canal-admin.xxx.com -> svc:8089.
    - host: canal-admin.xxx.com
      http:
        paths:
          - backend:
              service:
                name: canal-admin
                port:
                  number: 8089
            path: /
            # NOTE(review): ImplementationSpecific delegates matching to the
            # controller; "Prefix" would be the explicit choice — confirm.
            pathType: ImplementationSpecific
部署zookeeper
用于管理canal-server集群。如创建集群名 cluster-kafka,指定zookeeper地址
添加canal.properties配置
点击集群操作,修改主配置。可载入模板再其上修改。
如
#################################################
######### common argument #############
#################################################
# tcp bind ip
#canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
#canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = Canal!123
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
#canal.aliyun.accessKey =
#canal.aliyun.secretKey =
#canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = b-2.logtest.2g3di3.c7.kafka.us-west-2.amazonaws.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT
# NOTE(review): the line below duplicates canal.admin.passwd already set in
# the "common argument" section above — likely a paste leftover; confirm
# before removing.
canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7
参考:https://github.com/alibaba/canal/wiki/Canal-Admin-ServerGuide
select password('ZaIjVvX5q8F7LyKu') # 如果遇到mysql8.0,可以使用select upper(sha1(unhex(sha1('ZaIjVvX5q8F7LyKu'))))
空指针报错时,禁用了
canal.instance.tsdb.enable = false
canal-server
[root@proxy canal-server]# cat configmap.yaml
apiVersion: v1
data:
  # FIX: use a literal block scalar (|-) so the script keeps its line
  # breaks. The original folded scalar (>-) joins same-indent lines with
  # spaces, collapsing the whole script onto one line — the heredoc never
  # finds its EOF delimiter and the trailing "bash /home/admin/app.sh"
  # is swallowed into the same line.
  # ${POD_IP} is expanded at container runtime by the shell (the env var
  # is injected by the canal-server Deployment), not by Kubernetes.
  start_canal_server.sh: |-
    cat > /home/admin/canal-server/conf/canal.properties <<- EOF
    # register ip
    # ${HOSTNAME} 为podname,canal-server-discovery-svc-stable为svc名称
    # StatefulSet类型pod名称是固定的,k8s集群内pod域名规则为pod_name.svc_name.namespace.svc.cluster.local
    canal.register.ip = ${POD_IP}
    # canal admin config
    canal.admin.manager = canal-admin:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7
    # admin auto register
    canal.admin.register.auto = true
    canal.admin.register.cluster = cluster-kafka
    EOF
    bash /home/admin/app.sh
kind: ConfigMap
metadata:
  name: canal-config
  namespace: dev1
[root@proxy canal-server]# cat deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: canal-server
  namespace: dev1
  labels:
    app: canal-server
    version: v1
spec:
  selector:
    matchLabels:
      app: canal-server
      version: v1
  template:
    metadata:
      labels:
        app: canal-server
        version: v1
    spec:
      affinity:
        podAntiAffinity:
          # Hard rule: never schedule two canal-server pods on the same node.
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - canal-server
              namespaces:
                - dev1
              topologyKey: kubernetes.io/hostname
          # Soft preference (weight 10): avoid nodes already running
          # canal-admin.
          preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: canal-admin
                namespaces:
                  - dev1
                topologyKey: kubernetes.io/hostname
              weight: 10
      imagePullSecrets:
        - name: aliyun-singapore-repo
      containers:
        - name: canal-server
          image: canal/canal-server:v1.1.7
          imagePullPolicy: Always
          # Runs the ConfigMap-mounted bootstrap script, which writes
          # canal.properties and then launches the server via app.sh.
          command: ["/bin/sh","-c","/start_canal_server.sh"]
          #command: ['cat']
          #tty: true
          ports:
            - containerPort: 11111
              name: http
            - containerPort: 11112
              name: metrics
          env:
            # Pod IP consumed by the bootstrap script as canal.register.ip.
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          # Liveness via the canal admin/registration port (11110).
          livenessProbe:
            failureThreshold: 8
            tcpSocket:
              port: 11110
            initialDelaySeconds: 30
            periodSeconds: 30
            successThreshold: 1
            timeoutSeconds: 3
          # Alternative HTTP probes against the metrics port, kept for
          # reference:
          # livenessProbe:
          #   httpGet:
          #     path: /health
          #     port: 11112
          #     scheme: HTTP
          #   initialDelaySeconds: 60
          #   periodSeconds: 5
          #   failureThreshold: 3
          #   successThreshold: 1
          #   timeoutSeconds: 5
          # readinessProbe:
          #   httpGet:
          #     path: /health
          #     port: 11112
          #     scheme: HTTP
          #   initialDelaySeconds: 60
          #   periodSeconds: 5
          #   failureThreshold: 3
          #   successThreshold: 1
          #   timeoutSeconds: 5
          resources:
            limits:
              cpu: "1"
              memory: 2Gi
            requests:
              cpu: 250m
              memory: 512Mi
          volumeMounts:
            # Sync the container's time zone with the node.
            - mountPath: /etc/localtime
              name: time
            # Bootstrap script projected as a single executable file
            # (defaultMode 511 = 0777 on the volume below).
            - mountPath: /start_canal_server.sh
              name: canal-config
              subPath: start_canal_server.sh
            - name: logs
              mountPath: /home/admin/canal-server/logs
      volumes:
        - name: time
          hostPath:
            path: /etc/localtime
        - configMap:
            defaultMode: 511
            name: canal-config
          name: canal-config
        - emptyDir: {}
          name: logs
instance.properties配置
admin 管理界面添加Instance管理
################################################# ## mysql serverId , v1.0.26+ will autoGen #canal.instance.mysql.slaveId=1037 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=test.cp0.rds.amazonaws.com:3306 #canal.instance.master.journal.name= #canal.instance.master.position= #canal.instance.master.timestamp= #canal.instance.master.gtid= # rds oss binlog #canal.instance.rds.accesskey= #canal.instance.rds.secretkey= #canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=false # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=Canal!123 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== #canal.instance.defaultDatabaseName = wealth_management # table regex #canal.instance.filter.regex=test\\..* canal.instance.filter.regex=wealth_management\\.pay_order_request_extend,wealth_management\\.pay_order canal.instance.filter.black.regex= # mq config canal.mq.topic=canal-db-wealth_management canal.mq.partition=0
配置工作做好之后,可以启动Canal服务:
# 查看服务日志 tail -100f /data/canal/logs/canal/canal
在test数据库创建一个订单表,并且执行几个简单的DML
use `test`; CREATE TABLE `order` ( id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键', order_id VARCHAR(64) NOT NULL COMMENT '订单ID', amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', UNIQUE uniq_order_id (`order_id`) ) COMMENT '订单表'; INSERT INTO `order`(order_id, amount) VALUES ('10086', 999); UPDATE `order` SET amount = 10087 WHERE order_id = '10086'; DELETE FROM `order` WHERE order_id = '10086';
这个时候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
- kafka的json结构,各字段含义,如sqlType https://github.com/alibaba/canal/issues/3331