34
34
#include "access/xlogutils.h"
35
35
#include "access/xlogreader.h"
36
36
#include "access/xlogrecord.h"
37
+ #include "access/twophase.h"
38
+ //#include "access/twophase_rmgr.h"
37
39
38
40
#include "catalog/pg_control.h"
39
41
@@ -52,6 +54,24 @@ typedef struct XLogRecordBuffer
52
54
XLogReaderState * record ;
53
55
} XLogRecordBuffer ;
54
56
57
+
58
+ typedef struct TwoPhaseFileHeader
59
+ {
60
+ uint32 magic ; /* format identifier */
61
+ uint32 total_len ; /* actual file length */
62
+ TransactionId xid ; /* original transaction XID */
63
+ Oid database ; /* OID of database it was in */
64
+ TimestampTz prepared_at ; /* time of preparation */
65
+ Oid owner ; /* user running the transaction */
66
+ int32 nsubxacts ; /* number of following subxact XIDs */
67
+ int32 ncommitrels ; /* number of delete-on-commit rels */
68
+ int32 nabortrels ; /* number of delete-on-abort rels */
69
+ int32 ninvalmsgs ; /* number of cache invalidation messages */
70
+ bool initfileinval ; /* does relcache init file need invalidation? */
71
+ char gid [200 ]; /* GID for transaction */
72
+ } TwoPhaseFileHeader ;
73
+
74
+
55
75
/* RMGR Handlers */
56
76
static void DecodeXLogOp (LogicalDecodingContext * ctx , XLogRecordBuffer * buf );
57
77
static void DecodeHeapOp (LogicalDecodingContext * ctx , XLogRecordBuffer * buf );
@@ -70,6 +90,7 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
70
90
xl_xact_parsed_commit * parsed , TransactionId xid );
71
91
static void DecodeAbort (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
72
92
xl_xact_parsed_abort * parsed , TransactionId xid );
93
+ static void DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf );
73
94
74
95
/* common function to decode tuples */
75
96
static void DecodeXLogTuple (char * data , Size len , ReorderBufferTupleBuf * tup );
@@ -251,16 +272,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
251
272
break ;
252
273
}
253
274
case XLOG_XACT_PREPARE :
254
-
255
- /*
256
- * Currently decoding ignores PREPARE TRANSACTION and will just
257
- * decode the transaction when the COMMIT PREPARED is sent or
258
- * throw away the transaction's contents when a ROLLBACK PREPARED
259
- * is received. In the future we could add code to expose prepared
260
- * transactions in the changestream allowing for a kind of
261
- * distributed 2PC.
262
- */
263
- break ;
275
+ {
276
+ DecodePrepare (ctx , buf );
277
+ break ;
278
+ }
264
279
default :
265
280
elog (ERROR , "unexpected RM_XACT_ID record type: %u" , info );
266
281
}
@@ -524,6 +539,116 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
524
539
commit_time , origin_id , origin_lsn );
525
540
}
526
541
542
+ static void
543
+ DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf )
544
+ {
545
+ XLogReaderState * r = buf -> record ;
546
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr ;
547
+ XLogRecPtr commit_time = InvalidXLogRecPtr ;
548
+ XLogRecPtr origin_id = XLogRecGetOrigin (buf -> record );
549
+ int i ;
550
+
551
+ TransactionId xid ;
552
+ TwoPhaseFileHeader * hdr ;
553
+ char * twophase_buf ;
554
+ int twophase_len ;
555
+ char * twophase_bufptr ;
556
+ TransactionId * children ;
557
+ RelFileNode * commitrels ;
558
+ RelFileNode * abortrels ;
559
+ SharedInvalidationMessage * invalmsgs ;
560
+
561
+
562
+
563
+ // probably there are no origin
564
+ // if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
565
+ // {
566
+ // origin_lsn = parsed->origin_lsn;
567
+ // commit_time = parsed->origin_timestamp;
568
+ // }
569
+
570
+ xid = XLogRecGetXid (r );
571
+ twophase_buf = XLogRecGetData (r );
572
+ twophase_len = sizeof (char ) * XLogRecGetDataLen (r );
573
+
574
+ hdr = (TwoPhaseFileHeader * ) twophase_buf ;
575
+ Assert (TransactionIdEquals (hdr -> xid , xid ));
576
+ twophase_bufptr = twophase_buf + MAXALIGN (sizeof (TwoPhaseFileHeader ));
577
+ children = (TransactionId * ) twophase_bufptr ;
578
+ twophase_bufptr += MAXALIGN (hdr -> nsubxacts * sizeof (TransactionId ));
579
+ commitrels = (RelFileNode * ) twophase_bufptr ;
580
+ twophase_bufptr += MAXALIGN (hdr -> ncommitrels * sizeof (RelFileNode ));
581
+ abortrels = (RelFileNode * ) twophase_bufptr ;
582
+ twophase_bufptr += MAXALIGN (hdr -> nabortrels * sizeof (RelFileNode ));
583
+ invalmsgs = (SharedInvalidationMessage * ) twophase_bufptr ;
584
+ twophase_bufptr += MAXALIGN (hdr -> ninvalmsgs * sizeof (SharedInvalidationMessage ));
585
+
586
+ /*
587
+ * Process invalidation messages, even if we're not interested in the
588
+ * transaction's contents, since the various caches need to always be
589
+ * consistent.
590
+ */
591
+ if (hdr -> ninvalmsgs > 0 )
592
+ {
593
+ ReorderBufferAddInvalidations (ctx -> reorder , xid , buf -> origptr ,
594
+ hdr -> ninvalmsgs , invalmsgs );
595
+ ReorderBufferXidSetCatalogChanges (ctx -> reorder , xid , buf -> origptr );
596
+ }
597
+
598
+ SnapBuildCommitTxn (ctx -> snapshot_builder , buf -> origptr , xid ,
599
+ hdr -> nsubxacts , children );
600
+
601
+ /* ----
602
+ * Check whether we are interested in this specific transaction, and tell
603
+ * the reorderbuffer to forget the content of the (sub-)transactions
604
+ * if not.
605
+ *
606
+ * There can be several reasons we might not be interested in this
607
+ * transaction:
608
+ * 1) We might not be interested in decoding transactions up to this
609
+ * LSN. This can happen because we previously decoded it and now just
610
+ * are restarting or if we haven't assembled a consistent snapshot yet.
611
+ * 2) The transaction happened in another database.
612
+ * 3) The output plugin is not interested in the origin.
613
+ *
614
+ * We can't just use ReorderBufferAbort() here, because we need to execute
615
+ * the transaction's invalidations. This currently won't be needed if
616
+ * we're just skipping over the transaction because currently we only do
617
+ * so during startup, to get to the first transaction the client needs. As
618
+ * we have reset the catalog caches before starting to read WAL, and we
619
+ * haven't yet touched any catalogs, there can't be anything to invalidate.
620
+ * But if we're "forgetting" this commit because it's it happened in
621
+ * another database, the invalidations might be important, because they
622
+ * could be for shared catalogs and we might have loaded data into the
623
+ * relevant syscaches.
624
+ * ---
625
+ */
626
+
627
+ // Add db check here
628
+ if (SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) ||
629
+ FilterByOrigin (ctx , origin_id ))
630
+ {
631
+ for (i = 0 ; i < hdr -> nsubxacts ; i ++ )
632
+ {
633
+ ReorderBufferForget (ctx -> reorder , children [i ], buf -> origptr );
634
+ }
635
+ ReorderBufferForget (ctx -> reorder , xid , buf -> origptr );
636
+
637
+ return ;
638
+ }
639
+
640
+ /* tell the reorderbuffer about the surviving subtransactions */
641
+ for (i = 0 ; i < hdr -> nsubxacts ; i ++ )
642
+ {
643
+ ReorderBufferCommitChild (ctx -> reorder , xid , children [i ],
644
+ buf -> origptr , buf -> endptr );
645
+ }
646
+
647
+ /* replay actions of all transaction + subtransactions in order */
648
+ ReorderBufferCommit (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
649
+ commit_time , origin_id , origin_lsn );
650
+ }
651
+
527
652
/*
528
653
* Get the data from the various forms of abort records and pass it on to
529
654
* snapbuild.c and reorderbuffer.c
0 commit comments