Skip to content

Commit 11d9458

Browse files
committed
Merge branch 'PGPROEE9_6_CFS_385' into PGPROEE9_6
2 parents f7ee1e1 + e288375 commit 11d9458

File tree

3 files changed

+197
-64
lines changed

3 files changed

+197
-64
lines changed

src/backend/storage/file/cfs.c

Lines changed: 118 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,18 @@ int cfs_munmap(FileMap* map)
470470
*/
471471
uint32 cfs_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize)
{
	/*
	 * Reserve newSize bytes at the current end of the segment
	 * (hdr.physSize) and return the offset at which the reservation starts.
	 * hdr.usedSize is adjusted by the difference between the new and the
	 * old size of the page.
	 */
	uint32 start = pg_atomic_read_u32(&map->hdr.physSize);

	for (;;)
	{
		uint32 end = start + newSize;

		/* Unsigned wraparound: the new end does not fit into 32 bits. */
		if (end < start)
			elog(ERROR, "CFS: segment file exceed 4Gb limit");

		/*
		 * Publish the new end only if nobody moved physSize meanwhile.
		 * A failed compare-exchange stores the current value into start,
		 * so the next round recomputes end from fresh data.
		 */
		if (pg_atomic_compare_exchange_u32(&map->hdr.physSize, &start, end))
			break;
	}

	/* Modular arithmetic: this also works when newSize < oldSize. */
	pg_atomic_fetch_add_u32(&map->hdr.usedSize, newSize - oldSize);

	return start;
}
476486

477487
/*
@@ -643,12 +653,18 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
643653
return o1 < o2 ? -1 : o1 == o2 ? 0 : 1;
644654
}
645655

656+
/*
 * Origin of a cfs_gc_file() call; selects its waiting and locking policy.
 */
typedef enum {
	/*
	 * Passed by cfs_gc_directory(). cfs_gc_file() waits while gc_enabled or
	 * background_gc_enabled is off, takes CfsGcLock in shared mode itself
	 * and sleeps for cfs_gc_delay after a file was actually collected.
	 */
	CFS_BACKGROUND = 0,
	/*
	 * Passed by cfs_gc_relation() and by cfs_gc_segment() when the call is
	 * not optional. cfs_gc_file() waits while gc_enabled is off; the caller
	 * holds CfsGcLock exclusively.
	 */
	CFS_EXPLICIT = 1,
	/*
	 * Passed by cfs_gc_segment() for optional calls. cfs_gc_file() does not
	 * wait: it returns false at once when gc_enabled is off.
	 */
	CFS_IMPLICIT = 2
} GC_CALL_KIND;
661+
646662
/*
647663
* Perform garbage collection (if required) on the file
648664
* @param map_path - path to the map file (*.cfm).
649665
* @param background - kind of GC call; for CFS_BACKGROUND, GC is performed in background by BGW: suppress error message and set CfsGcLock
650666
*/
651-
static bool cfs_gc_file(char* map_path, bool background)
667+
static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
652668
{
653669
int md;
654670
FileMap* map;
@@ -660,27 +676,37 @@ static bool cfs_gc_file(char* map_path, bool background)
660676
int fd2 = -1;
661677
int md2 = -1;
662678
bool succeed = false;
679+
bool performed = false;
663680
int rc;
664681

665-
666682
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
667-
668-
while (!(background ? (cfs_state->gc_enabled & cfs_state->background_gc_enabled) : cfs_state->gc_enabled))
683+
if (background == CFS_IMPLICIT)
684+
{
685+
if (!cfs_state->gc_enabled)
686+
{
687+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
688+
return false;
689+
}
690+
}
691+
else
669692
{
670-
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
693+
while (!cfs_state->gc_enabled || (background == CFS_BACKGROUND && !cfs_state->background_gc_enabled))
694+
{
695+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
671696

672-
rc = WaitLatch(MyLatch,
673-
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
674-
CFS_DISABLE_TIMEOUT /* ms */);
675-
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
676-
exit(1);
697+
rc = WaitLatch(MyLatch,
698+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
699+
CFS_DISABLE_TIMEOUT /* ms */);
700+
if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
701+
exit(1);
677702

678-
ResetLatch(MyLatch);
679-
CHECK_FOR_INTERRUPTS();
703+
ResetLatch(MyLatch);
704+
CHECK_FOR_INTERRUPTS();
680705

681-
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
706+
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
707+
}
682708
}
683-
if (background)
709+
if (background == CFS_BACKGROUND)
684710
{
685711
LWLockAcquire(CfsGcLock, LW_SHARED); /* avoid race condition with cfs_file_lock */
686712
}
@@ -708,7 +734,7 @@ static bool cfs_gc_file(char* map_path, bool background)
708734
cfs_state->gc_stat.scannedFiles += 1;
709735

710736
/* do we need to perform defragmentation? */
711-
if ((uint64)(physSize - usedSize)*100 > (uint64)physSize*cfs_gc_threshold)
737+
if (physSize > CFS_IMPLICIT_GC_THRESHOLD || (uint64)(physSize - usedSize)*100 > (uint64)physSize*cfs_gc_threshold)
712738
{
713739
long delay = CFS_LOCK_MIN_TIMEOUT;
714740
char* file_path = (char*)palloc(suf+1);
@@ -1030,17 +1056,7 @@ static bool cfs_gc_file(char* map_path, bool background)
10301056
pfree(inodes);
10311057
pfree(newMap);
10321058

1033-
if (cfs_gc_delay != 0)
1034-
{
1035-
int rc = WaitLatch(MyLatch,
1036-
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1037-
cfs_gc_delay /* ms */ );
1038-
if (rc & WL_POSTMASTER_DEATH)
1039-
exit(1);
1040-
1041-
ResetLatch(MyLatch);
1042-
CHECK_FOR_INTERRUPTS();
1043-
}
1059+
performed = true;
10441060
}
10451061
else if (cfs_state->max_iterations == 1)
10461062
elog(LOG, "CFS GC worker %d: file %.*s: physical size %u, logical size %u, used %u, compression ratio %f",
@@ -1058,12 +1074,23 @@ static bool cfs_gc_file(char* map_path, bool background)
10581074
}
10591075

10601076
FinishGC:
1061-
if (background)
1077+
if (background == CFS_BACKGROUND)
10621078
{
10631079
LWLockRelease(CfsGcLock);
10641080
}
10651081
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
10661082

1083+
if (cfs_gc_delay != 0 && performed && background == CFS_BACKGROUND)
1084+
{
1085+
int rc = WaitLatch(MyLatch,
1086+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1087+
cfs_gc_delay /* ms */ );
1088+
if (rc & WL_POSTMASTER_DEATH)
1089+
exit(1);
1090+
1091+
ResetLatch(MyLatch);
1092+
CHECK_FOR_INTERRUPTS();
1093+
}
10671094
return succeed;
10681095
}
10691096

@@ -1097,7 +1124,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10971124
strcmp(file_path + len - 4, ".cfm") == 0)
10981125
{
10991126
if (entry->d_ino % cfs_state->n_workers == worker_id
1100-
&& !cfs_gc_file(file_path, true))
1127+
&& !cfs_gc_file(file_path, CFS_BACKGROUND))
11011128
{
11021129
success = false;
11031130
break;
@@ -1195,6 +1222,7 @@ bool cfs_control_gc(bool enabled)
11951222
{
11961223
bool was_enabled = cfs_state->gc_enabled;
11971224
cfs_state->gc_enabled = enabled;
1225+
pg_memory_barrier();
11981226
if (was_enabled && !enabled)
11991227
{
12001228
/* Wait until there are no active GC workers */
@@ -1238,9 +1266,7 @@ Datum cfs_start_gc(PG_FUNCTION_ARGS)
12381266
int j;
12391267
BackgroundWorkerHandle** handles;
12401268

1241-
cfs_gc_stop = true; /* do just one iteration */
1242-
1243-
cfs_state->max_iterations = 1;
1269+
cfs_state->max_iterations = 1; /* do just one iteration */
12441270
cfs_state->n_workers = PG_GETARG_INT32(0);
12451271
handles = (BackgroundWorkerHandle**)palloc(cfs_state->n_workers*sizeof(BackgroundWorkerHandle*));
12461272

@@ -1460,45 +1486,95 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
14601486
char* path;
14611487
char* map_path;
14621488
int i = 0;
1463-
1464-
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1489+
bool stop = false;
14651490

14661491
processed_segments = cfs_gc_processed_segments;
14671492

14681493
path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
14691494
map_path = (char*)palloc(strlen(path) + 16);
14701495
sprintf(map_path, "%s.cfm", path);
14711496

1472-
while (cfs_gc_file(map_path, false))
1497+
do
14731498
{
1499+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1500+
stop = !cfs_gc_file(map_path, CFS_EXPLICIT);
1501+
LWLockRelease(CfsGcLock);
14741502
sprintf(map_path, "%s.%u.cfm", path, ++i);
1475-
}
1503+
} while(!stop);
14761504
pfree(path);
14771505
pfree(map_path);
14781506
relation_close(rel, AccessShareLock);
14791507

14801508
processed_segments = cfs_gc_processed_segments - processed_segments;
1481-
1482-
LWLockRelease(CfsGcLock);
14831509
}
14841510
PG_RETURN_INT32(processed_segments);
14851511
}
14861512

14871513

1488-
void cfs_gc_segment(char const* fileName)
1514+
void cfs_gc_segment(char const* fileName, bool optional)
14891515
{
1490-
char* mapFileName = psprintf("%s.cfm", fileName);
1516+
char* mapFileName;
14911517

1492-
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
1518+
if (optional)
1519+
{
1520+
if (!LWLockConditionalAcquire(CfsGcLock, LW_EXCLUSIVE)) /* Prevent interaction with background GC */
1521+
return;
1522+
}
1523+
else
1524+
LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
14931525

1494-
cfs_gc_file(mapFileName, false);
1526+
mapFileName = psprintf("%s.cfm", fileName);
14951527

1528+
cfs_gc_file(mapFileName, optional ? CFS_IMPLICIT : CFS_EXPLICIT);
14961529
LWLockRelease(CfsGcLock);
14971530

14981531
pfree(mapFileName);
14991532
}
15001533

15011534

1535+
/*
 * Recompute the size counters in the map header from the inode table.
 *
 * usedSize becomes the sum of the sizes of all non-empty inodes.
 * physSize is raised (never lowered) so that it covers the end of every
 * non-empty inode, and virtSize is raised (never lowered) so that it covers
 * the last non-empty page.
 *
 * @param map - file map whose header is corrected in place.
 */
void cfs_recover_map(FileMap* map)
{
	int i;
	uint32 physSize;
	uint32 virtSize;
	uint32 usedSize = 0;

	/* Start from the recorded values: these two counters may only grow. */
	physSize = pg_atomic_read_u32(&map->hdr.physSize);
	virtSize = pg_atomic_read_u32(&map->hdr.virtSize);

	for (i = 0; i < RELSEG_SIZE; i++)
	{
		inode_t inode = map->inodes[i];
		int size = CFS_INODE_SIZE(inode);

		/* A zero size marks a page with no stored data. */
		if (size != 0)
		{
			uint32 offs = CFS_INODE_OFFS(inode);

			if (offs + size > physSize)
			{
				physSize = offs + size;
			}
			if ((i+1)*BLCKSZ > virtSize)
			{
				virtSize = (i+1)*BLCKSZ;
			}
			usedSize += size;
		}
	}

	/*
	 * Write the header back once, after the whole table has been scanned.
	 * Doing this per inode costs up to three atomic accesses per iteration
	 * and publishes partially accumulated usedSize values; the final state
	 * is the same either way. Each counter is stored only if it changed.
	 */
	if (usedSize != pg_atomic_read_u32(&map->hdr.usedSize))
	{
		pg_atomic_write_u32(&map->hdr.usedSize, usedSize);
	}
	if (physSize != pg_atomic_read_u32(&map->hdr.physSize))
	{
		pg_atomic_write_u32(&map->hdr.physSize, physSize);
	}
	if (virtSize != pg_atomic_read_u32(&map->hdr.virtSize))
	{
		pg_atomic_write_u32(&map->hdr.virtSize, virtSize);
	}
}
1576+
1577+
15021578
Datum cfs_gc_activity_processed_bytes(PG_FUNCTION_ARGS)
15031579
{
15041580
PG_RETURN_INT64(cfs_state->gc_stat.processedBytes);

0 commit comments

Comments
 (0)