Key Takeaways
- Postgres lets you emit messages into its write-ahead log (WAL) without updating any actual tables
- Logical decoding messages can be read using change data capture tools like Debezium
- Stream processing tools like Apache Flink can be used to process (e.g., enrich, transform, and route) logical decoding messages
- There are several use cases for logical decoding messages, including providing audit metadata, application logging, and microservices data exchange
- There is no fixed schema for logical decoding messages; it is up to the application developer to define, communicate, and evolve such a schema
Did you know there's a function in Postgres that lets you write data which you can't query? A function that lets you persist data in all sorts and shapes but which will never show up in any table? Let me tell you about pg_logical_emit_message()! It's a Postgres function that allows you to write messages to the write-ahead log (WAL) of the database.
You can then use logical decoding (Postgres' change data capture capability) to retrieve these messages from the WAL, process them, and relay them to external consumers.
In this article, we'll explore how to take advantage of this feature for implementing three different use cases:
- Propagating data between microservices via the outbox pattern
- Application logging
- Enriching audit logs with metadata
For retrieving logical decoding messages from Postgres, we're going to use Debezium, a popular open-source platform for log-based change data capture (CDC), which can stream data changes from a large variety of databases into data streaming platforms like Apache Kafka or AWS Kinesis.
We'll also use Apache Flink and the Flink CDC project, which seamlessly integrates Debezium into the Flink ecosystem, for enriching and routing raw change event streams. You can learn more about the foundations of change data capture and Debezium in this talk from QCon San Francisco.
Logical Decoding Messages 101
Before diving into specific use cases, let's take a look at how logical decoding messages can be emitted and consumed. To follow along, make sure to have Docker installed on your machine. Start by checking out this example project from GitHub:
git clone https://github.com/decodableco/examples.git
cd examples/postgres-logical-decoding
The project contains a Docker Compose file for running a Postgres database that already has logical replication enabled. Start it like so:
docker compose up
Then, in another terminal window, connect to that Postgres instance using the pgcli command line client:
docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 bash -c \
  'pgcli postgresql://postgresuser:postgrespw@postgres:5432/demodb'
Next, you need to create a replication slot. A replication slot represents one specific stream of changes coming from a Postgres database and keeps track of how far a consumer has processed this stream. For this purpose, it stores the latest log sequence number (LSN) that the slot's consumer has processed and acknowledged.
Each slot has a name and an assigned decoding plug-in, which defines the format of that stream. Create a slot using the "test_decoding" plug-in, which emits changes in a simple text-based protocol, like this:
postgresuser@postgres:demodb> SELECT * FROM pg_create_logical_replication_slot('demo_slot', 'test_decoding');
+-------------+-----------+
| slot_name | lsn |
|-------------+-----------|
| demo_slot | 0/1A24E38 |
+-------------+-----------+
For production scenarios, it is recommended to use the pgoutput plug-in, which emits change events using an efficient Postgres-specific binary format and is available by default in Postgres since version 10. Other commonly used options include the Decoderbufs plug-in (based on the Google Protocol Buffers format) and wal2json (emitting change events as JSON).
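LSNs such as 0/1A24E38 above are printed as two hexadecimal numbers separated by a slash: the upper and the lower 32 bits of a 64-bit position in the WAL. The following minimal Java sketch (the class name Lsn is made up for illustration) converts between both forms, which can be handy for comparing positions or computing how many bytes of WAL lie between them, much like Postgres' own pg_wal_lsn_diff() does in SQL:

```java
/** Hypothetical helper for working with Postgres LSNs outside the database. */
final class Lsn {

    private Lsn() {
    }

    /** Parses "X/Y" (hex), where X holds the upper and Y the lower 32 bits. */
    static long parse(String text) {
        int slash = text.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        long high = Long.parseLong(text.substring(0, slash), 16);
        long low = Long.parseLong(text.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    /** Formats a 64-bit WAL position the way Postgres prints it. */
    static String format(long lsn) {
        String high = Long.toHexString(lsn >>> 32).toUpperCase();
        String low = Long.toHexString(lsn & 0xFFFFFFFFL).toUpperCase();
        return high + "/" + low;
    }

    public static void main(String[] args) {
        long slotCreated = parse("0/1A24E38");    // LSN returned when creating demo_slot
        long messageEmitted = parse("0/1A24F68"); // LSN returned by pg_logical_emit_message()
        // Positions are byte offsets, so subtraction yields a WAL distance in bytes
        System.out.println(messageEmitted - slotCreated + " bytes");
        System.out.println(format(messageEmitted));
    }
}
```

Run as is, this prints "304 bytes" followed by "0/1A24F68", using the two LSNs that appear in this section.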
Changes are typically retrieved by remote clients such as Debezium by establishing a replication stream with the database. Alternatively, you can use the function pg_logical_slot_get_changes(), which lets you fetch changes from a given replication slot via SQL, optionally reading only up to a specific LSN (the first NULL parameter) or only a specific number of changes (the second NULL parameter). This comes in handy for testing purposes:
postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);
+-------+-------+--------+
| lsn   | xid   | data   |
|-------+-------+--------|
+-------+-------+--------+
No changes should be returned at this point. Let's insert a logical decoding message using the pg_logical_emit_message() function:
postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(true, 'context', 'Howdy World!');
+---------------------------+
| pg_logical_emit_message |
|---------------------------|
| 0/1A24F68 |
+---------------------------+
The function has three parameters:
- transactional: a boolean flag indicating whether the message should be transactional or not; when issued while a transaction is pending and that transaction eventually gets rolled back, a transactional message would not be emitted, whereas a non-transactional message would be written to the WAL nonetheless
- prefix: a textual identifier for categorizing messages; for instance, this could indicate the type of a specific message
- content: the actual payload of the message, either as text or binary data; you have full flexibility over what to emit here, e.g., with regard to format, schema, and semantics
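To make the difference between the two modes concrete, here is a toy model in Java. It is not how Postgres implements anything; it merely mimics what a consumer of the replication slot ends up seeing: transactional messages are only delivered once their transaction commits, while non-transactional ones are delivered regardless of the outcome. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of logical decoding message visibility; not Postgres code. */
final class MessageVisibilityModel {

    record Message(boolean transactional, String prefix, String content) {
    }

    private final List<Message> decoded = new ArrayList<>(); // what a slot consumer sees
    private final List<Message> pending = new ArrayList<>(); // held back by the open transaction

    void emit(boolean transactional, String prefix, String content) {
        Message message = new Message(transactional, prefix, content);
        if (transactional) {
            pending.add(message);  // visible only if the transaction commits
        } else {
            decoded.add(message);  // visible no matter how the transaction ends
        }
    }

    void commit() {
        decoded.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    List<Message> decoded() {
        return List.copyOf(decoded);
    }

    public static void main(String[] args) {
        MessageVisibilityModel db = new MessageVisibilityModel();
        db.emit(true, "context", "only after COMMIT");
        db.emit(false, "context", "always");
        db.rollback();
        System.out.println(db.decoded()); // only the non-transactional message remains
    }
}
```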
When you retrieve changes from the slot again after having emitted a message, you should now see three change events: a BEGIN and a COMMIT event for the transaction that was implicitly created when emitting the message, and the "Howdy World!" message itself. Note that this message does not appear in any Postgres table or view, as would be the case when adding data using the INSERT statement; this message is solely present in the database's transaction log.
There are a few other useful functions dealing with logical decoding messages and replication slots, including the following:
- pg_logical_slot_get_binary_changes(): retrieves changes from a slot as binary data
- pg_logical_slot_peek_changes(): lets you take a look at changes from a slot without advancing it
- pg_replication_slot_advance(): advances a replication slot
- pg_drop_replication_slot(): deletes a replication slot
You can also query the pg_replication_slots view to examine the current status of your replication slots, their latest confirmed LSN, and more.
Use Cases
Having discussed the foundations of logical decoding messages, let's now explore a few use cases of this handy Postgres API.
The Outbox Pattern
For microservices, it's a common requirement that, when processing a request, a service needs to update its own database and simultaneously send a message to other services. As an example, consider a "fulfillment" service in an e-commerce scenario: when the status of a shipment changes from READY_TO_SHIP to SHIPPED, the shipment's record in the fulfillment service database needs to be updated accordingly, but a message should also be sent to the "customer" service so that it can update the customer's account history and trigger an email notification for the customer.
Now, when using data streaming platforms like Apache Kafka for connecting your services, you can't reliably implement this scenario by just letting the fulfillment service issue its local database transaction and then send a message via Kafka. The reason is that transactions spanning both a database and Kafka are not supported (in technical terms, Kafka can't participate in distributed transaction protocols like XA). While everything looks fine on the surface, you can end up with an inconsistent state in case of failures. The database transaction could get committed, but sending out the notification via Kafka fails. Or the other way around: the customer service gets notified, but the local database transaction gets rolled back.
While you can find this kind of implementation in many applications, always remember: "Friends don't let friends do dual writes"! A solution to this problem is the outbox pattern: instead of trying to update two resources at once (a database and Kafka), you only update a single one, the service's database. When updating the shipment state in the database, you also write the message to be sent into an outbox table; this happens as part of one shared transaction, i.e., taking advantage of the atomicity guarantees you get from ACID transactions. Either both the shipment state update and the outbox message get persisted, or neither does. You then use change data capture to retrieve any inserts into the outbox from the database and propagate them to consumers.
More information about the outbox pattern can be found in this blog post on the Debezium blog. Another useful resource is this article on InfoQ, which discusses how the outbox pattern can be used as the foundation for implementing Sagas between multiple services. In the following, I'd like to dive into one particular implementation approach for the pattern. Instead of inserting outbox events into a dedicated outbox table, the idea is to emit them just as logical decoding messages to the WAL.
There are pros and cons to either approach. What makes the route via logical decoding messages compelling is that it avoids any housekeeping needs. Unlike with an outbox table, there's no need to remove messages after they have been consumed from the transaction log. Also, this emphasizes the nature of an outbox as an append-only medium: messages should never be modified after being added to the outbox, which could happen by accident with a table-based approach.
Regarding the content of outbox messages, you have full flexibility in general. Sticking to the e-commerce domain from above, a message could, for instance, describe a shipment serialized as JSON, Apache Avro, Google Protocol Buffers, or any other format you choose. What's important to keep in mind is that while the message content doesn't adhere to any specific table schema from a database perspective, it is subject to an (ideally explicit) contract between the sending application and any message consumers. In particular, the schema of any emitted events should only be modified with the impact on consumers and backward compatibility in mind.
One commonly used approach is to look at the design of outbox events and their schemas from a domain-driven design perspective. Specifically, Debezium recommends that your messages have the following attributes:
- id: a unique message id, e.g., a UUID, which consumers can use for deduplication purposes
- aggregate type: describes the kind of aggregate an event is about, e.g., "customer," "shipment," or "purchase order"; when propagating outbox events via Kafka or other streaming platforms, this can be used for sending events of one aggregate type to a specific topic
- aggregate id: the id of the aggregate an event is about, e.g., a customer or order id; this can be used as the record key in Kafka, thus ensuring all events pertaining to one aggregate will go to the same topic partition and making sure consumers receive these events in the correct order
- payload: the actual message payload; unlike "raw" table-level CDC events, this can be a rich structure, representing an entire aggregate and all its parts, which in the database itself may be spread across multiple tables
Figure 1: Routing outbox events from the transaction log to different Kafka topics
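As a minimal sketch, assuming JSON as the message format, an outbox event with these four attributes could be modeled as a Java record that renders itself into the content argument of pg_logical_emit_message(). The record name and the hand-rolled JSON rendering are illustrative only: the payload is assumed to already be a valid JSON object, the aggregate id is rendered as a string, and a real application would rather use a JSON library such as Jackson than string concatenation.

```java
import java.util.UUID;

/** Hypothetical outbox event carrying id, aggregate type, aggregate id, and payload. */
record OutboxEvent(UUID id, String aggregateType, String aggregateId, String payloadJson) {

    /** Creates an event with a fresh random id, usable by consumers for deduplication. */
    static OutboxEvent create(String aggregateType, String aggregateId, String payloadJson) {
        return new OutboxEvent(UUID.randomUUID(), aggregateType, aggregateId, payloadJson);
    }

    /** Renders the event as JSON, e.g., for the content argument of pg_logical_emit_message(). */
    String toJson() {
        return "{\"id\":\"" + id + "\","
                + "\"aggregate_type\":\"" + escape(aggregateType) + "\","
                + "\"aggregate_id\":\"" + escape(aggregateId) + "\","
                + "\"payload\":" + payloadJson + "}";
    }

    // Escapes only backslashes and double quotes; enough for simple identifiers
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        OutboxEvent event = create("shipment", "42", "{\"customer_id\":7398,\"status\":\"SHIPPED\"}");
        System.out.println(event.toJson());
    }
}
```

The resulting string would be passed as the third argument of pg_logical_emit_message(true, 'outbox', ...), ideally via a bind parameter rather than by concatenating it into the SQL text.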
Enough of the theory; let's see what a database transaction that emits a logical decoding message with an outbox event could look like. In the accompanying GitHub repository, you can find a Docker Compose file for spinning up all the required components and detailed instructions for running the complete example yourself. Emit an outbox message like this:
postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(
  true,
  'outbox',
  '{
    "id" : "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6",
    "aggregate_type" : "shipment",
    "aggregate_id" : 42,
    "payload" : {
      "customer_id" : 7398,
      "item_id" : 8123,
      "status" : "SHIPPED",
      "numberOfPackages" : 3,
      "address" : "Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"
    }
  }'
);
This creates a transactional message (i.e., it would not be emitted if the transaction aborts, e.g., because of a constraint violation of another record inserted in the same transaction). It uses the "outbox" prefix (allowing it to be distinguished from messages of other types) and contains a JSON message as the actual payload.
Regarding retrieving change events and propagating them to Kafka, the details depend on how exactly Debezium, as the underlying CDC tool, is deployed. When used with Kafka Connect, Debezium provides a single message transform (SMT) that supports outbox tables and, for instance, routes outbox events to different topics in Kafka based on a configurable column containing the aggregate type. However, this SMT doesn't yet support using logical decoding messages as the outbox format.
When using Debezium via Flink CDC, you could implement similar logic using a custom KafkaRecordSerializationSchema, which routes outbox events to the right Kafka topic and propagates the aggregate id to the Kafka message key, thus ensuring correct ordering semantics. A basic implementation could look like this (you can find the complete source code, including the usage of this serializer in a Flink job, here):
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

// ChangeEvent, Message, and MessageDeserializer are types from the example project
public class OutboxSerializer implements KafkaRecordSerializationSchema<ChangeEvent> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ChangeEvent element,
            KafkaSinkContext context, Long timestamp) {
        try {
            JsonNode content = element.getMessage().getContent();

            // topic = aggregate type, key = aggregate id, value = payload
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    content.get("aggregate_type").asText(),
                    content.get("aggregate_id").asText().getBytes(StandardCharsets.UTF_8),
                    mapper.writeValueAsBytes(content.get("payload")));

            // the unique message id travels as a header, for deduplication by consumers
            record.headers().add("message_id",
                    content.get("id").asText().getBytes(StandardCharsets.UTF_8));

            return record;
        }
        catch (JsonProcessingException e) {
            throw new IllegalArgumentException(
                    "Couldn't serialize outbox message", e);
        }
    }

    @Override
    public void open(InitializationContext context,
            KafkaSinkContext sinkContext) throws Exception {
        mapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        module.addDeserializer(Message.class, new MessageDeserializer());
        mapper.registerModule(module);
    }
}
With that Flink job in place, you'll be able to examine the outbox message on the "shipment" Kafka topic like so:
docker run --tty --rm \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  kcat -b kafka:9092 -C -o beginning -q -t shipment \
  -f '%k -- %h -- %s\n'
42 -- message_id=298c2cc3-71bb-4d2b-b5b4-1b14006d56e6 -- {"customer_id":7398,"item_id":8123,"status":"SHIPPED","numberOfPackages":3,"address":"Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"}
The topic name corresponds to the specified aggregate type, i.e., if you were to issue outbox events for other aggregate types, they would be routed to different topics accordingly. The message key is 42, matching the aggregate id. The unique message id is propagated as a Kafka message header, enabling consumers to implement efficient deduplication by keeping track of the ids they have already received and processed and ignoring any potential duplicates they may encounter. Finally, the payload of the outbox event is propagated as the Kafka message value.
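On the consuming side, deduplication based on the message_id header could look like the following minimal Java sketch (the class name is hypothetical). It assumes that remembering the most recently seen ids is sufficient, i.e., that duplicates such as redeliveries after a restart arrive reasonably close to the original; a production consumer would typically persist processed ids, ideally in the same transaction as its own state changes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical consumer-side deduplication keyed on the message_id header. */
final class Deduplicator {

    private final Map<String, Boolean> seen;

    Deduplicator(int capacity) {
        // Access-ordered map that evicts the least recently seen id once full
        this.seen = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is seen (process it), false for a duplicate (skip it). */
    boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator(10_000);
        String id = "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6";
        System.out.println(dedup.firstTime(id)); // true: process the message
        System.out.println(dedup.firstTime(id)); // false: redelivered duplicate, skip it
    }
}
```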
Especially in larger organizations with a diverse set of event producers and consumers, it makes sense to align on a shared event envelope format, which standardizes common attributes like event timestamp, origin, partitioning key, schema URLs, and others. The CloudEvents specification comes in handy here, especially for defining event types and their schemas. Having your applications emit outbox events that adhere to the CloudEvents standard is an option worth considering.
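As a minimal sketch of what that could look like, the following Java record wraps an outbox payload in a CloudEvents 1.0 JSON envelope. The specification requires the id, source, specversion, and type attributes; time and datacontenttype are optional. The source URI and event type in the example are made up, and, as before, the JSON is assembled by hand for brevity, assuming attribute values that need no escaping.

```java
import java.time.Instant;

/** Hypothetical CloudEvents 1.0 envelope around a JSON payload. */
record CloudEventEnvelope(String id, String source, String type, Instant time, String dataJson) {

    String toJson() {
        return "{\"specversion\":\"1.0\","
                + "\"id\":\"" + id + "\","
                + "\"source\":\"" + source + "\","
                + "\"type\":\"" + type + "\","
                + "\"time\":\"" + time + "\","          // Instant.toString() yields ISO-8601 in UTC
                + "\"datacontenttype\":\"application/json\","
                + "\"data\":" + dataJson + "}";
    }

    public static void main(String[] args) {
        CloudEventEnvelope event = new CloudEventEnvelope(
                "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6",
                "/fulfillment-service",          // made-up source URI reference
                "com.example.shipment.shipped",  // made-up event type
                Instant.now(),
                "{\"customer_id\":7398,\"status\":\"SHIPPED\"}");
        System.out.println(event.toJson());
    }
}
```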
Logging
While log management for modern applications typically happens through dedicated platforms like Datadog or Splunk, which ingest log messages via dedicated APIs or from log files in the file system, it sometimes can be convenient to persist log messages in the database of an application. Logging libraries such as the widely used Log4j 2 provide database-backed appenders for this purpose. These will typically require a second connection for the logger, though, because in case of a rollback of an application transaction itself, you still (and especially then) want to write out any log messages, helping you with failure analysis.
Non-transactional logical decoding messages can be a good means of using a single connection while still ensuring that log messages persist, even when a transaction is rolled back. For example, let's consider the following situation with two transactions, one of which is committed and one rolled back:
Figure 2: Using non-transactional logical decoding messages for logging purposes
To follow along, run the following sequence of statements in the pgcli shell:
-- Assuming this table: CREATE TABLE data (id INTEGER, value TEXT);
BEGIN;
INSERT INTO data(id, value) VALUES(1, 'foo');
SELECT * FROM pg_logical_emit_message(false, 'log', 'OK');
INSERT INTO data(id, value) VALUES(2, 'bar');
COMMIT;
BEGIN;
INSERT INTO data(id, value) VALUES(3, 'baz');
SELECT * FROM pg_logical_emit_message(false, 'log', 'ERROR');
INSERT INTO data(id, value) VALUES(4, 'qux');
ROLLBACK;
The first transaction inserts two records into a new table, "data", and also emits a logical decoding message. The second transaction applies similar changes but is then rolled back. When retrieving the change events from the replication slot (using the "test_decoding" plug-in as shown above), the following events will be returned:
postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL) order by lsn;
+-----------+-------+------------------------------------------------------------+
| lsn       | xid   | data                                                       |
|-----------+-------+------------------------------------------------------------|
| 0/1A483F8 | 768   | BEGIN 768                                                  |
| 0/1A504B8 | 768   | table public.data: INSERT: id[integer]:1 value[text]:'foo' |
| 0/1A50530 | 768   | message: transactional: 0 prefix: log, sz: 2 content:OK    |
| 0/1A50530 | 768   | table public.data: INSERT: id[integer]:2 value[text]:'bar' |
| 0/1A509B8 | 768   | COMMIT 768                                                 |
| 0/1A50A38 | 769   | message: transactional: 0 prefix: log, sz: 5 content:ERROR |
+-----------+-------+------------------------------------------------------------+
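The data column of test_decoding is meant for humans, but for quick experiments it can be handy to pick out messages programmatically. The following Java sketch (all names hypothetical) parses message lines in the format shown above; it assumes the prefix doesn't itself contain ", sz: " and that the content is text, since binary content would not survive the textual output.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for test_decoding's textual rendering of logical decoding messages. */
final class TestDecodingMessages {

    record DecodedMessage(boolean transactional, String prefix, int size, String content) {
    }

    // e.g. "message: transactional: 0 prefix: log, sz: 2 content:OK"
    private static final Pattern MESSAGE = Pattern.compile(
            "message: transactional: ([01]) prefix: (.*?), sz: (\\d+) content:(.*)",
            Pattern.DOTALL);

    /** Returns the parsed message, or empty for BEGIN, COMMIT, and table change lines. */
    static Optional<DecodedMessage> parse(String data) {
        Matcher matcher = MESSAGE.matcher(data);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        return Optional.of(new DecodedMessage(
                matcher.group(1).equals("1"),
                matcher.group(2),
                Integer.parseInt(matcher.group(3)), // content size in bytes
                matcher.group(4)));
    }

    public static void main(String[] args) {
        System.out.println(parse("message: transactional: 0 prefix: log, sz: 5 content:ERROR"));
        System.out.println(parse("BEGIN 768"));
    }
}
```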
As expected, there are two INSERT events and the log message for the first transaction. There are no change events for the INSERT statements of the aborted transaction, as it was rolled back. But since the logical decoding message was non-transactional, it was still written to the WAL and can be retrieved. In other words, you actually can have your cake and eat it too!
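From application code, such a logger could be a thin wrapper around the application's existing JDBC connection, as in the following sketch. The class name, the "log" prefix, and the level-plus-text content format are assumptions for illustration. One caveat: once a statement has failed, Postgres rejects further commands in that transaction until it is rolled back, so messages describing the error itself would have to be emitted after the rollback, or the failing work would have to be wrapped in a savepoint.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical logger writing non-transactional logical decoding messages. */
final class WalLogger {

    // false = non-transactional: the message survives a rollback of the caller's transaction.
    // The casts select the (boolean, text, text) variant of the function.
    static final String EMIT_SQL = "SELECT pg_logical_emit_message(false, ?::text, ?::text)";

    private final Connection connection;
    private final String prefix;

    WalLogger(Connection connection, String prefix) {
        this.connection = connection;
        this.prefix = prefix;
    }

    /** Emits one log line on the same connection (and within the same transaction) as the caller. */
    void log(String level, String text) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(EMIT_SQL)) {
            statement.setString(1, prefix);
            statement.setString(2, level + " " + text);
            statement.execute();
        }
    }
}
```

Mirroring the SQL session above, calling new WalLogger(connection, "log").log("ERROR", "...") before connection.rollback() would leave the message in the WAL while the inserts of that transaction are discarded.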
Audit Logs
In enterprise applications, keeping an audit log of your data is a common requirement, i.e., a complete trail of all the changes made to a database record, such as a purchase order or a customer.
There are multiple possible approaches for building such an audit log; one of them is to copy earlier record versions into a separate history table whenever a data change is made. Arguably, this increases application complexity. Depending on the specific implementation strategy, you might have to deploy triggers for all the tables that should be audited or add libraries such as Hibernate Envers, an extension to the popular Hibernate object-relational mapping tool. In addition, there's a performance impact, as the audit records are inserted as part of the application's transactions, thus increasing write latency.
Change data capture is an interesting alternative for building audit logs: extracting data changes from the database transaction log requires no changes to the writing applications. A change event stream with events for all the inserts, updates, and deletes executed for a table (e.g., persisted as a topic in Apache Kafka, whose records are immutable by definition) could be considered a simple form of an audit log. As the CDC process runs asynchronously, there's no latency impact on writing transactions.
One shortcoming of this approach, at least in its most basic form, is that it doesn't capture contextual metadata, like the application user making a given change, client information like device configuration or IP address, use case identifiers, etc. Typically, this data isn't stored in the business tables of an application and thus isn't exposed in raw change data events.
The combination of logical decoding messages and stream processing with Apache Flink can provide a solution here. At the beginning of each transaction, the source application writes all the required metadata into a message; compared to writing a full history entry for each modified record, this just adds a small overhead on the write path. You can then use a simple Flink job for enriching all the subsequent change events from that same transaction with that metadata. As all change events emitted by Debezium, including logical decoding messages, contain the id of the transaction they originate from, correlating the events of one transaction isn't complicated. The following image shows the general idea:
Figure 3: Enriching data change events with transaction-scoped audit metadata
When it comes to implementing this logic with Apache Flink, you can do so using a rather simple mapping function, specifically by extending the RichFlatMapFunction class, which allows you to combine the enrichment functionality and the removal of the original logical decoding messages in a single operator call:
// Excerpt from a RichFlatMapFunction<String, String>; mapper (a Jackson ObjectMapper)
// and localAuditState are fields of the enclosing class
@Override
public void flatMap(String value, Collector<String> out)
        throws Exception {
    ChangeEvent changeEvent = mapper.readValue(value, ChangeEvent.class);

    String op = changeEvent.getOp();
    String txId = changeEvent.getSource().get("txId").asText();

    // logical decoding message
    if (op.equals("m")) {
        Message message = changeEvent.getMessage();

        // an audit metadata message -> remember it
        if (message.getPrefix().equals("audit")) {
            localAuditState = new AuditState(txId, message.getContent());
            return;
        }
        else {
            out.collect(value);
        }
    }
    // a data change event -> enrich it with the metadata
    else {
        if (txId != null && localAuditState != null) {
            if (txId.equals(localAuditState.getTxId())) {
                changeEvent.setAuditData(localAuditState.getState());
            }
            else {
                localAuditState = null;
            }
        }

        changeEvent.setTransaction(null);
        out.collect(mapper.writeValueAsString(changeEvent));
    }
}
The logic is as follows:
- When the incoming event is of type "m" (i.e., a logical decoding message) and it is an audit metadata event, put the content of the event into a Flink value state
- When the incoming event is of any other type, and we have previously stored audit state for the event's transaction, enrich the event with that state
- When the transaction id of the incoming event doesn't match what's stored in the audit state (e.g., when a transaction was issued without a metadata event at the beginning), clear the state store and propagate the event as is
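Because these rules don't depend on Flink itself, they can also be expressed, and unit-tested, in plain Java. The following sketch uses a simplified, hypothetical Event record instead of Debezium's actual change event structure and processes events one at a time, just like the flat map function does within a single parallel instance.

```java
import java.util.List;

/** Framework-free sketch of the transaction-scoped audit enrichment rules. */
final class AuditEnricher {

    /** Simplified stand-in for a change event; op "m" marks a logical decoding message. */
    record Event(String op, String txId, String prefix, String content, String auditData) {
        Event withAuditData(String data) {
            return new Event(op, txId, prefix, content, data);
        }
    }

    private String auditTxId; // transaction the remembered metadata belongs to
    private String auditData; // the remembered metadata itself

    /** Returns the events to pass downstream for one incoming event (none or one). */
    List<Event> process(Event event) {
        if ("m".equals(event.op())) {
            if ("audit".equals(event.prefix())) {
                auditTxId = event.txId();   // remember the metadata, don't forward it
                auditData = event.content();
                return List.of();
            }
            return List.of(event);          // other messages pass through unchanged
        }
        if (auditTxId != null && auditTxId.equals(event.txId())) {
            return List.of(event.withAuditData(auditData));
        }
        auditTxId = null;                   // metadata from an earlier transaction is stale
        auditData = null;
        return List.of(event);
    }

    public static void main(String[] args) {
        AuditEnricher enricher = new AuditEnricher();
        enricher.process(new Event("m", "555", "audit", "{\"user\":\"bob@example.com\"}", null));
        System.out.println(enricher.process(new Event("c", "555", null, null, null)));
        System.out.println(enricher.process(new Event("c", "556", null, null, null)));
    }
}
```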
You can find a simple yet complete Flink job that runs this mapping function against the Flink CDC connector for Postgres in the aforementioned GitHub repository. See the instructions in the README for running that job, triggering some data changes, and observing the enriched change events. As an example, let's consider the following transaction, which first emits a logical decoding message with the transaction metadata (user name and client IP address) and then executes two INSERT statements:
BEGIN;
SELECT * FROM pg_logical_emit_message(true, 'audit', '{ "user" : "bob@example.com", "client" : "10.0.0.1" }');
INSERT INTO inventory.customer(first_name, last_name, email) VALUES ('Bob', 'Green', 'bob@example.com');
INSERT INTO inventory.address
  (customer_id, type, line_1, line_2, zip_code, city, country)
VALUES
  (currval('inventory.customer_id_seq'), 'Home', '12 Main St.', 'sdf', '90210', 'Los Angeles', 'US');
COMMIT;
The enriched change events, as emitted by Apache Flink, would look like so:
{
  "op" : "c",
  "ts_ms" : 1673434483049,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "customer",
    "lsn" : 24023128,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 1018,
    "first_name" : "Bob",
    "last_name" : "Green",
    "email" : "bob@example.com"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}
{
  "op" : "c",
  "ts_ms" : 1673434483050,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "address",
    "lsn" : 24023129,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 10007,
    "customer_id" : 1018,
    "type" : "Home",
    "line_1" : "12 Main St.",
    "line_2" : "sdf",
    "zip_code" : "90210",
    "city" : "Los Angeles",
    "country" : "US"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}
Within the same Flink job, you could now add a sink connector and, for instance, write the enriched events into a Kafka topic. Alternatively, depending on your business requirements, it can be a good idea to propagate the change events into a queryable store, for instance, an OLAP store like Apache Pinot or ClickHouse. You could use the same approach for enriching change events with contextual metadata for other purposes too, generally speaking for capturing all kinds of "intent" that isn't directly persisted in the business tables of your application.
Bonus: Advancing Replication Slots
Finally, let's discuss a technical use case for logical decoding messages: advancing Postgres replication slots. This can come in handy in certain scenarios where large segments of the WAL could otherwise be retained by the database, eventually causing the database machine to run out of disk space.
This is because replication slots are always created in the context of a specific database, whereas the WAL is shared between all the databases on the same Postgres host. This means a replication slot set up for a database without any data changes, which therefore can't advance, will retain potentially large chunks of WAL if changes are made to another database on the same host.
To experience this situation, stop the currently running Docker Compose setup and launch this alternative Compose file from the example project:
docker compose -f docker-compose-multi-db.yml up
This spins up a Postgres database container with two databases, DB1 and DB2. Then launch the AdvanceSlotMain class. You can do so via Maven (note this is just for demonstration and development purposes; usually, you'd package up your Flink job as a JAR and deploy it to a running Flink cluster):
mvn exec:exec@advanceslot
It runs a simple Flink pipeline that retrieves all changes from the DB2 database and prints them out on the console. Now, make some changes in the DB1 database:
docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db1'
postgresuser@order-db:db1> CREATE TABLE data (id INTEGER, value TEXT);
postgresuser@order-db:db1> INSERT INTO data SELECT generate_series(1,1000) AS id, md5(random()::text) AS value;
Query the status of the replication slot ("flink", set up for database "DB2"), and as you keep running more inserts in DB1, you'll see that the retained WAL of that slot continuously grows, as long as no changes are made in DB2:
postgresuser@order-db:db1> SELECT
  slot_name,
  database,
  pg_size_pretty(
    pg_wal_lsn_diff(
      pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
  active,
  restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
+-----------------+------------+----------------+----------+---------------+-----------------------+
| slot_name       | database   | retained_wal   | active   | restart_lsn   | confirmed_flush_lsn   |
|-----------------+------------+----------------+----------+---------------+-----------------------|
| flink           | db2        | 526 kB         | True     | 0/22BA030     | 0/22BA030             |
+-----------------+------------+----------------+----------+---------------+-----------------------+
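The retained_wal column is simply the byte distance between the server's current WAL position and the slot's restart_lsn, which is what pg_wal_lsn_diff() computes. If you'd rather evaluate this in a monitoring job than in SQL, a minimal Java sketch could look like this; the class name, the current LSN in the example, and the 1 GiB threshold are assumptions, not recommendations.

```java
/** Hypothetical client-side check of how much WAL a replication slot holds back. */
final class RetainedWal {

    private RetainedWal() {
    }

    /** Parses an LSN in Postgres' "X/Y" hex notation into a 64-bit byte position. */
    static long parseLsn(String text) {
        int slash = text.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        return (Long.parseLong(text.substring(0, slash), 16) << 32)
                | Long.parseLong(text.substring(slash + 1), 16);
    }

    /** Same result as pg_wal_lsn_diff(current, restart), in bytes. */
    static long retainedBytes(String currentLsn, String restartLsn) {
        return parseLsn(currentLsn) - parseLsn(restartLsn);
    }

    /** True if the slot retains more WAL than the given threshold. */
    static boolean exceeds(String currentLsn, String restartLsn, long thresholdBytes) {
        return retainedBytes(currentLsn, restartLsn) > thresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 1024L * 1024 * 1024; // example: alert above 1 GiB
        String current = "0/2342030";         // made-up value of pg_current_wal_lsn()
        String restart = "0/22BA030";         // restart_lsn of the "flink" slot above
        System.out.println(retainedBytes(current, restart) + " bytes retained");
        System.out.println("alert: " + exceeds(current, restart, threshold));
    }
}
```

With these inputs, the program prints "557056 bytes retained" and "alert: false".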
The problem is that as long as there are no changes in the DB2 database, the CDC connector of the running Flink job will never be invoked and thus never have a chance to acknowledge the latest processed LSN of its replication slot. Now, let's use pg_logical_emit_message() to fix this situation. Open another Postgres shell, this time for DB2, and emit a message like so:
docker run --tty --rm -i \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgresuser:postgrespw@order-db:5432/db2'
postgresuser@order-db:db2> SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);
In the console output of AdvanceSlotMain, you should see the change event emitted by the Debezium connector for that message. With the next checkpoint issued by Flink (look for "Completed checkpoint XYZ for job ..." messages in the log), the LSN of that event will also be flushed to the database, essentially allowing the database to discard any WAL segments before it. If you now examine the replication slot again, you should find that the "retained WAL" value is much lower than before (as this process is asynchronous, it may take a bit until the disk space is freed up).
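Rather than emitting such heartbeat messages by hand, you could schedule them. The following Java sketch runs the statement from above at a fixed interval; the class name and interval are hypothetical, and the ConnectionFactory is assumed to open a new connection to the otherwise idle database (DB2 in this example) on each call. Depending on your setup, Debezium's own heartbeat options (heartbeat.interval.ms together with heartbeat.action.query) may achieve the same without extra code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler for heartbeat messages that let an idle slot advance. */
final class HeartbeatEmitter implements AutoCloseable {

    /** Opens a fresh connection to the database whose slot should advance. */
    @FunctionalInterface
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    static final String HEARTBEAT_SQL =
            "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)";

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    HeartbeatEmitter(ConnectionFactory connectionFactory, long intervalSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try (Connection connection = connectionFactory.open();
                 PreparedStatement statement = connection.prepareStatement(HEARTBEAT_SQL)) {
                statement.execute();
            } catch (SQLException | RuntimeException e) {
                // Catch everything: an exception escaping the task would cancel all future runs
                System.err.println("Heartbeat failed: " + e.getMessage());
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

For the setup in this section, the factory could be () -> DriverManager.getConnection("jdbc:postgresql://order-db:5432/db2", "postgresuser", "postgrespw"), with the Postgres JDBC driver on the classpath.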
Wrapping Up
Logical decoding messages are not widely known, yet they are very powerful tools that should be in the toolbox of every software engineer working with Postgres. As you've seen, the ability to emit messages into the write-ahead log without them ever surfacing in any actual table allows for a number of interesting use cases, such as reliable data exchange between microservices (thus avoiding unsafe dual writes), application logging, or providing metadata for building audit logs. With stateful stream processing in Apache Flink, you can enrich and route your captured messages as well as apply other operations to your data change events, such as filtering, joining, windowed aggregations, and more.
With great power comes great responsibility. As logical decoding messages don't have an explicit schema, unlike your database tables, the application developer must define sensible contracts and carefully evolve them, always keeping backward compatibility in mind. The CloudEvents format can be a useful foundation for your custom message schemas, providing all the producers and consumers in an organization with a consistent message structure and well-defined semantics.
If you'd like to get started with your own explorations of logical decoding messages, take a look at the GitHub repo accompanying this article, which contains the source code of all the examples shown above and detailed instructions for running them.
Many thanks to Hans-Peter Grahsl, Robert Metzger, and Srini Penchikala for their feedback while writing this article.