CDC replication from MySQL using Go (Golang)

Change Data Capture (CDC) is a technique that allows applications to monitor and respond to changes in a database in real time. In the context of MySQL, CDC works by reading the binary log (binlog): every change to the database is recorded there as a binlog event, and an application can react to those events as they occur.

Go is a popular programming language that is well-suited for building real-time applications. In this post, we'll explore how to use Go and the github.com/go-mysql-org/go-mysql/canal library to implement CDC for a MySQL database.

Prerequisites

Before we get started, we need a MySQL database with binary logging enabled. The easiest way to set one up is with the docker-compose file below.

version: '3.8'

services:
  mysql_test:
    platform: linux/amd64
    image: mysql:5.7
    container_name: 'mysql_test'
    restart: always
    environment:
      MYSQL_DATABASE: 'db'
      MYSQL_ROOT_PASSWORD: 'password'
    ports:
      - '3306:3306'
    expose:
      - '3306'
    command: [ "mysqld", "--log-bin=mysql-bin", "--server-id=1", "--binlog-format=ROW",
               "--binlog-row-image=FULL", "--expire-logs-days=10", "--max_allowed_packet=32505856","--gtid-mode=ON","--enforce-gtid-consistency=ON","--sql_mode=ALLOW_INVALID_DATES" ]

Setting Up the Canal Instance

To get started with CDC, we'll need to set up a Canal instance to connect to our MySQL database and capture binlog events. Here's an example of how we can do that:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/go-mysql-org/go-mysql/canal"
    "github.com/go-mysql-org/go-mysql/mysql"
    "github.com/go-mysql-org/go-mysql/replication"
)

func main() {
    // Leave ExecutionPath empty so Canal skips the initial mysqldump snapshot
    dumpCfg := canal.DumpConfig{
        ExecutionPath: "",
    }

    // Set up canal to connect to MySQL database
    cfg := canal.NewDefaultConfig()
    cfg.Addr = "localhost:3306"
    cfg.User = "root"
    cfg.Password = "password"
    cfg.ServerID = 101
    cfg.Flavor = "mysql"
    cfg.Dump = dumpCfg
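    // Patterns are regular expressions matched against "schema.table";
    // this one selects every table in the db schema
    cfg.IncludeTableRegex = []string{"db\\..*"}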

    // Create an instance of the eventHandler struct
    eventHandler := &eventHandler{}

    // Create a new Canal instance with the specified configuration and event handler
    can, err := canal.NewCanal(cfg)
    if err != nil {
        log.Fatalf("Failed to create Canal: %v", err)
    }

    // Register the event handler with the Canal instance
    can.SetEventHandler(eventHandler)

    // Run blocks until the canal is closed or fails, so start it in a goroutine
    errCh := make(chan error, 1)
    go func() {
        errCh <- can.Run()
    }()

    // Wait for SIGINT or SIGTERM, or for canal to stop on its own
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    select {
    case err := <-errCh:
        log.Fatalf("Canal stopped unexpectedly: %v", err)
    case <-sigs:
    }

    // Stop canal and exit program
    can.Close()
    log.Println("Canal stopped")
}

// Define a custom event handler to process binlog events
type eventHandler struct{}

func (h *eventHandler) String() string {
    // Human-readable name of this handler
    return "eventHandler"
}

func (h *eventHandler) OnRotate(header *replication.EventHeader, r *replication.RotateEvent) error {
    // Do nothing
    return nil
}

func (h *eventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
    // Do nothing
    return nil
}

func (h *eventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
    // Print the DDL statement to the console
    log.Printf("DDL statement: %v", string(queryEvent.Query))

    return nil
}

func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
    // Print the row event to the console
    log.Printf("Row event: %v", e)

    return nil
}

func (h *eventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) error {
    // Do nothing
    return nil
}

func (h *eventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
    // Do nothing
    return nil
}

func (h *eventHandler) OnXID(*replication.EventHeader, mysql.Position) error {
    // Do nothing
    return nil
}

func (h *eventHandler) OnUnmarshal(data []byte) (interface{}, error) {
    // Do nothing
    return nil, nil
}

func (h *eventHandler) OnRawEvent(event *replication.BinlogEvent) error {
    // Do nothing
    return nil
}

Step by step explanation

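The first part of the code defines the dump configuration and sets up the Canal configuration with the parameters needed to connect to the MySQL database. Leaving ExecutionPath empty tells Canal not to run mysqldump for an initial snapshot, so only binlog events are processed: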

dumpCfg := canal.DumpConfig{
    ExecutionPath: "",
}

cfg := canal.NewDefaultConfig()
cfg.Addr = "localhost:3306"
cfg.User = "root"
cfg.Password = "password"
cfg.ServerID = 101
cfg.Flavor = "mysql"
cfg.Dump = dumpCfg
cfg.IncludeTableRegex = []string{"db\\..*"}

The canal.NewDefaultConfig() function returns a new canal.Config instance with default values for most of the configuration parameters. The cfg.Addr, cfg.User, and cfg.Password parameters specify the address (host:port), username, and password for the MySQL database. The cfg.ServerID parameter specifies the server ID this Canal instance uses when it registers as a replica; it must differ from the server ID of the MySQL server itself (1 in the docker-compose file above) and from every other replica or Canal instance connected to it. The cfg.Flavor parameter specifies the flavor of the database server (mysql or mariadb), and cfg.IncludeTableRegex lists regular expressions that select which tables produce events; each pattern is matched against the table's full name in the form schema.table.
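Because the entries in IncludeTableRegex are ordinary Go regular expressions rather than shell globs, a pattern like db* behaves differently from an anchored one like ^db\..*$. The sketch below compares the two using only the standard library. It assumes that Canal tests each pattern against a key of the form schema.table; matchesAny and the table names are hypothetical helpers written for this illustration, not part of the library.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesAny reports whether key ("schema.table") matches at least one pattern.
// Assumption: this approximates how canal applies IncludeTableRegex; it is a
// stand-in for illustration, not the library's own code.
func matchesAny(patterns []string, key string) bool {
	for _, p := range patterns {
		if regexp.MustCompile(p).MatchString(key) {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical table keys in "schema.table" form.
	keys := []string{"db.users", "db.orders", "audit.events"}

	// "d" followed by zero or more "b", found anywhere in the key.
	loose := []string{"db*"}
	// Every table in schema "db", and nothing else.
	strict := []string{`^db\..*$`}

	for _, k := range keys {
		fmt.Printf("%-14s loose=%v strict=%v\n",
			k, matchesAny(loose, k), matchesAny(strict, k))
	}
}
```

Under that assumption, the unanchored pattern also accepts audit.events, because the key contains the letter d, while the anchored pattern accepts only tables in the db schema.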

Next, the code defines a custom eventHandler struct with methods to handle various binlog events, including OnRotate, OnTableChanged, OnDDL, OnRow, OnGTID, OnPosSynced, OnXID, OnUnmarshal, and OnRawEvent. The OnDDL and OnRow methods log the corresponding events to the console.

Then, the code creates an instance of the eventHandler struct and registers it with the Canal instance using the can.SetEventHandler method:

eventHandler := &eventHandler{}

can, err := canal.NewCanal(cfg)
if err != nil {
    log.Fatalf("Failed to create Canal: %v", err)
}

can.SetEventHandler(eventHandler)

The canal.NewCanal(cfg) function creates a new canal.Canal instance with the specified configuration. The can.SetEventHandler(eventHandler) method registers the eventHandler instance to receive and process binlog events.

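Finally, the code starts the Canal instance using can.Run() and waits for a SIGINT or SIGTERM signal to stop the program. Because can.Run() blocks until the Canal instance is closed or fails, it runs in its own goroutine; otherwise the signal handling after it would never be reached. When a DDL or row event is received, the corresponding method in the eventHandler struct logs the event to the console.

errCh := make(chan error, 1)
go func() {
    errCh <- can.Run()
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

select {
case err := <-errCh:
    log.Fatalf("Canal stopped unexpectedly: %v", err)
case <-sigs:
}

can.Close()
log.Println("Canal stopped")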

In summary, this code sets up a Canal instance to connect to a MySQL database, registers a custom event handler to process binlog events, and logs DDL and row events to the console. It can be extended to perform other operations in response to specific binlog events.