13
13
14
14
#include "postgres.h"
15
15
16
+ #include <unistd.h>
17
+
16
18
#include "libpq-fe.h"
17
19
18
20
#include "miscadmin.h"
@@ -77,7 +79,8 @@ static PGLogicalSyncWorker *MySyncWorker = NULL;
77
79
78
80
79
81
static void
80
- dump_structure (PGLogicalSubscription * sub , const char * snapshot )
82
+ dump_structure (PGLogicalSubscription * sub , const char * destfile ,
83
+ const char * snapshot )
81
84
{
82
85
char pg_dump [MAXPGPATH ];
83
86
uint32 version ;
@@ -95,12 +98,12 @@ dump_structure(PGLogicalSubscription *sub, const char *snapshot)
95
98
96
99
initStringInfo (& command );
97
100
#if PG_VERSION_NUM < 90500
98
- appendStringInfo (& command , "%s --snapshot=\"%s\" -s -N %s -N pglogical_origin -F c -f \"/tmp/pglogical-%d.dump \" \"%s\"" ,
101
+ appendStringInfo (& command , "%s --snapshot=\"%s\" -s -N %s -N pglogical_origin -F c -f \"%s \" \"%s\"" ,
99
102
#else
100
- appendStringInfo (& command , "%s --snapshot=\"%s\" -s -N %s -F c -f \"/tmp/pglogical-%d.dump \" \"%s\"" ,
103
+ appendStringInfo (& command , "%s --snapshot=\"%s\" -s -N %s -F c -f \"%s \" \"%s\"" ,
101
104
#endif
102
- pg_dump , snapshot , EXTENSION_NAME , MyProcPid ,
103
- sub -> origin_if -> dsn );
105
+ pg_dump , snapshot , EXTENSION_NAME ,
106
+ destfile , sub -> origin_if -> dsn );
104
107
105
108
res = system (command .data );
106
109
if (res != 0 )
@@ -112,7 +115,8 @@ dump_structure(PGLogicalSubscription *sub, const char *snapshot)
112
115
113
116
/* TODO: switch to SPI? */
114
117
static void
115
- restore_structure (PGLogicalSubscription * sub , const char * section )
118
+ restore_structure (PGLogicalSubscription * sub , const char * srcfile ,
119
+ const char * section )
116
120
{
117
121
char pg_restore [MAXPGPATH ];
118
122
uint32 version ;
@@ -130,9 +134,8 @@ restore_structure(PGLogicalSubscription *sub, const char *section)
130
134
131
135
initStringInfo (& command );
132
136
appendStringInfo (& command ,
133
- "%s --section=\"%s\" --exit-on-error -1 -d \"%s\" \"/tmp/pglogical-%d.dump\"" ,
134
- pg_restore , section , sub -> target_if -> dsn ,
135
- MyProcPid );
137
+ "%s --section=\"%s\" --exit-on-error -1 -d \"%s\" \"%s\"" ,
138
+ pg_restore , section , sub -> target_if -> dsn , srcfile );
136
139
137
140
res = system (command .data );
138
141
if (res != 0 )
@@ -464,6 +467,16 @@ pglogical_sync_worker_cleanup_cb(int code, Datum arg)
464
467
pglogical_sync_worker_cleanup (sub );
465
468
}
466
469
470
+ static void
471
+ pglogical_sync_tmpfile_cleanup_cb (int code , Datum arg )
472
+ {
473
+ const char * tmpfile = DatumGetCString (arg );
474
+
475
+ if (unlink (tmpfile ) != 0 && errno != ENOENT )
476
+ elog (WARNING , "Failed to clean up pglogical temporary dump file \"%s\" on exit/error" ,
477
+ tmpfile );
478
+ }
479
+
467
480
void
468
481
pglogical_sync_subscription (PGLogicalSubscription * sub )
469
482
{
@@ -523,88 +536,107 @@ pglogical_sync_subscription(PGLogicalSubscription *sub)
523
536
PG_ENSURE_ERROR_CLEANUP (pglogical_sync_worker_cleanup_cb ,
524
537
PointerGetDatum (sub ));
525
538
{
526
- StartTransactionCommand () ;
539
+ StringInfoData tmpfile ;
527
540
528
- originid = ensure_replication_origin (sub -> slot_name );
529
- replorigin_advance (originid , lsn , XactLastCommitEnd , true, true);
530
-
531
- CommitTransactionCommand ();
541
+ oldctx = MemoryContextSwitchTo (myctx );
542
+ initStringInfo (& tmpfile );
543
+ appendStringInfo (& tmpfile , "%s/pglogical-%d.dump" ,
544
+ pglogical_temp_directory , MyProcPid );
545
+ MemoryContextSwitchTo (oldctx );
532
546
533
- if (SyncKindStructure (sync -> kind ))
547
+ PG_ENSURE_ERROR_CLEANUP (pglogical_sync_tmpfile_cleanup_cb ,
548
+ CStringGetDatum (tmpfile .data ));
534
549
{
535
- elog (INFO , "synchronizing structure" );
536
-
537
- status = SYNC_STATUS_STRUCTURE ;
538
550
StartTransactionCommand ();
539
- set_subscription_sync_status (sub -> id , status );
540
- CommitTransactionCommand ();
541
551
542
- /* Dump structure to temp storage. */
543
- dump_structure (sub , snapshot );
552
+ originid = ensure_replication_origin (sub -> slot_name );
553
+ replorigin_advance (originid , lsn , XactLastCommitEnd , true,
554
+ true);
544
555
545
- /* Restore base pre-data structure (types, tables, etc). */
546
- restore_structure (sub , "pre-data" );
547
- }
556
+ CommitTransactionCommand ();
548
557
549
- /* Copy data. */
550
- if (SyncKindData (sync -> kind ))
551
- {
552
- List * tables ;
553
- ListCell * lc ;
558
+ if (SyncKindStructure (sync -> kind ))
559
+ {
560
+ elog (INFO , "synchronizing structure" );
554
561
555
- elog (INFO , "synchronizing data" );
562
+ status = SYNC_STATUS_STRUCTURE ;
563
+ StartTransactionCommand ();
564
+ set_subscription_sync_status (sub -> id , status );
565
+ CommitTransactionCommand ();
556
566
557
- status = SYNC_STATUS_DATA ;
558
- StartTransactionCommand ();
559
- set_subscription_sync_status (sub -> id , status );
560
- CommitTransactionCommand ();
567
+ /* Dump structure to temp storage. */
568
+ dump_structure (sub , tmpfile .data , snapshot );
561
569
562
- tables = copy_replication_sets_data (sub -> origin_if -> dsn ,
563
- sub -> target_if -> dsn ,
564
- snapshot ,
565
- sub -> replication_sets );
570
+ /* Restore base pre-data structure (types, tables, etc). */
571
+ restore_structure (sub , tmpfile .data , "pre-data" );
572
+ }
566
573
567
- /* Store info about all the synchronized tables. */
568
- StartTransactionCommand ();
569
- foreach (lc , tables )
574
+ /* Copy data. */
575
+ if (SyncKindData (sync -> kind ))
570
576
{
571
- RangeVar * rv = ( RangeVar * ) lfirst ( lc ) ;
572
- PGLogicalSyncStatus * oldsync ;
577
+ List * tables ;
578
+ ListCell * lc ;
573
579
574
- oldsync = get_table_sync_status (sub -> id , rv -> schemaname ,
575
- rv -> relname , true);
576
- if (oldsync )
577
- {
578
- set_table_sync_status (sub -> id , rv -> schemaname ,
579
- rv -> relname , SYNC_STATUS_READY );
580
- }
581
- else
580
+ elog (INFO , "synchronizing data" );
581
+
582
+ status = SYNC_STATUS_DATA ;
583
+ StartTransactionCommand ();
584
+ set_subscription_sync_status (sub -> id , status );
585
+ CommitTransactionCommand ();
586
+
587
+ tables = copy_replication_sets_data (sub -> origin_if -> dsn ,
588
+ sub -> target_if -> dsn ,
589
+ snapshot ,
590
+ sub -> replication_sets );
591
+
592
+ /* Store info about all the synchronized tables. */
593
+ StartTransactionCommand ();
594
+ foreach (lc , tables )
582
595
{
583
- PGLogicalSyncStatus newsync ;
584
-
585
- newsync .kind = SYNC_KIND_FULL ;
586
- newsync .subid = sub -> id ;
587
- newsync .nspname = rv -> schemaname ;
588
- newsync .relname = rv -> relname ;
589
- newsync .status = SYNC_STATUS_READY ;
590
- create_local_sync_status (& newsync );
596
+ RangeVar * rv = (RangeVar * ) lfirst (lc );
597
+ PGLogicalSyncStatus * oldsync ;
598
+
599
+ oldsync = get_table_sync_status (sub -> id ,
600
+ rv -> schemaname ,
601
+ rv -> relname , true);
602
+ if (oldsync )
603
+ {
604
+ set_table_sync_status (sub -> id , rv -> schemaname ,
605
+ rv -> relname ,
606
+ SYNC_STATUS_READY );
607
+ }
608
+ else
609
+ {
610
+ PGLogicalSyncStatus newsync ;
611
+
612
+ newsync .kind = SYNC_KIND_FULL ;
613
+ newsync .subid = sub -> id ;
614
+ newsync .nspname = rv -> schemaname ;
615
+ newsync .relname = rv -> relname ;
616
+ newsync .status = SYNC_STATUS_READY ;
617
+ create_local_sync_status (& newsync );
618
+ }
591
619
}
620
+ CommitTransactionCommand ();
592
621
}
593
- CommitTransactionCommand ();
594
- }
595
622
596
- /* Restore post-data structure (indexes, constraints, etc). */
597
- if (SyncKindStructure (sync -> kind ))
598
- {
599
- elog (INFO , "synchronizing constraints" );
623
+ /* Restore post-data structure (indexes, constraints, etc). */
624
+ if (SyncKindStructure (sync -> kind ))
625
+ {
626
+ elog (INFO , "synchronizing constraints" );
600
627
601
- status = SYNC_STATUS_CONSTAINTS ;
602
- StartTransactionCommand ();
603
- set_subscription_sync_status (sub -> id , status );
604
- CommitTransactionCommand ();
628
+ status = SYNC_STATUS_CONSTAINTS ;
629
+ StartTransactionCommand ();
630
+ set_subscription_sync_status (sub -> id , status );
631
+ CommitTransactionCommand ();
605
632
606
- restore_structure (sub , "post-data" );
633
+ restore_structure (sub , tmpfile .data , "post-data" );
634
+ }
607
635
}
636
+ PG_END_ENSURE_ERROR_CLEANUP (pglogical_sync_tmpfile_cleanup_cb ,
637
+ CStringGetDatum (tmpfile .data ));
638
+ pglogical_sync_tmpfile_cleanup_cb (0 ,
639
+ CStringGetDatum (tmpfile .data ));
608
640
}
609
641
PG_END_ENSURE_ERROR_CLEANUP (pglogical_sync_worker_cleanup_cb ,
610
642
PointerGetDatum (sub ));
0 commit comments