Skip to content

Commit 7f08387

Browse files
committed
decode prepares. WIP
1 parent 12085bf commit 7f08387

File tree

1 file changed

+135
-10
lines changed

1 file changed

+135
-10
lines changed

src/backend/replication/logical/decode.c

Lines changed: 135 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
#include "access/xlogutils.h"
3535
#include "access/xlogreader.h"
3636
#include "access/xlogrecord.h"
37+
#include "access/twophase.h"
38+
//#include "access/twophase_rmgr.h"
3739

3840
#include "catalog/pg_control.h"
3941

@@ -52,6 +54,24 @@ typedef struct XLogRecordBuffer
5254
XLogReaderState *record;
5355
} XLogRecordBuffer;
5456

57+
58+
typedef struct TwoPhaseFileHeader
59+
{
60+
uint32 magic; /* format identifier */
61+
uint32 total_len; /* actual file length */
62+
TransactionId xid; /* original transaction XID */
63+
Oid database; /* OID of database it was in */
64+
TimestampTz prepared_at; /* time of preparation */
65+
Oid owner; /* user running the transaction */
66+
int32 nsubxacts; /* number of following subxact XIDs */
67+
int32 ncommitrels; /* number of delete-on-commit rels */
68+
int32 nabortrels; /* number of delete-on-abort rels */
69+
int32 ninvalmsgs; /* number of cache invalidation messages */
70+
bool initfileinval; /* does relcache init file need invalidation? */
71+
char gid[200]; /* GID for transaction */
72+
} TwoPhaseFileHeader;
73+
74+
5575
/* RMGR Handlers */
5676
static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
5777
static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -70,6 +90,7 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
7090
xl_xact_parsed_commit *parsed, TransactionId xid);
7191
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
7292
xl_xact_parsed_abort *parsed, TransactionId xid);
93+
/* handles XLOG_XACT_PREPARE records; called from DecodeXactOp() */
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
7394

7495
/* common function to decode tuples */
7596
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -251,16 +272,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
251272
break;
252273
}
253274
case XLOG_XACT_PREPARE:
254-
255-
/*
256-
* Currently decoding ignores PREPARE TRANSACTION and will just
257-
* decode the transaction when the COMMIT PREPARED is sent or
258-
* throw away the transaction's contents when a ROLLBACK PREPARED
259-
* is received. In the future we could add code to expose prepared
260-
* transactions in the changestream allowing for a kind of
261-
* distributed 2PC.
262-
*/
263-
break;
275+
{
276+
DecodePrepare(ctx, buf);
277+
break;
278+
}
264279
default:
265280
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
266281
}
@@ -524,6 +539,116 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
524539
commit_time, origin_id, origin_lsn);
525540
}
526541

542+
static void
543+
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
544+
{
545+
XLogReaderState *r = buf->record;
546+
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
547+
XLogRecPtr commit_time = InvalidXLogRecPtr;
548+
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
549+
int i;
550+
551+
TransactionId xid;
552+
TwoPhaseFileHeader *hdr;
553+
char *twophase_buf;
554+
int twophase_len;
555+
char *twophase_bufptr;
556+
TransactionId *children;
557+
RelFileNode *commitrels;
558+
RelFileNode *abortrels;
559+
SharedInvalidationMessage *invalmsgs;
560+
561+
562+
563+
// probably there are no origin
564+
// if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
565+
// {
566+
// origin_lsn = parsed->origin_lsn;
567+
// commit_time = parsed->origin_timestamp;
568+
// }
569+
570+
xid = XLogRecGetXid(r);
571+
twophase_buf = XLogRecGetData(r);
572+
twophase_len = sizeof(char) * XLogRecGetDataLen(r);
573+
574+
hdr = (TwoPhaseFileHeader *) twophase_buf;
575+
Assert(TransactionIdEquals(hdr->xid, xid));
576+
twophase_bufptr = twophase_buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
577+
children = (TransactionId *) twophase_bufptr;
578+
twophase_bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
579+
commitrels = (RelFileNode *) twophase_bufptr;
580+
twophase_bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
581+
abortrels = (RelFileNode *) twophase_bufptr;
582+
twophase_bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
583+
invalmsgs = (SharedInvalidationMessage *) twophase_bufptr;
584+
twophase_bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
585+
586+
/*
587+
* Process invalidation messages, even if we're not interested in the
588+
* transaction's contents, since the various caches need to always be
589+
* consistent.
590+
*/
591+
if (hdr->ninvalmsgs > 0)
592+
{
593+
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
594+
hdr->ninvalmsgs, invalmsgs);
595+
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
596+
}
597+
598+
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
599+
hdr->nsubxacts, children);
600+
601+
/* ----
602+
* Check whether we are interested in this specific transaction, and tell
603+
* the reorderbuffer to forget the content of the (sub-)transactions
604+
* if not.
605+
*
606+
* There can be several reasons we might not be interested in this
607+
* transaction:
608+
* 1) We might not be interested in decoding transactions up to this
609+
* LSN. This can happen because we previously decoded it and now just
610+
* are restarting or if we haven't assembled a consistent snapshot yet.
611+
* 2) The transaction happened in another database.
612+
* 3) The output plugin is not interested in the origin.
613+
*
614+
* We can't just use ReorderBufferAbort() here, because we need to execute
615+
* the transaction's invalidations. This currently won't be needed if
616+
* we're just skipping over the transaction because currently we only do
617+
* so during startup, to get to the first transaction the client needs. As
618+
* we have reset the catalog caches before starting to read WAL, and we
619+
* haven't yet touched any catalogs, there can't be anything to invalidate.
620+
* But if we're "forgetting" this commit because it's it happened in
621+
* another database, the invalidations might be important, because they
622+
* could be for shared catalogs and we might have loaded data into the
623+
* relevant syscaches.
624+
* ---
625+
*/
626+
627+
// Add db check here
628+
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
629+
FilterByOrigin(ctx, origin_id))
630+
{
631+
for (i = 0; i < hdr->nsubxacts; i++)
632+
{
633+
ReorderBufferForget(ctx->reorder, children[i], buf->origptr);
634+
}
635+
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
636+
637+
return;
638+
}
639+
640+
/* tell the reorderbuffer about the surviving subtransactions */
641+
for (i = 0; i < hdr->nsubxacts; i++)
642+
{
643+
ReorderBufferCommitChild(ctx->reorder, xid, children[i],
644+
buf->origptr, buf->endptr);
645+
}
646+
647+
/* replay actions of all transaction + subtransactions in order */
648+
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
649+
commit_time, origin_id, origin_lsn);
650+
}
651+
527652
/*
528653
* Get the data from the various forms of abort records and pass it on to
529654
* snapbuild.c and reorderbuffer.c

0 commit comments

Comments
 (0)