Change Data Capture (CDC) is a technique that lets applications monitor and respond to changes in a database in real time. With MySQL, CDC can be implemented by reading the binary log (binlog): every change is recorded there as an event, and a client can react to those events as they occur.
Go is a popular programming language that is well-suited for building real-time applications. In this post, we'll explore how to use Go and the github.com/go-mysql-org/go-mysql/canal library to implement CDC for a MySQL database.
Prerequisites
Before we can get started, we'll need a MySQL database with binary logging enabled. To make the setup as easy as possible, we can use the docker-compose file below.
version: '3.8'
services:
  mysql_test:
    platform: linux/amd64
    image: mysql:5.7
    container_name: 'mysql_test'
    restart: always
    environment:
      MYSQL_DATABASE: 'db'
      MYSQL_ROOT_PASSWORD: 'password'
      MYSQL_ROOT: 'root'
    ports:
      - '3306:3306'
    expose:
      - '3306'
    command: [ "mysqld", "--log-bin=mysql-bin", "--server-id=1", "--binlog-format=ROW",
               "--binlog-row-image=FULL", "--expire-logs-days=10", "--max_allowed_packet=32505856",
               "--gtid-mode=ON", "--enforce-gtid-consistency=ON", "--sql_mode=ALLOW_INVALID_DATES" ]
Setting Up the Canal Instance
To get started with CDC, we'll need to set up a Canal instance to connect to our MySQL database and capture binlog events. Here's an example of how we can do that:
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
)

func main() {
	// An empty ExecutionPath disables the initial mysqldump snapshot,
	// so Canal only streams binlog events
	dumpCfg := canal.DumpConfig{
		ExecutionPath: "",
	}

	// Set up canal to connect to MySQL database
	cfg := canal.NewDefaultConfig()
	cfg.Addr = "localhost:3306"
	cfg.User = "root"
	cfg.Password = "password"
	cfg.ServerID = 101
	cfg.Flavor = "mysql"
	cfg.Dump = dumpCfg
	// Patterns are regular expressions matched against "schema.table";
	// this one selects every table in the "db" schema
	cfg.IncludeTableRegex = []string{"^db\\..*$"}

	// Create an instance of the eventHandler struct
	eventHandler := &eventHandler{}

	// Create a new Canal instance with the specified configuration
	can, err := canal.NewCanal(cfg)
	if err != nil {
		log.Fatalf("Failed to create Canal: %v", err)
	}

	// Register the event handler with the Canal instance
	can.SetEventHandler(eventHandler)

	// Run blocks until the canal is closed or fails, so start it in a
	// goroutine and report its result on a channel
	errCh := make(chan error, 1)
	go func() {
		errCh <- can.Run()
	}()

	// Wait for SIGINT or SIGTERM, or for Run to return on its own
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigs: // shutdown requested
	case err := <-errCh:
		if err != nil {
			log.Fatalf("Failed to run Canal: %v", err)
		}
	}

	// Stop canal and exit program
	can.Close()
	log.Println("Canal stopped")
}
// Define a custom event handler to process binlog events
type eventHandler struct{}

// String returns a name for the handler instead of panicking
func (h *eventHandler) String() string {
	return "eventHandler"
}

func (h *eventHandler) OnRotate(header *replication.EventHeader, r *replication.RotateEvent) error {
	// Do nothing
	return nil
}

func (h *eventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
	// Do nothing
	return nil
}

func (h *eventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
	// Print the DDL statement to the console
	log.Printf("DDL statement: %v", string(queryEvent.Query))
	return nil
}

func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
	// Print the row event to the console
	log.Printf("Row event: %v", e)
	return nil
}

func (h *eventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error {
	// Do nothing
	return nil
}

func (h *eventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
	// Do nothing
	return nil
}

func (h *eventHandler) OnXID(*replication.EventHeader, mysql.Position) error {
	// Do nothing
	return nil
}

func (h *eventHandler) OnUnmarshal(data []byte) (interface{}, error) {
	// Do nothing
	return nil, nil
}

func (h *eventHandler) OnRawEvent(event *replication.BinlogEvent) error {
	// Do nothing
	return nil
}
Step-by-Step Explanation
The first part of the code defines the default dump configuration and sets up the Canal configuration with the necessary parameters to connect to the MySQL database:
dumpCfg := canal.DumpConfig{
	ExecutionPath: "",
}
cfg := canal.NewDefaultConfig()
cfg.Addr = "localhost:3306"
cfg.User = "root"
cfg.Password = "password"
cfg.ServerID = 101
cfg.Flavor = "mysql"
cfg.Dump = dumpCfg
cfg.IncludeTableRegex = []string{"^db\\..*$"}
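The canal.NewDefaultConfig() function returns a new canal.Config instance with default values for most of the configuration parameters. The cfg.Addr, cfg.User, and cfg.Password parameters specify the address (host:port), username, and password for the MySQL database. The cfg.ServerID parameter is the server ID that this Canal instance presents to MySQL when it connects as a replica; it must be unique within the replication topology, so it has to differ from the server's own ID (1 in the docker-compose file above) and from the ID of any other replica or Canal instance. The cfg.Flavor parameter specifies the flavor of MySQL being used (e.g., mysql or mariadb). Setting ExecutionPath to an empty string in cfg.Dump disables the initial mysqldump snapshot, so Canal only streams binlog events. Finally, cfg.IncludeTableRegex lists the regular expressions for tables whose events should be delivered; each expression is matched against the full schema.table name.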
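Because the include filter takes regular expressions rather than shell-style globs, it is worth checking a pattern against a few table names before relying on it. The sketch below uses only the standard regexp package and assumes the filter key has the form schema.table; the matchesAny helper and the sample table names are hypothetical and not part of the canal library.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesAny reports whether key matches at least one of the patterns.
// The key is assumed to have the form "schema.table".
func matchesAny(patterns []string, key string) bool {
	for _, p := range patterns {
		if regexp.MustCompile(p).MatchString(key) {
			return true
		}
	}
	return false
}

func main() {
	anchored := []string{`^db\..*$`} // every table in the schema "db"
	loose := []string{`db*`}         // a "d" followed by zero or more "b"

	for _, key := range []string{"db.users", "db_backup.users", "shop.orders"} {
		fmt.Printf("%-16s anchored=%-5v loose=%v\n",
			key, matchesAny(anchored, key), matchesAny(loose, key))
	}
}
```

The unanchored, glob-style pattern matches all three keys, because any name containing the letter d satisfies it, while the anchored pattern only accepts tables in the db schema.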
Next, the code defines a custom eventHandler struct with methods to handle various binlog events, including OnRotate, OnTableChanged, OnDDL, OnRow, OnGTID, OnPosSynced, OnXID, OnUnmarshal, and OnRawEvent. The OnDDL and OnRow methods log the corresponding events to the console.
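Logging the whole event is a good start, but a real OnRow handler usually needs to look at the individual rows. The following sketch shows one way to do that. To keep it runnable without the library, it uses a hypothetical rowsEvent struct that mirrors only the Action and Rows fields of canal.RowsEvent, and it assumes that update events carry their rows as (before, after) pairs. The change type and the toChanges helper are illustrative names, not library API.

```go
package main

import "fmt"

// rowsEvent is a hypothetical stand-in for canal.RowsEvent that keeps only
// the Action and Rows fields, so the sketch compiles without the library.
type rowsEvent struct {
	Action string          // "insert", "update" or "delete"
	Rows   [][]interface{} // one slice of column values per row image
}

// change is a hypothetical helper type describing one logical row change.
type change struct {
	Action string
	Before []interface{} // nil for inserts
	After  []interface{} // nil for deletes
}

// toChanges turns the flat Rows slice into one change per affected row.
// It assumes update events carry rows in (before, after) pairs.
func toChanges(e rowsEvent) []change {
	var out []change
	switch e.Action {
	case "insert":
		for _, r := range e.Rows {
			out = append(out, change{Action: e.Action, After: r})
		}
	case "delete":
		for _, r := range e.Rows {
			out = append(out, change{Action: e.Action, Before: r})
		}
	case "update":
		for i := 0; i+1 < len(e.Rows); i += 2 {
			out = append(out, change{Action: e.Action, Before: e.Rows[i], After: e.Rows[i+1]})
		}
	}
	return out
}

func main() {
	e := rowsEvent{
		Action: "update",
		Rows: [][]interface{}{
			{1, "alice"}, {1, "alicia"}, // before and after image of one row
		},
	}
	for _, c := range toChanges(e) {
		fmt.Printf("%s: %v -> %v\n", c.Action, c.Before, c.After)
	}
}
```

Inside a real handler, the same loop would read e.Action and e.Rows from the *canal.RowsEvent passed to OnRow, and the resulting changes could be forwarded to a queue, a cache, or a search index.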
Then, the code creates an instance of the eventHandler struct and registers it with the Canal instance using the can.SetEventHandler method:
eventHandler := &eventHandler{}
can, err := canal.NewCanal(cfg)
if err != nil {
	log.Fatalf("Failed to create Canal: %v", err)
}
can.SetEventHandler(eventHandler)
The canal.NewCanal(cfg) function creates a new canal.Canal instance with the specified configuration. The can.SetEventHandler(eventHandler) method registers the eventHandler instance to receive and process binlog events.
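Finally, the code starts the Canal instance with can.Run() and waits for a SIGINT or SIGTERM signal before shutting down. Because can.Run() blocks until the canal is closed or fails, it has to run in its own goroutine for the signal handling to be reached. When a DDL or row event is received, the corresponding method of the eventHandler struct logs the event to the console.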
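// Run blocks, so start it in a goroutine and report its result on a channel
errCh := make(chan error, 1)
go func() {
	errCh <- can.Run()
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigs: // shutdown requested
case err := <-errCh:
	if err != nil {
		log.Fatalf("Failed to run Canal: %v", err)
	}
}
can.Close()
log.Println("Canal stopped")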
In summary, this code sets up a Canal instance that connects to a MySQL database, registers a custom event handler to process binlog events, and logs DDL and row events to the console. It can be extended to trigger other work, such as updating a cache or publishing a message, in response to specific binlog events.