Skip to content

Commit 8b733a4

Browse files
committed
Cleanup the temp dump file after we are done with it.
In passing make the temp dump path configurable.
1 parent 61931cf commit 8b733a4

File tree

3 files changed

+118
-72
lines changed

3 files changed

+118
-72
lines changed

pglogical.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ static const struct config_enum_entry PGLogicalConflictResolvers[] = {
4747
{NULL, 0, false}
4848
};
4949

50-
bool pglogical_synchronous_commit = false;
50+
bool pglogical_synchronous_commit = false;
51+
char *pglogical_temp_directory;
5152

5253
void _PG_init(void);
5354
void pglogical_supervisor_main(Datum main_arg);
@@ -417,6 +418,18 @@ _PG_init(void)
417418
0,
418419
NULL, NULL, NULL);
419420

421+
/*
422+
* We can't use the temp_tablespace safely for our dumps, because Pg's
423+
* crash recovery is very careful to delete only particularly formatted
424+
* files. Instead for now just allow user to specify dump storage.
425+
*/
426+
DefineCustomStringVariable("pglogical.temp_directory",
427+
"Directory to store dumps for local restore",
428+
NULL,
429+
&pglogical_temp_directory,
430+
"/tmp", PGC_SIGHUP,
431+
0,
432+
NULL, NULL, NULL);
420433
if (IsBinaryUpgrade)
421434
return;
422435

pglogical.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#define REPLICATION_ORIGIN_ALL "all"
4242

4343
extern bool pglogical_synchronous_commit;
44+
extern char *pglogical_temp_directory;
4445

4546
extern char *shorten_hash(const char *str, int maxlen);
4647

pglogical_sync.c

Lines changed: 103 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
#include "postgres.h"
1515

16+
#include <unistd.h>
17+
1618
#include "libpq-fe.h"
1719

1820
#include "miscadmin.h"
@@ -77,7 +79,8 @@ static PGLogicalSyncWorker *MySyncWorker = NULL;
7779

7880

7981
static void
80-
dump_structure(PGLogicalSubscription *sub, const char *snapshot)
82+
dump_structure(PGLogicalSubscription *sub, const char *destfile,
83+
const char *snapshot)
8184
{
8285
char pg_dump[MAXPGPATH];
8386
uint32 version;
@@ -95,12 +98,12 @@ dump_structure(PGLogicalSubscription *sub, const char *snapshot)
9598

9699
initStringInfo(&command);
97100
#if PG_VERSION_NUM < 90500
98-
appendStringInfo(&command, "%s --snapshot=\"%s\" -s -N %s -N pglogical_origin -F c -f \"/tmp/pglogical-%d.dump\" \"%s\"",
101+
appendStringInfo(&command, "%s --snapshot=\"%s\" -s -N %s -N pglogical_origin -F c -f \"%s\" \"%s\"",
99102
#else
100-
appendStringInfo(&command, "%s --snapshot=\"%s\" -s -N %s -F c -f \"/tmp/pglogical-%d.dump\" \"%s\"",
103+
appendStringInfo(&command, "%s --snapshot=\"%s\" -s -N %s -F c -f \"%s\" \"%s\"",
101104
#endif
102-
pg_dump, snapshot, EXTENSION_NAME, MyProcPid,
103-
sub->origin_if->dsn);
105+
pg_dump, snapshot, EXTENSION_NAME,
106+
destfile, sub->origin_if->dsn);
104107

105108
res = system(command.data);
106109
if (res != 0)
@@ -112,7 +115,8 @@ dump_structure(PGLogicalSubscription *sub, const char *snapshot)
112115

113116
/* TODO: switch to SPI? */
114117
static void
115-
restore_structure(PGLogicalSubscription *sub, const char *section)
118+
restore_structure(PGLogicalSubscription *sub, const char *srcfile,
119+
const char *section)
116120
{
117121
char pg_restore[MAXPGPATH];
118122
uint32 version;
@@ -130,9 +134,8 @@ restore_structure(PGLogicalSubscription *sub, const char *section)
130134

131135
initStringInfo(&command);
132136
appendStringInfo(&command,
133-
"%s --section=\"%s\" --exit-on-error -1 -d \"%s\" \"/tmp/pglogical-%d.dump\"",
134-
pg_restore, section, sub->target_if->dsn,
135-
MyProcPid);
137+
"%s --section=\"%s\" --exit-on-error -1 -d \"%s\" \"%s\"",
138+
pg_restore, section, sub->target_if->dsn, srcfile);
136139

137140
res = system(command.data);
138141
if (res != 0)
@@ -464,6 +467,16 @@ pglogical_sync_worker_cleanup_cb(int code, Datum arg)
464467
pglogical_sync_worker_cleanup(sub);
465468
}
466469

470+
static void
471+
pglogical_sync_tmpfile_cleanup_cb(int code, Datum arg)
472+
{
473+
const char *tmpfile = DatumGetCString(arg);
474+
475+
if (unlink(tmpfile) != 0 && errno != ENOENT)
476+
elog(WARNING, "Failed to clean up pglogical temporary dump file \"%s\" on exit/error",
477+
tmpfile);
478+
}
479+
467480
void
468481
pglogical_sync_subscription(PGLogicalSubscription *sub)
469482
{
@@ -523,88 +536,107 @@ pglogical_sync_subscription(PGLogicalSubscription *sub)
523536
PG_ENSURE_ERROR_CLEANUP(pglogical_sync_worker_cleanup_cb,
524537
PointerGetDatum(sub));
525538
{
526-
StartTransactionCommand();
539+
StringInfoData tmpfile;
527540

528-
originid = ensure_replication_origin(sub->slot_name);
529-
replorigin_advance(originid, lsn, XactLastCommitEnd, true, true);
530-
531-
CommitTransactionCommand();
541+
oldctx = MemoryContextSwitchTo(myctx);
542+
initStringInfo(&tmpfile);
543+
appendStringInfo(&tmpfile, "%s/pglogical-%d.dump",
544+
pglogical_temp_directory, MyProcPid);
545+
MemoryContextSwitchTo(oldctx);
532546

533-
if (SyncKindStructure(sync->kind))
547+
PG_ENSURE_ERROR_CLEANUP(pglogical_sync_tmpfile_cleanup_cb,
548+
CStringGetDatum(tmpfile.data));
534549
{
535-
elog(INFO, "synchronizing structure");
536-
537-
status = SYNC_STATUS_STRUCTURE;
538550
StartTransactionCommand();
539-
set_subscription_sync_status(sub->id, status);
540-
CommitTransactionCommand();
541551

542-
/* Dump structure to temp storage. */
543-
dump_structure(sub, snapshot);
552+
originid = ensure_replication_origin(sub->slot_name);
553+
replorigin_advance(originid, lsn, XactLastCommitEnd, true,
554+
true);
544555

545-
/* Restore base pre-data structure (types, tables, etc). */
546-
restore_structure(sub, "pre-data");
547-
}
556+
CommitTransactionCommand();
548557

549-
/* Copy data. */
550-
if (SyncKindData(sync->kind))
551-
{
552-
List *tables;
553-
ListCell *lc;
558+
if (SyncKindStructure(sync->kind))
559+
{
560+
elog(INFO, "synchronizing structure");
554561

555-
elog(INFO, "synchronizing data");
562+
status = SYNC_STATUS_STRUCTURE;
563+
StartTransactionCommand();
564+
set_subscription_sync_status(sub->id, status);
565+
CommitTransactionCommand();
556566

557-
status = SYNC_STATUS_DATA;
558-
StartTransactionCommand();
559-
set_subscription_sync_status(sub->id, status);
560-
CommitTransactionCommand();
567+
/* Dump structure to temp storage. */
568+
dump_structure(sub, tmpfile.data, snapshot);
561569

562-
tables = copy_replication_sets_data(sub->origin_if->dsn,
563-
sub->target_if->dsn,
564-
snapshot,
565-
sub->replication_sets);
570+
/* Restore base pre-data structure (types, tables, etc). */
571+
restore_structure(sub, tmpfile.data, "pre-data");
572+
}
566573

567-
/* Store info about all the synchronized tables. */
568-
StartTransactionCommand();
569-
foreach (lc, tables)
574+
/* Copy data. */
575+
if (SyncKindData(sync->kind))
570576
{
571-
RangeVar *rv = (RangeVar *) lfirst(lc);
572-
PGLogicalSyncStatus *oldsync;
577+
List *tables;
578+
ListCell *lc;
573579

574-
oldsync = get_table_sync_status(sub->id, rv->schemaname,
575-
rv->relname, true);
576-
if (oldsync)
577-
{
578-
set_table_sync_status(sub->id, rv->schemaname,
579-
rv->relname, SYNC_STATUS_READY);
580-
}
581-
else
580+
elog(INFO, "synchronizing data");
581+
582+
status = SYNC_STATUS_DATA;
583+
StartTransactionCommand();
584+
set_subscription_sync_status(sub->id, status);
585+
CommitTransactionCommand();
586+
587+
tables = copy_replication_sets_data(sub->origin_if->dsn,
588+
sub->target_if->dsn,
589+
snapshot,
590+
sub->replication_sets);
591+
592+
/* Store info about all the synchronized tables. */
593+
StartTransactionCommand();
594+
foreach (lc, tables)
582595
{
583-
PGLogicalSyncStatus newsync;
584-
585-
newsync.kind = SYNC_KIND_FULL;
586-
newsync.subid = sub->id;
587-
newsync.nspname = rv->schemaname;
588-
newsync.relname = rv->relname;
589-
newsync.status = SYNC_STATUS_READY;
590-
create_local_sync_status(&newsync);
596+
RangeVar *rv = (RangeVar *) lfirst(lc);
597+
PGLogicalSyncStatus *oldsync;
598+
599+
oldsync = get_table_sync_status(sub->id,
600+
rv->schemaname,
601+
rv->relname, true);
602+
if (oldsync)
603+
{
604+
set_table_sync_status(sub->id, rv->schemaname,
605+
rv->relname,
606+
SYNC_STATUS_READY);
607+
}
608+
else
609+
{
610+
PGLogicalSyncStatus newsync;
611+
612+
newsync.kind = SYNC_KIND_FULL;
613+
newsync.subid = sub->id;
614+
newsync.nspname = rv->schemaname;
615+
newsync.relname = rv->relname;
616+
newsync.status = SYNC_STATUS_READY;
617+
create_local_sync_status(&newsync);
618+
}
591619
}
620+
CommitTransactionCommand();
592621
}
593-
CommitTransactionCommand();
594-
}
595622

596-
/* Restore post-data structure (indexes, constraints, etc). */
597-
if (SyncKindStructure(sync->kind))
598-
{
599-
elog(INFO, "synchronizing constraints");
623+
/* Restore post-data structure (indexes, constraints, etc). */
624+
if (SyncKindStructure(sync->kind))
625+
{
626+
elog(INFO, "synchronizing constraints");
600627

601-
status = SYNC_STATUS_CONSTAINTS;
602-
StartTransactionCommand();
603-
set_subscription_sync_status(sub->id, status);
604-
CommitTransactionCommand();
628+
status = SYNC_STATUS_CONSTAINTS;
629+
StartTransactionCommand();
630+
set_subscription_sync_status(sub->id, status);
631+
CommitTransactionCommand();
605632

606-
restore_structure(sub, "post-data");
633+
restore_structure(sub, tmpfile.data, "post-data");
634+
}
607635
}
636+
PG_END_ENSURE_ERROR_CLEANUP(pglogical_sync_tmpfile_cleanup_cb,
637+
CStringGetDatum(tmpfile.data));
638+
pglogical_sync_tmpfile_cleanup_cb(0,
639+
CStringGetDatum(tmpfile.data));
608640
}
609641
PG_END_ENSURE_ERROR_CLEANUP(pglogical_sync_worker_cleanup_cb,
610642
PointerGetDatum(sub));

0 commit comments

Comments
 (0)