CDC replication from PostgreSQL using Go (Golang)

Change Data Capture intro

Change Data Capture (CDC) is a technique for tracking the changes made to data in a database, so that other systems can follow the data as it evolves. In PostgreSQL, CDC is achieved using the logical replication feature, which decodes row-level changes from the write-ahead log (WAL) and streams them for the tables included in a publication (from PostgreSQL 15 a publication can also be limited to specific columns).

Go (often called Golang) is a programming language that has gained popularity in recent years due to its speed and simplicity. It's also well-suited for working with databases: the standard library ships the database/sql package, and there are a number of powerful third-party libraries for PostgreSQL.

Using Golang and CDC in PostgreSQL can be a powerful combination, as it allows you to easily capture and process changes made to your database in real-time. Here are the basic steps you need to follow to get started with Golang and CDC in PostgreSQL:

  1. First, you'll need to enable logical replication on your PostgreSQL database. This is done by setting wal_level to logical in your PostgreSQL configuration (and making sure max_wal_senders and max_replication_slots are above zero), then restarting the server. Once you've done this, you'll need to create a publication that specifies which tables you want to replicate.

  2. Next, you'll need to write a Golang program that connects to your database and subscribes to the logical replication stream. The standard database/sql package does not expose the streaming replication protocol, so this is usually done with a lower-level client such as pgconn (part of pgx) together with the pglogrepl library, as in the sample further down. Once you've subscribed to the stream, you'll start receiving messages containing the changes made to your database.

  3. Finally, you'll need to process these messages and do something with them. This could involve updating a cache or search index, sending a notification to another system, or simply logging the changes for future analysis. Golang provides a number of powerful tools for working with data, including the standard library's encoding/json package for working with JSON data, and third-party libraries like gjson for parsing JSON.
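
Step 3 can be sketched without a database at all. The snippet below is only an illustration: ChangeEvent and Cache are hypothetical types invented for this example (they are not part of pglogrepl), and the in-memory map stands in for whatever cache or search index you actually use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChangeEvent is a hypothetical, simplified shape for a decoded change.
// It is not a type from pglogrepl.
type ChangeEvent struct {
	Op     string                 `json:"op"`    // "insert", "update" or "delete"
	Table  string                 `json:"table"` // schema-qualified table name
	Key    string                 `json:"key"`   // primary key rendered as text
	Values map[string]interface{} `json:"values,omitempty"`
}

// Cache is an in-memory stand-in for a real cache or search index.
type Cache map[string]map[string]interface{}

// Apply updates the cache according to the event and returns the JSON
// form of the event, which could be logged or forwarded to another system.
func (c Cache) Apply(ev ChangeEvent) ([]byte, error) {
	id := ev.Table + "/" + ev.Key
	switch ev.Op {
	case "insert", "update":
		c[id] = ev.Values
	case "delete":
		delete(c, id)
	default:
		return nil, fmt.Errorf("unknown operation %q", ev.Op)
	}
	return json.Marshal(ev)
}

func main() {
	cache := Cache{}
	out, err := cache.Apply(ChangeEvent{
		Op: "insert", Table: "public.users", Key: "1",
		Values: map[string]interface{}{"name": "alice"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	fmt.Println("cached rows:", len(cache))
}
```

Keeping the handler independent of the replication code makes it easy to unit-test the processing logic separately from the stream itself.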

Overall, combining Golang with PostgreSQL logical replication lets you capture and process database changes in near real time, which makes it a solid base for caches, search indexes, audit logs and other systems that need to follow the data as it changes.

PostgreSQL docker compose

Below is an example Docker Compose file that launches a local PostgreSQL instance configured for logical replication, so that a client can listen for changes in the database.

version: '3.7'
services:
  postgres:
    image: postgres:13-alpine
    restart: always
    environment:
      POSTGRES_DB: mydatabase
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_INITDB_ARGS: '--data-checksums'
      PGDATA: /var/lib/postgresql/data/pgdata
    ports:
      - "5434:5432"
    volumes:
      - ./pgdata:/var/lib/postgresql/data/pgdata
    command: postgres -c max_wal_senders=2 -c wal_level=logical -c max_replication_slots=2
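
The values in this compose file determine the connection URL a Go client has to use. As a small sketch, the hypothetical helper below (replicationDSN is a name made up for this example) assembles that URL with the standard net/url package. The replication=database parameter is what asks the server for a logical replication session.

```go
package main

import (
	"fmt"
	"net/url"
)

// replicationDSN is a hypothetical helper that builds a connection URL
// for a logical replication session from individual settings.
func replicationDSN(user, password, host string, port int, database string) string {
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password),
		Host:     fmt.Sprintf("%s:%d", host, port),
		Path:     "/" + database,
		RawQuery: url.Values{"replication": {"database"}}.Encode(),
	}
	return u.String()
}

func main() {
	// Values from the compose file: host port 5434 is mapped to the
	// container's port 5432.
	fmt.Println(replicationDSN("user", "password", "127.0.0.1", 5434, "mydatabase"))
}
```

Building the URL this way also percent-encodes special characters in the user name or password, which is easy to get wrong when concatenating strings by hand.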

Sample go application to listen for changes

This Go script is an example of using the pglogrepl and pgx packages to perform logical replication in PostgreSQL.

The script first defines a PostgreSQL connection string, whose replication=database parameter opens the session in logical replication mode, and then creates a connection to the database using the pgconn.Connect function. The script then drops and recreates a publication named pglogrepl_demo for all tables in the database.

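Next, the script creates a temporary replication slot named pglogrepl_demo using the pglogrepl.CreateReplicationSlot function (a temporary slot is dropped automatically when the connection closes), and starts replication on this slot using the pglogrepl.StartReplication function, passing the output plugin's options as plugin arguments.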
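
The plugin arguments handed to StartReplication are plain strings of the form name 'value'. The following sketch shows how they could be assembled for pgoutput. The pgoutputArgs helper is hypothetical (it is not part of pglogrepl), and it assumes publication names that need no extra quoting.

```go
package main

import (
	"fmt"
	"strings"
)

// pgoutputArgs is a hypothetical helper that formats the options passed
// to the pgoutput plugin when replication starts. publication_names
// takes a comma-separated list of publications.
func pgoutputArgs(protoVersion int, publications ...string) []string {
	return []string{
		fmt.Sprintf("proto_version '%d'", protoVersion),
		fmt.Sprintf("publication_names '%s'", strings.Join(publications, ",")),
	}
}

func main() {
	for _, arg := range pgoutputArgs(1, "pglogrepl_demo") {
		fmt.Println(arg)
	}
}
```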

The script then enters a loop that listens for incoming replication messages using the conn.ReceiveMessage function. It checks the type of each message and takes action accordingly.

For example, when it receives a PrimaryKeepaliveMessage, the script logs the message's details. If the server has requested a reply, it also resets the next standby message deadline so that a status update is sent immediately on the next loop iteration.
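
The deadline handling can be looked at in isolation. The sketch below pulls it into a small hypothetical statusTimer type (a name made up for this example) so the three transitions are easier to see: waiting for the interval, a reply being requested, and rescheduling after an update was sent.

```go
package main

import (
	"fmt"
	"time"
)

// statusTimer is a hypothetical helper that tracks when the next standby
// status update is due. It mirrors the deadline handling in the script
// but is not part of pglogrepl.
type statusTimer struct {
	interval time.Duration
	deadline time.Time
}

func newStatusTimer(now time.Time, interval time.Duration) *statusTimer {
	return &statusTimer{interval: interval, deadline: now.Add(interval)}
}

// Due reports whether a status update should be sent at the given time.
func (t *statusTimer) Due(now time.Time) bool { return now.After(t.deadline) }

// ReplyRequested clears the deadline so that the next Due call is true.
func (t *statusTimer) ReplyRequested() { t.deadline = time.Time{} }

// Sent schedules the next update one interval after now.
func (t *statusTimer) Sent(now time.Time) { t.deadline = now.Add(t.interval) }

func main() {
	start := time.Now()
	timer := newStatusTimer(start, 10*time.Second)
	later := start.Add(5 * time.Second)

	fmt.Println("due before deadline:", timer.Due(later))
	timer.ReplyRequested() // the keepalive asked for an immediate reply
	fmt.Println("due after reply request:", timer.Due(later))
	timer.Sent(later)
	fmt.Println("due right after sending:", timer.Due(later))
}
```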

When it receives an XLogData message, the script parses the message using pglogrepl.ParseXLogData and then logs the parsed data. It then attempts to parse the logical replication message using pglogrepl.Parse and switches on the message type to take the appropriate action.
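
To make the XLogData step less of a black box, here is a rough, standard-library-only sketch of the wire layout that follows the leading 'w' byte: two 8-byte WAL positions, an 8-byte server time in microseconds since 2000-01-01, and then the payload. The xLogData struct and the function names are simplified stand-ins written for this example. In the real program, pglogrepl.ParseXLogData does this work.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"time"
)

// xLogData is a simplified stand-in for the value returned by
// pglogrepl.ParseXLogData.
type xLogData struct {
	WALStart     uint64
	ServerWALEnd uint64
	ServerTime   time.Time
	WALData      []byte
}

// pgEpoch is the PostgreSQL timestamp epoch. Server times are sent as
// microseconds since this instant.
var pgEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// parseXLogData decodes the body of an XLogData message, i.e. the bytes
// that follow the leading 'w' type byte.
func parseXLogData(buf []byte) (xLogData, error) {
	if len(buf) < 24 {
		return xLogData{}, errors.New("XLogData body must be at least 24 bytes")
	}
	micros := int64(binary.BigEndian.Uint64(buf[16:24]))
	return xLogData{
		WALStart:     binary.BigEndian.Uint64(buf[0:8]),
		ServerWALEnd: binary.BigEndian.Uint64(buf[8:16]),
		ServerTime:   pgEpoch.Add(time.Duration(micros) * time.Microsecond),
		WALData:      buf[24:],
	}, nil
}

// formatLSN renders a WAL position in the usual "high/low" hex notation.
func formatLSN(lsn uint64) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

func main() {
	// Hand-built body with made-up positions and a one-byte payload.
	body := make([]byte, 24, 25)
	binary.BigEndian.PutUint64(body[0:8], 0x16B3748)
	binary.BigEndian.PutUint64(body[8:16], 0x16B3780)
	body = append(body, 'B')

	x, err := parseXLogData(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(formatLSN(x.WALStart), formatLSN(x.ServerWALEnd), len(x.WALData))
}
```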

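In this example, the script only acts on RelationMessage and InsertMessage types; the cases for updates, deletes and truncates are left as empty stubs. When it receives an InsertMessage, it looks up the relation by ID, decodes each column and logs the values of the inserted row.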
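
Column values arrive in text format together with the column's type OID, and the script relies on pgtype's type map to turn them into Go values. As a much reduced illustration of that idea, the sketch below handles just a few built-in OIDs with strconv. The decodeText function is hypothetical and only meant to show the principle, not to replace the type map.

```go
package main

import (
	"fmt"
	"strconv"
)

// OIDs of a few built-in PostgreSQL types.
const (
	boolOID = 16
	int8OID = 20
	int4OID = 23
	textOID = 25
)

// decodeText is a hypothetical, much reduced version of what a type map
// does: it converts a text-format column value into a Go value based on
// the column's type OID and falls back to a string for unknown types.
func decodeText(data []byte, oid uint32) (interface{}, error) {
	s := string(data)
	switch oid {
	case boolOID:
		return s == "t", nil // PostgreSQL renders booleans as "t" or "f"
	case int4OID:
		n, err := strconv.ParseInt(s, 10, 32)
		return int32(n), err
	case int8OID:
		n, err := strconv.ParseInt(s, 10, 64)
		return n, err
	default:
		return s, nil
	}
}

func main() {
	names := []string{"id", "active", "name"}
	oids := []uint32{int4OID, boolOID, textOID}
	raw := []string{"7", "t", "alice"}

	row := map[string]interface{}{}
	for i, name := range names {
		v, err := decodeText([]byte(raw[i]), oids[i])
		if err != nil {
			panic(err)
		}
		row[name] = v
	}
	fmt.Println(row)
}
```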

package main

import (
    "context"
    "encoding/hex"
    "log"
    "time"

    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgx/v5/pgconn"
    "github.com/jackc/pgx/v5/pgproto3"
    "github.com/jackc/pgx/v5/pgtype"
)

const (
    PGLOGREPL_DEMO_CONN_STRING = "postgres://user:password@127.0.0.1:5434/mydatabase?replication=database"
)

func main() {
    //    const outputPlugin = "test_decoding"
    const outputPlugin = "pgoutput"
    conn, err := pgconn.Connect(context.Background(), PGLOGREPL_DEMO_CONN_STRING)
    if err != nil {
        log.Fatalln("failed to connect to PostgreSQL server:", err)
    }
    defer conn.Close(context.Background())

    result := conn.Exec(context.Background(), "DROP PUBLICATION IF EXISTS pglogrepl_demo;")
    _, err = result.ReadAll()
    if err != nil {
        log.Fatalln("drop publication if exists error", err)
    }

    result = conn.Exec(context.Background(), "CREATE PUBLICATION pglogrepl_demo FOR ALL TABLES;")
    _, err = result.ReadAll()
    if err != nil {
        log.Fatalln("create publication error", err)
    }
    log.Println("create publication pglogrepl_demo")

    var pluginArguments []string
    if outputPlugin == "pgoutput" {
        pluginArguments = []string{"proto_version '1'", "publication_names 'pglogrepl_demo'"}
    } else if outputPlugin == "wal2json" {
        pluginArguments = []string{"\"pretty-print\" 'true'"}
    }

    sysident, err := pglogrepl.IdentifySystem(context.Background(), conn)
    if err != nil {
        log.Fatalln("IdentifySystem failed:", err)
    }
    log.Println("SystemID:", sysident.SystemID, "Timeline:", sysident.Timeline, "XLogPos:", sysident.XLogPos, "DBName:", sysident.DBName)

    slotName := "pglogrepl_demo"

    _, err = pglogrepl.CreateReplicationSlot(context.Background(), conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{Temporary: true})
    if err != nil {
        log.Fatalln("CreateReplicationSlot failed:", err)
    }
    log.Println("Created temporary replication slot:", slotName)
    err = pglogrepl.StartReplication(context.Background(), conn, slotName, sysident.XLogPos, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
    if err != nil {
        log.Fatalln("StartReplication failed:", err)
    }
    log.Println("Logical replication started on slot", slotName)

    clientXLogPos := sysident.XLogPos
    standbyMessageTimeout := time.Second * 10
    nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
    relations := map[uint32]*pglogrepl.RelationMessage{}
    typeMap := pgtype.NewMap()

    for {
        if time.Now().After(nextStandbyMessageDeadline) {
            err = pglogrepl.SendStandbyStatusUpdate(context.Background(), conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
            if err != nil {
                log.Fatalln("SendStandbyStatusUpdate failed:", err)
            }
            log.Println("Sent Standby status message")
            nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
        }

        ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
        rawMsg, err := conn.ReceiveMessage(ctx)
        cancel()
        if err != nil {
            if pgconn.Timeout(err) {
                continue
            }
            log.Fatalln("ReceiveMessage failed:", err)
        }

        if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
            log.Fatalf("received Postgres WAL error: %+v", errMsg)
        }

        msg, ok := rawMsg.(*pgproto3.CopyData)
        if !ok {
            log.Printf("Received unexpected message: %T\n", rawMsg)
            continue
        }

        switch msg.Data[0] {
        case pglogrepl.PrimaryKeepaliveMessageByteID:
            pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
            if err != nil {
                log.Fatalln("ParsePrimaryKeepaliveMessage failed:", err)
            }
            log.Println("Primary Keepalive Message =>", "ServerWALEnd:", pkm.ServerWALEnd, "ServerTime:", pkm.ServerTime, "ReplyRequested:", pkm.ReplyRequested)

            if pkm.ReplyRequested {
                nextStandbyMessageDeadline = time.Time{}
            }

        case pglogrepl.XLogDataByteID:
            xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
            if err != nil {
                log.Fatalln("ParseXLogData failed:", err)
            }
            log.Printf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s WALData:\n%s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime, hex.Dump(xld.WALData))
            logicalMsg, err := pglogrepl.Parse(xld.WALData)
            if err != nil {
                log.Fatalf("Parse logical replication message: %s", err)
            }
            log.Printf("Receive a logical replication message: %s", logicalMsg.Type())
            switch logicalMsg := logicalMsg.(type) {
            case *pglogrepl.RelationMessage:
                relations[logicalMsg.RelationID] = logicalMsg

            case *pglogrepl.BeginMessage:
                // Indicates the beginning of a group of changes in a transaction. This is only sent for committed transactions. You won't get any events from rolled back transactions.

            case *pglogrepl.CommitMessage:

            case *pglogrepl.InsertMessage:
                rel, ok := relations[logicalMsg.RelationID]
                if !ok {
                    log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
                }
                values := map[string]interface{}{}
                for idx, col := range logicalMsg.Tuple.Columns {
                    colName := rel.Columns[idx].Name
                    switch col.DataType {
                    case 'n': // null
                        values[colName] = nil
                    case 'u': // unchanged toast
                        // This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
                    case 't': //text
                        val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
                        if err != nil {
                            log.Fatalln("error decoding column data:", err)
                        }
                        values[colName] = val
                    }
                }
                log.Printf("INSERT INTO %s.%s: %v", rel.Namespace, rel.RelationName, values)

            case *pglogrepl.UpdateMessage:
                // ...
            case *pglogrepl.DeleteMessage:
                // ...
            case *pglogrepl.TruncateMessage:
                // ...

            case *pglogrepl.TypeMessage:
            case *pglogrepl.OriginMessage:
            default:
                log.Printf("Unknown message type in pgoutput stream: %T", logicalMsg)
            }

            clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
        }
    }
}

func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interface{}, error) {
    if dt, ok := mi.TypeForOID(dataType); ok {
        return dt.Codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data)
    }
    return string(data), nil
}
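
The UpdateMessage case is left empty in the script above. One possible way to handle it, sketched here with simplified hypothetical types rather than the pglogrepl ones, is to merge the new tuple into a previously known row and to leave columns marked 'u' (unchanged TOAST) untouched, since the server does not send their values.

```go
package main

import "fmt"

// column is a hypothetical, simplified form of a tuple column. Kind uses
// the same markers as the script: 'n' null, 'u' unchanged TOAST, 't' text.
type column struct {
	Name string
	Kind byte
	Data string
}

// applyUpdate merges the new tuple of an update into a previously known
// row. Columns marked 'u' were not sent by the server, so the old value
// is kept instead of being overwritten.
func applyUpdate(row map[string]interface{}, cols []column) {
	for _, c := range cols {
		switch c.Kind {
		case 'n':
			row[c.Name] = nil
		case 'u':
			// Unchanged TOAST value: keep whatever the row already holds.
		case 't':
			row[c.Name] = c.Data
		}
	}
}

func main() {
	row := map[string]interface{}{"id": "1", "title": "old", "body": "a very large text"}
	applyUpdate(row, []column{
		{Name: "id", Kind: 't', Data: "1"},
		{Name: "title", Kind: 't', Data: "new"},
		{Name: "body", Kind: 'u'},
	})
	fmt.Println(row["title"], "/", row["body"])
}
```

This only works if the consumer already holds the previous version of the row. A consumer that does not would need to fetch the missing value from the database or change the table's replica identity.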
