Skip to content

Commit 71275ca

Browse files
author
Nikita Glukhov
committed
Add attribute compression/decompression
1 parent ac607a1 commit 71275ca

File tree

18 files changed

+226
-47
lines changed

18 files changed

+226
-47
lines changed

src/backend/access/brin/brin_tuple.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,8 @@ brin_deconstruct_tuple(BrinDesc *brdesc,
572572
off = att_align_nominal(off, thisatt->attalign);
573573
}
574574

575-
values[stored++] = fetchatt(thisatt, tp + off);
575+
values[stored] = fetchatt(diskdsc, stored, tp + off);
576+
stored++;
576577

577578
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
578579
}

src/backend/access/common/heaptuple.c

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757

5858
#include "postgres.h"
5959

60+
#include "access/compression.h"
6061
#include "access/sysattr.h"
6162
#include "access/tuptoaster.h"
6263
#include "executor/tuptable.h"
@@ -410,8 +411,7 @@ nocachegetattr(HeapTuple tuple,
410411
*/
411412
if (att[attnum]->attcacheoff >= 0)
412413
{
413-
return fetchatt(att[attnum],
414-
tp + att[attnum]->attcacheoff);
414+
return fetchatt(tupleDesc, attnum, tp + att[attnum]->attcacheoff);
415415
}
416416

417417
/*
@@ -536,7 +536,7 @@ nocachegetattr(HeapTuple tuple,
536536
}
537537
}
538538

539-
return fetchatt(att[attnum], tp + off);
539+
return fetchatt(tupleDesc, attnum, tp + off);
540540
}
541541

542542
/* ----------------
@@ -681,6 +681,32 @@ heap_copy_tuple_as_datum(HeapTuple tuple, TupleDesc tupleDesc)
681681
return PointerGetDatum(td);
682682
}
683683

684+
static inline CompressionMethodRoutine *
685+
TupleDescGetCompressionMethodRoutine(TupleDesc tupdesc, AttrNumber attnum)
686+
{
687+
CompressionMethodRoutine *cmr;
688+
Assert(tupdesc->tdcmroutines);
689+
cmr = tupdesc->tdcmroutines[attnum];
690+
Assert(cmr);
691+
return cmr;
692+
}
693+
694+
Datum
695+
tuple_compress_attr(TupleDesc tupdesc, AttrNumber attnum, Datum value)
696+
{
697+
CompressionMethodRoutine *cmr =
698+
TupleDescGetCompressionMethodRoutine(tupdesc, attnum);
699+
return (*cmr->compress)(value, tupdesc->attrs[attnum]);
700+
}
701+
702+
Datum
703+
tuple_decompress_attr(TupleDesc tupdesc, int attnum, Datum value)
704+
{
705+
CompressionMethodRoutine *cmr =
706+
TupleDescGetCompressionMethodRoutine(tupdesc, attnum);
707+
return (*cmr->decompress)(value, tupdesc->attrs[attnum]);
708+
}
709+
684710
/*
685711
* heap_form_tuple
686712
* construct a tuple from the given values[] and isnull[] arrays,
@@ -695,6 +721,7 @@ heap_form_tuple(TupleDesc tupleDescriptor,
695721
{
696722
HeapTuple tuple; /* return tuple */
697723
HeapTupleHeader td; /* tuple data */
724+
Datum *oldValues = NULL;
698725
Size len,
699726
data_len;
700727
int hoff;
@@ -716,7 +743,21 @@ heap_form_tuple(TupleDesc tupleDescriptor,
716743
if (isnull[i])
717744
{
718745
hasnull = true;
719-
break;
746+
747+
if (!tupleDescriptor->tdcmroutines)
748+
break;
749+
}
750+
else if (/* XXX compress && */
751+
OidIsValid(tupleDescriptor->attrs[i]->attcompression))
752+
{
753+
if (!oldValues)
754+
{
755+
oldValues = values;
756+
values = palloc0(sizeof(Datum) * numberOfAttributes);
757+
memcpy(values, oldValues, sizeof(Datum) * numberOfAttributes);
758+
}
759+
760+
values[i] = tuple_compress_attr(tupleDescriptor, i, values[i]);
720761
}
721762
}
722763

@@ -775,6 +816,15 @@ heap_form_tuple(TupleDesc tupleDescriptor,
775816
&td->t_infomask,
776817
(hasnull ? td->t_bits : NULL));
777818

819+
if (oldValues)
820+
{
821+
for (i = 0; i < numberOfAttributes; i++)
822+
if (values[i] != oldValues[i])
823+
pfree(DatumGetPointer(values[i]));
824+
825+
pfree(values);
826+
}
827+
778828
return tuple;
779829
}
780830

@@ -932,8 +982,8 @@ heap_modify_tuple_by_cols(HeapTuple tuple,
932982
* noncacheable attribute offsets are involved.
933983
*/
934984
void
935-
heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
936-
Datum *values, bool *isnull)
985+
heap_deform_tuple_decompress(HeapTuple tuple, TupleDesc tupleDesc,
986+
Datum *values, bool *isnull, bool *decompress)
937987
{
938988
HeapTupleHeader tup = tuple->t_data;
939989
bool hasnulls = HeapTupleHasNulls(tuple);
@@ -1002,7 +1052,8 @@ heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
10021052
thisatt->attcacheoff = off;
10031053
}
10041054

1005-
values[attnum] = fetchatt(thisatt, tp + off);
1055+
values[attnum] = fetchatt_decompress(tupleDesc, attnum, tp + off,
1056+
!decompress || decompress[attnum]);
10061057

10071058
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
10081059

@@ -1111,7 +1162,7 @@ slot_deform_tuple(TupleTableSlot *slot, int natts)
11111162
thisatt->attcacheoff = off;
11121163
}
11131164

1114-
values[attnum] = fetchatt(thisatt, tp + off);
1165+
values[attnum] = fetchatt(tupleDesc, attnum, tp + off);
11151166

11161167
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
11171168

@@ -1451,6 +1502,7 @@ heap_form_minimal_tuple(TupleDesc tupleDescriptor,
14511502
if (tupleDescriptor->tdhasoid) /* else leave infomask = 0 */
14521503
tuple->t_infomask = HEAP_HASOID;
14531504

1505+
/* FIXME compressed/extended attributes */
14541506
heap_fill_tuple(tupleDescriptor,
14551507
values,
14561508
isnull,

src/backend/access/common/indextuple.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,7 @@ nocache_index_getattr(IndexTuple tup,
277277
*/
278278
if (att[attnum]->attcacheoff >= 0)
279279
{
280-
return fetchatt(att[attnum],
281-
tp + att[attnum]->attcacheoff);
280+
return fetchatt(tupleDesc, attnum, tp + att[attnum]->attcacheoff);
282281
}
283282

284283
/*
@@ -403,7 +402,7 @@ nocache_index_getattr(IndexTuple tup,
403402
}
404403
}
405404

406-
return fetchatt(att[attnum], tp + off);
405+
return fetchatt(tupleDesc, attnum, tp + off);
407406
}
408407

409408
/*

src/backend/access/heap/heapam.c

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
9191
bool temp_snap);
9292
static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
9393
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
94-
TransactionId xid, CommandId cid, int options);
94+
TupleDesc tupdesc, TransactionId xid,
95+
CommandId cid, int options);
9596
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
9697
Buffer newbuf, HeapTuple oldtup,
9798
HeapTuple newtup, HeapTuple old_key_tup,
@@ -1062,7 +1063,7 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
10621063
(
10631064
(tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
10641065
(
1065-
fetchatt((tupleDesc)->attrs[(attnum) - 1],
1066+
fetchatt(tupleDesc, (attnum) - 1,
10661067
(char *) (tup)->t_data + (tup)->t_data->t_hoff +
10671068
(tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
10681069
)
@@ -2393,8 +2394,8 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
23932394
* within the tuple data is NOT reflected into *tup.
23942395
*/
23952396
Oid
2396-
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
2397-
int options, BulkInsertState bistate)
2397+
heap_insert(Relation relation, HeapTuple tup, TupleDesc tupdesc,
2398+
CommandId cid, int options, BulkInsertState bistate)
23982399
{
23992400
TransactionId xid = GetCurrentTransactionId();
24002401
HeapTuple heaptup;
@@ -2409,7 +2410,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
24092410
* Note: below this point, heaptup is the data we actually intend to store
24102411
* into the relation; tup is the caller's original untoasted data.
24112412
*/
2412-
heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
2413+
heaptup = heap_prepare_insert(relation, tup, tupdesc, xid, cid, options);
24132414

24142415
/*
24152416
* Find buffer to insert this tuple into. If the page is all visible,
@@ -2569,6 +2570,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
25692570
return HeapTupleGetOid(tup);
25702571
}
25712572

2573+
static inline bool *
2574+
heap_tuple_needs_recompression(HeapTuple tup, TupleDesc td1, TupleDesc td2)
2575+
{
2576+
AttrNumber natts = td1->natts;
2577+
AttrNumber att;
2578+
bool *recompress = NULL;
2579+
2580+
if (td1 == td2)
2581+
return NULL;
2582+
2583+
if (!td1->tdcmroutines && !td2->tdcmroutines)
2584+
return NULL;
2585+
2586+
for (att = 1; att <= natts; att++)
2587+
{
2588+
if (heap_attisnull(tup, att))
2589+
continue;
2590+
2591+
if (td1->attrs[att - 1]->attcompression !=
2592+
td2->attrs[att - 1]->attcompression)
2593+
{
2594+
if (!recompress)
2595+
recompress = palloc0(sizeof(bool) * natts);
2596+
recompress[att - 1] = true;
2597+
}
2598+
}
2599+
2600+
return recompress;
2601+
}
2602+
25722603
/*
25732604
* Subroutine for heap_insert(). Prepares a tuple for insertion. This sets the
25742605
* tuple header fields, assigns an OID, and toasts the tuple if necessary.
@@ -2577,9 +2608,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
25772608
* the original tuple.
25782609
*/
25792610
static HeapTuple
2580-
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
2581-
CommandId cid, int options)
2611+
heap_prepare_insert(Relation relation, HeapTuple tup, TupleDesc tupdesc,
2612+
TransactionId xid, CommandId cid, int options)
25822613
{
2614+
bool *recompress;
2615+
25832616
/*
25842617
* For now, parallel operations are required to be strictly read-only.
25852618
* Unlike heap_update() and heap_delete(), an insert should never create a
@@ -2637,10 +2670,24 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
26372670
Assert(!HeapTupleHasExternal(tup));
26382671
return tup;
26392672
}
2640-
else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
2641-
return toast_insert_or_update(relation, tup, NULL, options);
2642-
else
2643-
return tup;
2673+
2674+
recompress = heap_tuple_needs_recompression(tup, tupdesc,
2675+
RelationGetDescr(relation));
2676+
2677+
if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD ||
2678+
recompress)
2679+
{
2680+
if (!recompress)
2681+
recompress = palloc0(sizeof(bool) * RelationGetDescr(relation)->natts);
2682+
2683+
tup = toast_insert_or_update(relation, tupdesc, tup, NULL, options,
2684+
recompress);
2685+
}
2686+
2687+
if (recompress)
2688+
pfree(recompress);
2689+
2690+
return tup;
26442691
}
26452692

26462693
/*
@@ -2677,6 +2724,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
26772724
heaptuples = palloc(ntuples * sizeof(HeapTuple));
26782725
for (i = 0; i < ntuples; i++)
26792726
heaptuples[i] = heap_prepare_insert(relation, tuples[i],
2727+
RelationGetDescr(relation),
26802728
xid, cid, options);
26812729

26822730
/*
@@ -2938,7 +2986,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
29382986
Oid
29392987
simple_heap_insert(Relation relation, HeapTuple tup)
29402988
{
2941-
return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
2989+
return heap_insert(relation, tup, RelationGetDescr(relation),
2990+
GetCurrentCommandId(true), 0, NULL);
29422991
}
29432992

29442993
/*
@@ -3459,7 +3508,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
34593508
* See comments for struct HeapUpdateFailureData for additional info.
34603509
*/
34613510
HTSU_Result
3462-
heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
3511+
heap_update(Relation relation, ItemPointer otid,
3512+
HeapTuple newtup, TupleDesc newtupdesc,
34633513
CommandId cid, Snapshot crosscheck, bool wait,
34643514
HeapUpdateFailureData *hufd, LockTupleMode *lockmode)
34653515
{
@@ -3500,6 +3550,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
35003550
infomask2_old_tuple,
35013551
infomask_new_tuple,
35023552
infomask2_new_tuple;
3553+
bool *recompress;
35033554

35043555
Assert(ItemPointerIsValid(otid));
35053556

@@ -3963,11 +4014,21 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
39634014
Assert(!HeapTupleHasExternal(&oldtup));
39644015
Assert(!HeapTupleHasExternal(newtup));
39654016
need_toast = false;
4017+
recompress = NULL;
39664018
}
39674019
else
4020+
{
4021+
recompress = heap_tuple_needs_recompression(newtup, newtupdesc,
4022+
RelationGetDescr(relation));
4023+
39684024
need_toast = (HeapTupleHasExternal(&oldtup) ||
39694025
HeapTupleHasExternal(newtup) ||
3970-
newtup->t_len > TOAST_TUPLE_THRESHOLD);
4026+
newtup->t_len > TOAST_TUPLE_THRESHOLD) ||
4027+
recompress;
4028+
4029+
if (need_toast && !recompress)
4030+
recompress = palloc0(sizeof(bool) * RelationGetDescr(relation)->natts);
4031+
}
39714032

39724033
pagefree = PageGetHeapFreeSpace(page);
39734034

@@ -4068,8 +4129,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
40684129
if (need_toast)
40694130
{
40704131
/* Note we always use WAL and FSM during updates */
4071-
heaptup = toast_insert_or_update(relation, newtup, &oldtup, 0);
4132+
heaptup = toast_insert_or_update(relation, newtupdesc, newtup,
4133+
&oldtup, 0, recompress);
40724134
newtupsize = MAXALIGN(heaptup->t_len);
4135+
pfree(recompress);
40734136
}
40744137
else
40754138
heaptup = newtup;
@@ -4453,7 +4516,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
44534516
HeapUpdateFailureData hufd;
44544517
LockTupleMode lockmode;
44554518

4456-
result = heap_update(relation, otid, tup,
4519+
result = heap_update(relation, otid, tup, RelationGetDescr(relation),
44574520
GetCurrentCommandId(true), InvalidSnapshot,
44584521
true /* wait for commit */ ,
44594522
&hufd, &lockmode);

src/backend/access/heap/rewriteheap.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,10 +648,18 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
648648
heaptup = tup;
649649
}
650650
else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
651-
heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
651+
{
652+
TupleDesc tupdesc = RelationGetDescr(state->rs_new_rel);
653+
bool *recompress = palloc0(sizeof(bool) * tupdesc->natts);
654+
heaptup = toast_insert_or_update(state->rs_new_rel,
655+
tupdesc,
656+
tup, NULL,
652657
HEAP_INSERT_SKIP_FSM |
653658
(state->rs_use_wal ?
654-
0 : HEAP_INSERT_SKIP_WAL));
659+
0 : HEAP_INSERT_SKIP_WAL),
660+
recompress);
661+
pfree(recompress);
662+
}
655663
else
656664
heaptup = tup;
657665

0 commit comments

Comments
 (0)