Linux: Canal 数据库日志订阅
- TAGS: Linux
参考:
- 官方文档: https://github.com/alibaba/canal/wiki/AdminGuide
- https://www.cnblogs.com/throwable/p/12483983.html
mysql 要求
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row.
[mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
检查
mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+
canal的原理是模拟自己为mysql slave,所以这里一定需要作为mysql slave的相关权限
CREATE USER canal IDENTIFIED BY 'Canal!123'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
检查权限
show grants for 'canal'
部署
- canal-admin 相当于管理角色,统一canal.properties/instance.properties配置
- canal-server(自动加入)
- 加载canal.properties就绪服务
- 加载instance.properties配置,定义从哪个mysql订阅日志,并推送到哪里如kafka,rocketmq等
canal-admin
需要一个canal-admin管理用的库
CREATE USER canal IDENTIFIED BY 'Canal!123'; GRANT ALL ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
部署文件
[root@proxy canal-admin]# cat deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: labels: app: canal-admin name: canal-admin namespace: dev1 spec: replicas: 1 selector: matchLabels: app: canal-admin template: metadata: labels: app: canal-admin spec: containers: - env: - name: JAVA_OPTS value: >- -Dspring.datasource.address=canal-manager-mysql.com -Dspring.datasource.database=canal_manager -Dspring.datasource.username=canal -Dspring.datasource.password=Canal!123 -Dcanal.adminPasswd=ZaIjVvX5q8F7LyKu #ui登录密码 image: 'canal/canal-admin:v1.1.7' imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 initialDelaySeconds: 15 periodSeconds: 10 successThreshold: 1 tcpSocket: port: 8089 timeoutSeconds: 1 name: canal-admin resources: requests: cpu: 250m memory: 512Mi startupProbe: failureThreshold: 3 initialDelaySeconds: 15 periodSeconds: 10 successThreshold: 1 tcpSocket: port: 8089 timeoutSeconds: 1 restartPolicy: Always
[root@proxy canal-admin]# cat service.yaml
apiVersion: v1
kind: Service
metadata:
name: canal-admin
namespace: dev1
spec:
ports:
- port: 8089
name: http
protocol: TCP
targetPort: 8089
selector:
app: canal-admin
type: ClusterIP
[root@proxy dev1]# cat ingress/ingress-cms-dev.yaml apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: ingress-cms-dev namespace: dev1 spec: ingressClassName: nginx-dev rules: - host: canal-admin.xxx.com http: paths: - backend: service: name: canal-admin port: number: 8089 path: / pathType: ImplementationSpecific
部署zookeeper
用于管理canal-server集群。如创建集群名 cluster-kafka,指定zookeeper地址
添加canal.properties配置
点击集群操作,修改主配置。可载入模板,在其上修改。
如
################################################# ######### common argument ############# ################################################# # tcp bind ip #canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7 # admin auto register #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = #canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = kafka # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 
16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false canal.instance.filter.dml.insert = false canal.instance.filter.dml.update = false canal.instance.filter.dml.delete = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = false canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = Canal!123 # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 ################################################# ######### destinations ############# ################################################# canal.destinations = # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 # set this value to 'true' means that when binlog pos not found, skip to latest. # WARN: pls keep 'false' in production env, or if you know what you want. 
canal.auto.reset.latest.pos.mode = false canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = manager canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ Properties ############# ################################################## # aliyun ak/sk , support rds/mq #canal.aliyun.accessKey = #canal.aliyun.secretKey = #canal.aliyun.uid= canal.mq.flatMessage = true canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local canal.mq.database.hash = true canal.mq.send.thread.size = 30 canal.mq.build.thread.size = 8 ################################################## ######### Kafka ############# ################################################## kafka.bootstrap.servers = b-2.logtest.2g3di3.c7.kafka.us-west-2.amazonaws.com:9092 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 kafka.kerberos.enable = false kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf # sasl demo # kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\"; # kafka.sasl.mechanism = SCRAM-SHA-512 # kafka.security.protocol = SASL_PLAINTEXT
canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7
参考:https://github.com/alibaba/canal/wiki/Canal-Admin-ServerGuide
select password('ZaIjVvX5q8F7LyKu') # 如果遇到mysql8.0,可以使用select upper(sha1(unhex(sha1('ZaIjVvX5q8F7LyKu'))))
遇到空指针报错时,禁用了tsdb:
canal.instance.tsdb.enable = false
canal-server
[root@proxy canal-server]# cat configmap.yaml apiVersion: v1 data: start_canal_server.sh: >- cat > /home/admin/canal-server/conf/canal.properties <<- EOF # register ip # ${HOSTNAME} 为podname,canal-server-discovery-svc-stable为svc名称 # StatefulSet类型pod名称是固定的,k8s集群内pod域名规则为pod_name.svc_name.namespace.svc.cluster.local canal.register.ip = ${POD_IP} # canal admin config canal.admin.manager = canal-admin:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = FB85C3E706468CAEA8E37E43BA5AC422991C78E7 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = cluster-kafka EOF bash /home/admin/app.sh kind: ConfigMap metadata: name: canal-config namespace: dev1
[root@proxy canal-server]# cat deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: canal-server namespace: dev1 labels: app: canal-server version: v1 spec: selector: matchLabels: app: canal-server version: v1 template: metadata: labels: app: canal-server version: v1 spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - canal-server namespaces: - dev1 topologyKey: kubernetes.io/hostname preferredDuringSchedulingIgnoredDuringExecution: - podAffinityTerm: labelSelector: matchLabels: app: canal-admin namespaces: - dev1 topologyKey: kubernetes.io/hostname weight: 10 imagePullSecrets: - name: aliyun-singapore-repo containers: - name: canal-server image: canal/canal-server:v1.1.7 imagePullPolicy: Always command: ["/bin/sh","-c","/start_canal_server.sh"] #command: ['cat'] #tty: true ports: - containerPort: 11111 name: http - containerPort: 11112 name: metrics env: - name: POD_IP valueFrom: fieldRef: fieldPath: status.podIP livenessProbe: failureThreshold: 8 tcpSocket: port: 11110 initialDelaySeconds: 30 periodSeconds: 30 successThreshold: 1 timeoutSeconds: 3 # livenessProbe: # httpGet: # path: /health # port: 11112 # scheme: HTTP # initialDelaySeconds: 60 # periodSeconds: 5 # failureThreshold: 3 # successThreshold: 1 # timeoutSeconds: 5 # readinessProbe: # httpGet: # path: /health # port: 11112 # scheme: HTTP # initialDelaySeconds: 60 # periodSeconds: 5 # failureThreshold: 3 # successThreshold: 1 # timeoutSeconds: 5 resources: limits: cpu: "1" memory: 2Gi requests: cpu: 250m memory: 512Mi volumeMounts: - mountPath: /etc/localtime name: time - mountPath: /start_canal_server.sh name: canal-config subPath: start_canal_server.sh - name: logs mountPath: /home/admin/canal-server/logs volumes: - name: time hostPath: path: /etc/localtime - configMap: defaultMode: 511 name: canal-config name: canal-config - emptyDir: {} name: logs
instance.properties配置
admin 管理界面添加Instance管理
################################################# ## mysql serverId , v1.0.26+ will autoGen #canal.instance.mysql.slaveId=1037 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=test.cp0.rds.amazonaws.com:3306 #canal.instance.master.journal.name= #canal.instance.master.position= #canal.instance.master.timestamp= #canal.instance.master.gtid= # rds oss binlog #canal.instance.rds.accesskey= #canal.instance.rds.secretkey= #canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=false # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=Canal!123 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== #canal.instance.defaultDatabaseName = wealth_management # table regex #canal.instance.filter.regex=test\\..* canal.instance.filter.regex=wealth_management\\.pay_order_request_extend,wealth_management\\.pay_order canal.instance.filter.black.regex= # mq config canal.mq.topic=canal-db-wealth_management canal.mq.partition=0
配置工作做好之后,可以启动Canal服务:
# 查看服务日志 tail -100f /data/canal/logs/canal/canal.log
在test数据库创建一个订单表,并且执行几个简单的DML
use `test`; CREATE TABLE `order` ( id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键', order_id VARCHAR(64) NOT NULL COMMENT '订单ID', amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', UNIQUE uniq_order_id (`order_id`) ) COMMENT '订单表'; INSERT INTO `order`(order_id, amount) VALUES ('10086', 999); UPDATE `order` SET amount = 10087 WHERE order_id = '10086'; DELETE FROM `order` WHERE order_id = '10086';
这个时候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
- kafka的json结构,各字段含义,如sqlType https://github.com/alibaba/canal/issues/3331