Skip to content

Commit 0a7653a

Browse files
committed
ParsePrepareRecord -- broken
1 parent f8a58a4 commit 0a7653a

File tree

5 files changed

+133
-106
lines changed

5 files changed

+133
-106
lines changed

reinit.sh

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,25 +98,25 @@ reinit_master
9898
# );
9999
# SQL
100100

101-
# ./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102-
103-
# ./install/bin/psql <<SQL
104-
# begin;
105-
# insert into t values (42);
106-
# prepare transaction 'hellyeah';
107-
# rollback prepared 'hellyeah';
108-
# SQL
109-
110-
# ./install/bin/psql <<SQL
111-
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112-
# NULL, NULL,
113-
# 'expected_encoding', 'UTF8',
114-
# 'min_proto_version', '1',
115-
# 'max_proto_version', '1',
116-
# 'startup_params_format', '1',
117-
# 'proto_format', 'json',
118-
# 'no_txinfo', 't');
119-
# SQL
101+
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102+
103+
./install/bin/psql <<SQL
104+
begin;
105+
insert into t values (42);
106+
prepare transaction 'hellyeah';
107+
rollback prepared 'hellyeah';
108+
SQL
109+
110+
./install/bin/psql <<SQL
111+
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112+
NULL, NULL,
113+
'expected_encoding', 'UTF8',
114+
'min_proto_version', '1',
115+
'max_proto_version', '1',
116+
'startup_params_format', '1',
117+
'proto_format', 'json',
118+
'no_txinfo', 't');
119+
SQL
120120

121121

122122

src/backend/access/transam/twophase.c

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,50 @@ TwoPhaseGetDummyProc(TransactionId xid)
826826
return &ProcGlobal->allProcs[gxact->pgprocno];
827827
}
828828

829+
/************************************************************************/
830+
/* State file support */
831+
/************************************************************************/
832+
833+
#define TwoPhaseFilePath(path, xid) \
834+
snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
835+
836+
/*
837+
* 2PC state file format:
838+
*
839+
* 1. TwoPhaseFileHeader
840+
* 2. TransactionId[] (subtransactions)
841+
* 3. RelFileNode[] (files to be deleted at commit)
842+
* 4. RelFileNode[] (files to be deleted at abort)
843+
* 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
844+
* 6. TwoPhaseRecordOnDisk
845+
* 7. ...
846+
* 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
847+
* 9. checksum (CRC-32C)
848+
*
849+
* Each segment except the final checksum is MAXALIGN'd.
850+
*/
851+
852+
/*
853+
* Header for a 2PC state file
854+
*/
855+
#define TWOPHASE_MAGIC 0x57F94532 /* format identifier */
856+
857+
typedef struct TwoPhaseFileHeader
858+
{
859+
uint32 magic; /* format identifier */
860+
uint32 total_len; /* actual file length */
861+
TransactionId xid; /* original transaction XID */
862+
Oid database; /* OID of database it was in */
863+
TimestampTz prepared_at; /* time of preparation */
864+
Oid owner; /* user running the transaction */
865+
int32 nsubxacts; /* number of following subxact XIDs */
866+
int32 ncommitrels; /* number of delete-on-commit rels */
867+
int32 nabortrels; /* number of delete-on-abort rels */
868+
int32 ninvalmsgs; /* number of cache invalidation messages */
869+
bool initfileinval; /* does relcache init file need invalidation? */
870+
char gid[GIDSIZE]; /* GID for transaction */
871+
} TwoPhaseFileHeader;
872+
829873
/*
830874
* Header for each record in a state file
831875
*
@@ -1189,6 +1233,35 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
11891233
return buf;
11901234
}
11911235

1236+
/*
1237+
* ParsePrepareRecord
1238+
*/
1239+
void
1240+
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
1241+
{
1242+
TwoPhaseFileHeader *hdr;
1243+
char *bufptr;
1244+
1245+
hdr = (TwoPhaseFileHeader *) xlrec;
1246+
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1247+
1248+
parsed->twophase_xid = hdr->xid;
1249+
parsed->dbId = hdr->database;
1250+
strcpy(parsed->twophase_gid, hdr->gid);
1251+
1252+
parsed->subxacts = (TransactionId *) bufptr;
1253+
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1254+
1255+
parsed->xnodes = (RelFileNode *) bufptr;
1256+
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1257+
1258+
/* Ignoring abortrels? */
1259+
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1260+
1261+
parsed->msgs = (SharedInvalidationMessage *) bufptr;
1262+
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1263+
}
1264+
11921265

11931266
/*
11941267
* Reads 2PC data from xlog. During checkpoint this data will be moved to

src/backend/replication/logical/decode.c

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
7070
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
7171
xl_xact_parsed_commit *parsed, TransactionId xid);
7272
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
73-
xl_xact_parsed_abort *parsed, TransactionId xid);
74-
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
73+
xl_xact_parsed_abort *parsed, TransactionId xid);
74+
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
75+
xl_xact_parsed_prepare *parsed);
7576

7677
/* common function to decode tuples */
7778
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -256,7 +257,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
256257
}
257258
case XLOG_XACT_PREPARE:
258259
{
259-
DecodePrepare(ctx, buf);
260+
xl_xact_parsed_prepare parsed;
261+
262+
ParsePrepareRecord(XLogRecGetInfo(buf->record), XLogRecGetData(buf->record), &parsed);
263+
DecodePrepare(ctx, buf, &parsed);
260264
break;
261265
}
262266
default:
@@ -533,74 +537,49 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
533537
}
534538

535539
static void
536-
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
540+
DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541+
xl_xact_parsed_prepare *parsed)
537542
{
538-
XLogReaderState *r = buf->record;
539543
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
540-
XLogRecPtr commit_time = InvalidXLogRecPtr;
544+
TimestampTz commit_time = 0;
541545
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
542546
int i;
543-
544-
TransactionId xid;
545-
TwoPhaseFileHeader *hdr;
546-
char *twophase_buf;
547-
int twophase_len;
548-
char *twophase_bufptr;
549-
TransactionId *children;
550-
RelFileNode *commitrels;
551-
RelFileNode *abortrels;
552-
SharedInvalidationMessage *invalmsgs;
553-
554-
xid = XLogRecGetXid(r);
555-
twophase_buf = XLogRecGetData(r);
556-
twophase_len = sizeof(char) * XLogRecGetDataLen(r);
557-
558-
hdr = (TwoPhaseFileHeader *) twophase_buf;
559-
Assert(TransactionIdEquals(hdr->xid, xid));
560-
twophase_bufptr = twophase_buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
561-
children = (TransactionId *) twophase_bufptr;
562-
twophase_bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
563-
commitrels = (RelFileNode *) twophase_bufptr;
564-
twophase_bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
565-
abortrels = (RelFileNode *) twophase_bufptr;
566-
twophase_bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
567-
invalmsgs = (SharedInvalidationMessage *) twophase_bufptr;
568-
twophase_bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
569-
570-
memcpy(ctx->reorder->gid, hdr->gid, GIDSIZE);
547+
TransactionId xid = parsed->twophase_xid;
571548

572549
/*
573550
* Process invalidation messages, even if we're not interested in the
574551
* transaction's contents, since the various caches need to always be
575552
* consistent.
576553
*/
577-
if (hdr->ninvalmsgs > 0)
554+
if (parsed->nmsgs > 0)
578555
{
579556
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
580-
hdr->ninvalmsgs, invalmsgs);
557+
parsed->nmsgs, parsed->msgs);
581558
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
582559
}
583560

584561
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
585-
hdr->nsubxacts, children);
562+
parsed->nsubxacts, parsed->subxacts);
563+
564+
586565

587566
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
588-
(hdr->database != InvalidOid && hdr->database != ctx->slot->data.database) ||
567+
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
589568
FilterByOrigin(ctx, origin_id))
590569
{
591-
for (i = 0; i < hdr->nsubxacts; i++)
570+
for (i = 0; i < parsed->nsubxacts; i++)
592571
{
593-
ReorderBufferForget(ctx->reorder, children[i], buf->origptr);
572+
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
594573
}
595574
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
596575

597576
return;
598577
}
599578

600579
/* tell the reorderbuffer about the surviving subtransactions */
601-
for (i = 0; i < hdr->nsubxacts; i++)
580+
for (i = 0; i < parsed->nsubxacts; i++)
602581
{
603-
ReorderBufferCommitChild(ctx->reorder, xid, children[i],
582+
ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
604583
buf->origptr, buf->endptr);
605584
}
606585

src/include/access/twophase.h

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,50 +25,6 @@
2525
*/
2626
typedef struct GlobalTransactionData *GlobalTransaction;
2727

28-
/************************************************************************/
29-
/* State file support */
30-
/************************************************************************/
31-
32-
#define TwoPhaseFilePath(path, xid) \
33-
snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
34-
35-
/*
36-
* 2PC state file format:
37-
*
38-
* 1. TwoPhaseFileHeader
39-
* 2. TransactionId[] (subtransactions)
40-
* 3. RelFileNode[] (files to be deleted at commit)
41-
* 4. RelFileNode[] (files to be deleted at abort)
42-
* 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
43-
* 6. TwoPhaseRecordOnDisk
44-
* 7. ...
45-
* 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
46-
* 9. checksum (CRC-32C)
47-
*
48-
* Each segment except the final checksum is MAXALIGN'd.
49-
*/
50-
51-
/*
52-
* Header for a 2PC state file
53-
*/
54-
#define TWOPHASE_MAGIC 0x57F94532 /* format identifier */
55-
56-
typedef struct TwoPhaseFileHeader
57-
{
58-
uint32 magic; /* format identifier */
59-
uint32 total_len; /* actual file length */
60-
TransactionId xid; /* original transaction XID */
61-
Oid database; /* OID of database it was in */
62-
TimestampTz prepared_at; /* time of preparation */
63-
Oid owner; /* user running the transaction */
64-
int32 nsubxacts; /* number of following subxact XIDs */
65-
int32 ncommitrels; /* number of delete-on-commit rels */
66-
int32 nabortrels; /* number of delete-on-abort rels */
67-
int32 ninvalmsgs; /* number of cache invalidation messages */
68-
bool initfileinval; /* does relcache init file need invalidation? */
69-
char gid[GIDSIZE]; /* GID for transaction */
70-
} TwoPhaseFileHeader;
71-
7228
/* GUC variable */
7329
extern int max_prepared_xacts;
7430

@@ -91,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
9147

9248
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
9349
int *nxids_p);
50+
extern void ParsePrepareRecord(uint8 info, char *xlrec,
51+
xl_xact_parsed_prepare *parsed);
9452
extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
9553
extern void RecoverPreparedTransactions(void);
9654

src/include/access/xact.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,23 @@ typedef struct xl_xact_parsed_commit
295295
TimestampTz origin_timestamp;
296296
} xl_xact_parsed_commit;
297297

298+
typedef struct xl_xact_parsed_prepare
299+
{
300+
Oid dbId; /* MyDatabaseId */
301+
302+
int nsubxacts;
303+
TransactionId *subxacts;
304+
305+
int nrels;
306+
RelFileNode *xnodes;
307+
308+
int nmsgs;
309+
SharedInvalidationMessage *msgs;
310+
311+
TransactionId twophase_xid;
312+
char twophase_gid[GIDSIZE];
313+
} xl_xact_parsed_prepare;
314+
298315
typedef struct xl_xact_parsed_abort
299316
{
300317
TimestampTz xact_time;
@@ -307,7 +324,7 @@ typedef struct xl_xact_parsed_abort
307324
RelFileNode *xnodes;
308325

309326
TransactionId twophase_xid; /* only for 2PC */
310-
char twophase_gid[GIDSIZE]; // GIDSIZE
327+
char twophase_gid[GIDSIZE];
311328
} xl_xact_parsed_abort;
312329

313330

0 commit comments

Comments
 (0)