Skip to content

Commit a8d6ebf

Browse files
committed
Merge branch 'PGPROEE9_6_CFS_385' into PGPROEE9_6
2 parents 89cd7eb + 598030c commit a8d6ebf

File tree

6 files changed

+172
-134
lines changed

6 files changed

+172
-134
lines changed

src/backend/storage/file/cfs.c

Lines changed: 130 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -389,23 +389,31 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
389389
* Section 3: Compression implementation.
390390
* ----------------------------------------------------------------
391391
*/
392-
void cfs_initialize()
392+
int cfs_shmem_size()
393393
{
394-
cfs_state = (CfsState*)ShmemAlloc(sizeof(CfsState));
395-
memset(&cfs_state->gc_stat, 0, sizeof cfs_state->gc_stat);
396-
pg_atomic_init_flag(&cfs_state->gc_started);
397-
pg_atomic_init_u32(&cfs_state->n_active_gc, 0);
398-
cfs_state->n_workers = 0;
399-
cfs_state->gc_enabled = true;
400-
cfs_state->max_iterations = 0;
401-
402-
if (cfs_encryption)
403-
cfs_crypto_init();
404-
405-
elog(LOG, "Start CFS version %s compression algorithm %s encryption %s",
406-
CFS_VERSION, cfs_algorithm(), cfs_encryption ? "enabled" : "disabled");
394+
return sizeof(CfsState);
407395
}
408396

397+
void cfs_initialize()
398+
{
399+
bool found;
400+
cfs_state = (CfsState*)ShmemInitStruct("CFS Control", sizeof(CfsState), &found);
401+
if (!found)
402+
{
403+
memset(&cfs_state->gc_stat, 0, sizeof cfs_state->gc_stat);
404+
pg_atomic_init_flag(&cfs_state->gc_started);
405+
pg_atomic_init_u32(&cfs_state->n_active_gc, 0);
406+
cfs_state->n_workers = 0;
407+
cfs_state->gc_enabled = cfs_gc_enabled;
408+
cfs_state->max_iterations = 0;
409+
410+
if (cfs_encryption)
411+
cfs_crypto_init();
412+
413+
elog(LOG, "Start CFS version %s compression algorithm %s encryption %s GC %s",
414+
CFS_VERSION, cfs_algorithm(), cfs_encryption ? "enabled" : "disabled", cfs_gc_enabled ? "enabled" : "disabled");
415+
}
416+
}
409417
int cfs_msync(FileMap* map)
410418
{
411419
#ifdef WIN32
@@ -540,86 +548,72 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
540548
void cfs_lock_file(FileMap* map, char const* file_path)
541549
{
542550
long delay = CFS_LOCK_MIN_TIMEOUT;
543-
int n_attempts = 0;
544551

545552
while (true)
546553
{
547-
uint64 count = pg_atomic_fetch_add_u32(&map->lock, 1);
548-
bool revokeLock = false;
554+
uint32 count = pg_atomic_fetch_add_u32(&map->lock, 1);
549555

550556
if (count < CFS_GC_LOCK)
551-
break;
552-
553-
if (InRecovery)
554557
{
555-
revokeLock = true;
556-
}
557-
else
558-
{
559-
if (pg_atomic_unlocked_test_flag(&cfs_state->gc_started))
560-
{
561-
if (++n_attempts > MAX_LOCK_ATTEMPTS)
562-
{
563-
/* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564-
* Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565-
* without releasing lock. And for some reasons recovery was not performed and this page left locked.
566-
* We should revoke the the lock to allow access to this segment.
567-
*/
568-
revokeLock = true;
569-
elog(WARNING, "CFS revokes lock on file %s\n", file_path);
570-
}
571-
}
572-
else
573-
{
574-
n_attempts = 0; /* Reset counter of attempts because GC is in progress */
575-
}
558+
/* No GC is active for this segment */
559+
break;
576560
}
577-
if (revokeLock
578-
/* use gc_started flag to prevent race condition with other backends and GC */
579-
&& pg_atomic_test_set_flag(&cfs_state->gc_started))
580-
{
581-
/* Ugggh... looks like last GC was interrupted.
582-
* Try to recover the file.
583-
*/
584-
char* map_bck_path = psprintf("%s.cfm.bck", file_path);
585-
char* file_bck_path = psprintf("%s.bck", file_path);
586561

587-
elog(WARNING, "CFS indicates that GC of %s was interrupted: try to perform recovery", file_path);
562+
if (pg_atomic_read_u32(&cfs_state->n_active_gc) == 0)
563+
{
564+
/* There is no active GC, so lock is set by crashed GC */
588565

589-
if (access(file_bck_path, R_OK) != 0)
590-
{
591-
/* There is no backup file: new map should be constructed */
592-
int md2 = open(map_bck_path, O_RDWR|PG_BINARY, 0);
593-
if (md2 >= 0)
594-
{
595-
/* Recover map. */
596-
if (!cfs_read_file(md2, map, sizeof(FileMap)))
597-
elog(WARNING, "CFS failed to read file %s: %m", map_bck_path);
566+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent race condition with GC */
598567

599-
close(md2);
600-
}
601-
}
602-
else
568+
/* Recheck under CfsGcLock that map->lock was not released */
569+
if (pg_atomic_read_u32(&map->lock) >= CFS_GC_LOCK)
603570
{
604-
/* Presence of backup file means that we still have
605-
* unchanged data and map files. Just remove backup files and
606-
* revoke GC lock.
571+
/* Uhhh... looks like last GC was interrupted.
572+
* Try to recover the file.
607573
*/
608-
unlink(file_bck_path);
609-
unlink(map_bck_path);
574+
char* map_bck_path = psprintf("%s.cfm.bck", file_path);
575+
char* file_bck_path = psprintf("%s.bck", file_path);
576+
577+
elog(WARNING, "CFS indicates that GC of %s was interrupted: trying to perform recovery", file_path);
578+
579+
if (access(file_bck_path, R_OK) != 0)
580+
{
581+
/* There is no backup file: new map should be constructed */
582+
int md2 = open(map_bck_path, O_RDWR|PG_BINARY, 0);
583+
if (md2 >= 0)
584+
{
585+
/* Recover map. */
586+
if (!cfs_read_file(md2, map, sizeof(FileMap)))
587+
elog(WARNING, "CFS failed to read file %s: %m", map_bck_path);
588+
589+
close(md2);
590+
}
591+
}
592+
else
593+
{
594+
/* Presence of backup file means that we still have
595+
* unchanged data and map files. Just remove backup files and
596+
* revoke GC lock.
597+
*/
598+
unlink(file_bck_path);
599+
unlink(map_bck_path);
600+
}
601+
602+
count = pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* revoke GC lock */
603+
Assert((int)count > 0);
604+
pfree(file_bck_path);
605+
pfree(map_bck_path);
610606
}
611-
612-
pg_atomic_clear_flag(&cfs_state->gc_started);
613-
count = pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* revoke GC lock */
614-
Assert((int)count > 0);
615-
pfree(file_bck_path);
616-
pfree(map_bck_path);
607+
LWLockRelease(CfsGcLock);
617608
break;
618-
}
609+
}
610+
/* Wait until GC of segment is completed */
619611
pg_atomic_fetch_sub_u32(&map->lock, 1);
620612
pg_usleep(delay);
621613
if (delay < CFS_LOCK_MAX_TIMEOUT)
614+
{
622615
delay *= 2;
616+
}
623617
}
624618

625619
if (IsUnderPostmaster && cfs_gc_workers != 0
@@ -649,11 +643,11 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
649643
/*
650644
* Perform garbage collection (if required) on the file
651645
* @param map_path - path to the map file (*.cfm).
652-
* @param noerror - surpress error message (when this function is called by cfs_gc_relation until there are available segments)
646+
* @param background - GC is performed in background by BGW: suppress error messages and acquire CfsGcLock
653647
*/
654-
static bool cfs_gc_file(char* map_path, bool noerror)
648+
static bool cfs_gc_file(char* map_path, bool background)
655649
{
656-
int md = open(map_path, O_RDWR|PG_BINARY, 0);
650+
int md;
657651
FileMap* map;
658652
uint32 physSize;
659653
uint32 usedSize;
@@ -663,29 +657,33 @@ static bool cfs_gc_file(char* map_path, bool noerror)
663657
int fd2 = -1;
664658
int md2 = -1;
665659
bool succeed = false;
660+
int rc;
661+
666662

667663
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
668664

669-
while (!cfs_state->gc_enabled)
665+
if (background)
670666
{
671-
int rc;
672-
673-
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
674-
675-
rc = WaitLatch(MyLatch,
676-
WL_TIMEOUT | WL_POSTMASTER_DEATH,
677-
CFS_DISABLE_TIMEOUT /* ms */);
678-
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
679-
exit(1);
667+
while (!cfs_state->gc_enabled)
668+
{
669+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
670+
671+
rc = WaitLatch(MyLatch,
672+
WL_TIMEOUT | WL_POSTMASTER_DEATH,
673+
CFS_DISABLE_TIMEOUT /* ms */);
674+
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
675+
exit(1);
676+
677+
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
678+
}
680679

681-
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
680+
LWLockAcquire(CfsGcLock, LW_SHARED); /* avoid race condition with cfs_lock_file */
682681
}
683682

683+
md = open(map_path, O_RDWR|PG_BINARY, 0);
684684
if (md < 0)
685685
{
686-
if (!noerror) {
687-
elog(WARNING, "CFS failed to open map file %s: %m", map_path);
688-
}
686+
elog(DEBUG1, "CFS failed to open map file %s: %m", map_path);
689687
goto FinishGC;
690688
}
691689

@@ -766,6 +764,11 @@ static bool cfs_gc_file(char* map_path, bool noerror)
766764
remove_backups = false;
767765
goto ReplaceMap;
768766
}
767+
else
768+
{
769+
/* No backups - nothing has to be recovered. Just release GC lock */
770+
break;
771+
}
769772
}
770773
else
771774
{
@@ -908,7 +911,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
908911
{
909912
inode_t inode = newMap->inodes[i];
910913
int size = CFS_INODE_SIZE(inode);
911-
if (size != 0)
914+
if (size != 0 && size < BLCKSZ)
912915
{
913916
char block[BLCKSZ];
914917
char decomressedBlock[BLCKSZ];
@@ -926,7 +929,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
926929
pg_atomic_fetch_sub_u32(&map->lock, CFS_GC_LOCK); /* release lock */
927930
/* TODO Is it worth to PANIC or ERROR will be enough? */
928931
elog(PANIC, "Verification failed for block %d of relation %s: error code %d",
929-
i, file_path, (int)res);
932+
i, file_bck_path, (int)res);
930933
}
931934
}
932935
}
@@ -1032,7 +1035,12 @@ static bool cfs_gc_file(char* map_path, bool noerror)
10321035
}
10331036

10341037
FinishGC:
1038+
if (background)
1039+
{
1040+
LWLockRelease(CfsGcLock);
1041+
}
10351042
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
1043+
10361044
return succeed;
10371045
}
10381046

@@ -1066,7 +1074,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10661074
strcmp(file_path + len - 4, ".cfm") == 0)
10671075
{
10681076
if (entry->d_ino % cfs_state->n_workers == worker_id
1069-
&& !cfs_gc_file(file_path, false))
1077+
&& !cfs_gc_file(file_path, true))
10701078
{
10711079
success = false;
10721080
break;
@@ -1395,31 +1403,35 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
13951403

13961404
Datum cfs_gc_relation(PG_FUNCTION_ARGS)
13971405
{
1398-
cfs_gc_processed_segments = 0;
1399-
1400-
if (cfs_gc_workers == 0 && pg_atomic_test_set_flag(&cfs_state->gc_started))
1406+
Oid oid = PG_GETARG_OID(0);
1407+
Relation rel = try_relation_open(oid, AccessShareLock);
1408+
int processed_segments = 0;
1409+
1410+
if (rel != NULL)
14011411
{
1402-
Oid oid = PG_GETARG_OID(0);
1403-
Relation rel = try_relation_open(oid, AccessShareLock);
1404-
1405-
if (rel != NULL)
1406-
{
1407-
char* path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
1408-
char* map_path = (char*)palloc(strlen(path) + 16);
1409-
int i = 0;
1410-
sprintf(map_path, "%s.cfm", path);
1412+
char* path;
1413+
char* map_path;
1414+
int i = 0;
1415+
1416+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1417+
1418+
processed_segments = cfs_gc_processed_segments;
1419+
1420+
path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
1421+
map_path = (char*)palloc(strlen(path) + 16);
1422+
sprintf(map_path, "%s.cfm", path);
14111423

1412-
while (true)
1413-
{
1414-
if (!cfs_gc_file(map_path, true))
1415-
break;
1416-
sprintf(map_path, "%s.%u.cfm", path, ++i);
1417-
}
1418-
pfree(path);
1419-
pfree(map_path);
1420-
relation_close(rel, AccessShareLock);
1424+
while (cfs_gc_file(map_path, false))
1425+
{
1426+
sprintf(map_path, "%s.%u.cfm", path, ++i);
14211427
}
1422-
pg_atomic_clear_flag(&cfs_state->gc_started);
1428+
pfree(path);
1429+
pfree(map_path);
1430+
relation_close(rel, AccessShareLock);
1431+
1432+
processed_segments -= cfs_gc_processed_segments;
1433+
1434+
LWLockRelease(CfsGcLock);
14231435
}
14241436
PG_RETURN_INT32(cfs_gc_processed_segments);
14251437
}

src/backend/storage/ipc/ipci.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "storage/procsignal.h"
4545
#include "storage/sinvaladt.h"
4646
#include "storage/spin.h"
47+
#include "storage/cfs.h"
4748
#include "utils/snapmgr.h"
4849

4950

@@ -142,6 +143,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
142143
size = add_size(size, BTreeShmemSize());
143144
size = add_size(size, SyncScanShmemSize());
144145
size = add_size(size, AsyncShmemSize());
146+
size = add_size(size, cfs_shmem_size());
145147
#ifdef EXEC_BACKEND
146148
size = add_size(size, ShmemBackendArraySize());
147149
#endif
@@ -254,7 +256,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
254256
BTreeShmemInit();
255257
SyncScanShmemInit();
256258
AsyncShmemInit();
257-
259+
cfs_initialize();
260+
258261
/*
259262
* Init array of Latches in SHMEM for WAITLSN
260263
*/

src/backend/storage/lmgr/lwlocknames.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ CommitTsLock 39
4747
ReplicationOriginLock 40
4848
MultiXactTruncationLock 41
4949
OldSnapshotTimeMapLock 42
50+
CfsGcLock 43

0 commit comments

Comments
 (0)