Change Data Capture intro
Change Data Capture (CDC) is a technique for tracking the changes made to data in a database, so that you can follow how the data evolves over time. In PostgreSQL, CDC is built on logical decoding and the logical replication feature, which stream row-level changes for selected tables (and, since PostgreSQL 15, selected columns).
Go (Golang) is a programming language that has gained popularity in recent years due to its speed and simplicity. It is also well-suited for working with databases: its standard library provides the generic database/sql interface, and there are a number of powerful third-party drivers and libraries for specific databases.
Using Golang and CDC in PostgreSQL can be a powerful combination, as it allows you to easily capture and process changes made to your database in real-time. Here are the basic steps you need to follow to get started with Golang and CDC in PostgreSQL:
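First, you'll need to enable logical replication on your PostgreSQL server. To do this, set wal_level = logical and allow enough max_replication_slots and max_wal_senders. You can set these in the PostgreSQL configuration file or on the server command line, as the Docker Compose example below does. Changing wal_level requires a server restart. Once you've done this, you'll need to create a publication that specifies which tables (or, since PostgreSQL 15, which columns) you want to replicate.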
Next, you'll need to write a Go program that connects to your database and subscribes to the logical replication stream. The generic database/sql package (and drivers used through it, such as pq) cannot consume a replication stream. Instead, use a lower-level client that speaks PostgreSQL's streaming replication protocol, for example pgconn from pgx together with pglogrepl, as in the sample below. Once you've subscribed to the stream, you'll start receiving messages containing the changes made to your database.
Finally, you'll need to process these messages and do something with them. This could involve updating a cache or search index, sending a notification to another system, or simply logging the changes for future analysis. Go provides a number of powerful tools for working with data. If you use a JSON-emitting output plugin such as wal2json, the standard library's encoding/json package and third-party libraries like gjson are useful for parsing the messages. The pgoutput plugin used in the sample below emits a binary protocol, which pglogrepl decodes for you.
Overall, using Golang and CDC in PostgreSQL can be a powerful combination that allows you to capture and process changes to your database in real-time. With the right tools and techniques, you can use this combination to build powerful real-time data processing systems that can help you stay on top of your data and make more informed decisions.
PostgreSQL docker compose
Below is an example Docker Compose file that launches a local PostgreSQL instance configured for logical replication (wal_level=logical), so that changes in the database can be streamed to a client.
# Local PostgreSQL 13 instance for the CDC demo. It is published on host
# port 5434, the port used by the Go example's connection string.
version: '3.7'
services:
  postgres:
    image: postgres:13-alpine
    restart: always
    environment:
      POSTGRES_DB: mydatabase
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_INITDB_ARGS: '--data-checksums'
      PGDATA: /var/lib/postgresql/data/pgdata
    ports:
      - "5434:5432"
    volumes:
      # Database files persist in ./pgdata on the host.
      - ./pgdata:/var/lib/postgresql/data/pgdata
    # wal_level=logical enables logical decoding. The demo opens one
    # replication connection and creates one temporary slot, so a limit of
    # 2 senders and 2 slots leaves headroom.
    command: postgres -c max_wal_senders=2 -c wal_level=logical -c max_replication_slots=2
Sample go application to listen for changes
This Go program is an example of using the `pglogrepl` and `pgx` packages to consume a PostgreSQL logical replication stream.
The script first defines a PostgreSQL connection string (with `replication=database`, which opens a replication connection) and then connects to the database using the `pgconn.Connect` function. It then drops and recreates a publication named `pglogrepl_demo` for all tables in the database.
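Next, the script calls `pglogrepl.IdentifySystem` to obtain the server's current WAL position. It then creates a temporary replication slot named `pglogrepl_demo` using the `pglogrepl.CreateReplicationSlot` function, and starts replication on this slot from that position using the `pglogrepl.StartReplication` function.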
The script then enters a loop that listens for incoming replication messages using the `conn.ReceiveMessage` function. It periodically sends a standby status update to report its progress to the server. It checks the type of each message and takes action accordingly.
For example, when it receives a `PrimaryKeepaliveMessage`, the script logs the message's details. If the server requests a reply, it also schedules a standby status update to be sent immediately.
When it receives an `XLogData` message, the script parses it using `pglogrepl.ParseXLogData` and logs the parsed data. It then parses the logical replication message it carries using `pglogrepl.Parse` and switches on the message type to take the appropriate action.
In this example, the script only acts on `RelationMessage` and `InsertMessage` types. Relation messages are cached by relation ID. When it receives an `InsertMessage`, it logs the values of the inserted row, using the column names from the cached relation.
package main
import (
"context"
"encoding/hex"
"log"
"time"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/jackc/pgx/v5/pgtype"
)
// PGLOGREPL_DEMO_CONN_STRING is the DSN of the demo PostgreSQL instance
// (user/password, host port 5434, database mydatabase).
//
// The replication=database parameter opens the connection in logical
// replication mode, which the IDENTIFY_SYSTEM / CREATE_REPLICATION_SLOT /
// START_REPLICATION commands issued by main need.
//
// NOTE(review): Go style would spell this pglogreplDemoConnString; the name is
// kept because main refers to it.
const PGLOGREPL_DEMO_CONN_STRING = "postgres://user:password@127.0.0.1:5434/mydatabase?replication=database"
func main() {
// const outputPlugin = "test_decoding"
const outputPlugin = "pgoutput"
conn, err := pgconn.Connect(context.Background(), PGLOGREPL_DEMO_CONN_STRING)
if err != nil {
log.Fatalln("failed to connect to PostgreSQL server:", err)
}
defer conn.Close(context.Background())
result := conn.Exec(context.Background(), "DROP PUBLICATION IF EXISTS pglogrepl_demo;")
_, err = result.ReadAll()
if err != nil {
log.Fatalln("drop publication if exists error", err)
}
result = conn.Exec(context.Background(), "CREATE PUBLICATION pglogrepl_demo FOR ALL TABLES;")
_, err = result.ReadAll()
if err != nil {
log.Fatalln("create publication error", err)
}
log.Println("create publication pglogrepl_demo")
var pluginArguments []string
if outputPlugin == "pgoutput" {
pluginArguments = []string{"proto_version '1'", "publication_names 'pglogrepl_demo'"}
} else if outputPlugin == "wal2json" {
pluginArguments = []string{"\"pretty-print\" 'true'"}
}
sysident, err := pglogrepl.IdentifySystem(context.Background(), conn)
if err != nil {
log.Fatalln("IdentifySystem failed:", err)
}
log.Println("SystemID:", sysident.SystemID, "Timeline:", sysident.Timeline, "XLogPos:", sysident.XLogPos, "DBName:", sysident.DBName)
slotName := "pglogrepl_demo"
_, err = pglogrepl.CreateReplicationSlot(context.Background(), conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{Temporary: true})
if err != nil {
log.Fatalln("CreateReplicationSlot failed:", err)
}
log.Println("Created temporary replication slot:", slotName)
err = pglogrepl.StartReplication(context.Background(), conn, slotName, sysident.XLogPos, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
if err != nil {
log.Fatalln("StartReplication failed:", err)
}
log.Println("Logical replication started on slot", slotName)
clientXLogPos := sysident.XLogPos
standbyMessageTimeout := time.Second * 10
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
relations := map[uint32]*pglogrepl.RelationMessage{}
typeMap := pgtype.NewMap()
for {
if time.Now().After(nextStandbyMessageDeadline) {
err = pglogrepl.SendStandbyStatusUpdate(context.Background(), conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
if err != nil {
log.Fatalln("SendStandbyStatusUpdate failed:", err)
}
log.Println("Sent Standby status message")
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil {
if pgconn.Timeout(err) {
continue
}
log.Fatalln("ReceiveMessage failed:", err)
}
if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
log.Fatalf("received Postgres WAL error: %+v", errMsg)
}
msg, ok := rawMsg.(*pgproto3.CopyData)
if !ok {
log.Printf("Received unexpected message: %T\n", rawMsg)
continue
}
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
if err != nil {
log.Fatalln("ParsePrimaryKeepaliveMessage failed:", err)
}
log.Println("Primary Keepalive Message =>", "ServerWALEnd:", pkm.ServerWALEnd, "ServerTime:", pkm.ServerTime, "ReplyRequested:", pkm.ReplyRequested)
if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
}
case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
if err != nil {
log.Fatalln("ParseXLogData failed:", err)
}
log.Printf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s WALData:\n%s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime, hex.Dump(xld.WALData))
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
log.Fatalf("Parse logical replication message: %s", err)
}
log.Printf("Receive a logical replication message: %s", logicalMsg.Type())
switch logicalMsg := logicalMsg.(type) {
case *pglogrepl.RelationMessage:
relations[logicalMsg.RelationID] = logicalMsg
case *pglogrepl.BeginMessage:
// Indicates the beginning of a group of changes in a transaction. This is only sent for committed transactions. You won't get any events from rolled back transactions.
case *pglogrepl.CommitMessage:
case *pglogrepl.InsertMessage:
rel, ok := relations[logicalMsg.RelationID]
if !ok {
log.Fatalf("unknown relation ID %d", logicalMsg.RelationID)
}
values := map[string]interface{}{}
for idx, col := range logicalMsg.Tuple.Columns {
colName := rel.Columns[idx].Name
switch col.DataType {
case 'n': // null
values[colName] = nil
case 'u': // unchanged toast
// This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
case 't': //text
val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
if err != nil {
log.Fatalln("error decoding column data:", err)
}
values[colName] = val
}
}
log.Printf("INSERT INTO %s.%s: %v", rel.Namespace, rel.RelationName, values)
case *pglogrepl.UpdateMessage:
// ...
case *pglogrepl.DeleteMessage:
// ...
case *pglogrepl.TruncateMessage:
// ...
case *pglogrepl.TypeMessage:
case *pglogrepl.OriginMessage:
default:
log.Printf("Unknown message type in pgoutput stream: %T", logicalMsg)
}
clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
}
}
}
// decodeTextColumnData converts a text-format column value into a Go value.
//
// If mi has a type registered for the OID dataType, that type's codec decodes
// data. Otherwise the raw bytes are returned unchanged as a string with a nil
// error.
func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (interface{}, error) {
	registered, known := mi.TypeForOID(dataType)
	if !known {
		// Unknown OID: fall back to the textual representation.
		return string(data), nil
	}
	codec := registered.Codec
	return codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data)
}