Skip to content

Commit f061c82

Browse files
author
Nikita Glukhov
committed
Partial decompression in TOAST iterators
1 parent 01ee7ea commit f061c82

File tree

6 files changed

+126
-16
lines changed

6 files changed

+126
-16
lines changed

src/backend/access/common/detoast.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ create_detoast_iterator(struct varlena *attr)
5656
/* initialize state for pglz_decompress_iterate() */
5757
iter->ctrl = 0;
5858
iter->ctrlc = INVALID_CTRLC;
59+
iter->len = 0;
60+
iter->off = 0;
5961
}
6062
else
6163
{
@@ -81,7 +83,7 @@ create_detoast_iterator(struct varlena *attr)
8183
return create_detoast_iterator(attr);
8284

8385
}
84-
else if (VARATT_IS_COMPRESSED(attr))
86+
else if (1 && VARATT_IS_COMPRESSED(attr))
8587
{
8688
ToastBuffer *buf;
8789

src/backend/access/common/toast_compression.c

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,82 @@ lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength)
246246
#endif
247247
}
248248

249+
/*
 * toast_decompress_iterate
 *
 * Decompress data currently buffered in 'source' into 'dest', choosing the
 * routine from iter->compression_method (pglz or LZ4).
 *
 * 'destend' bounds how far 'dest' may be filled by this call; it is clamped
 * to dest->capacity.  On return source->position has been advanced by slen
 * and dest->limit by dlen.
 *
 * Reports ERROR for an unknown compression method id, or when a
 * decompression routine returns a negative length.
 *
 * NOTE(review): the first comment in the body mentions a "while loop", but
 * no loop appears in this function -- presumably it refers to the loop
 * inside the pglz decompressor; confirm and reword.
 */
void
250+
toast_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
251+
DetoastIterator iter, const char *destend)
252+
{
253+
const char *sp;
254+
const char *srcend;
255+
char *dp;
256+
int32 dlen;
257+
int32 slen;
258+
bool last_source_chunk;
259+
260+
/*
261+
* In the while loop, sp may be incremented such that it points beyond
262+
* srcend. To guard against reading beyond the end of the current chunk,
263+
* we set srcend such that we exit the loop when we are within four bytes
264+
* of the end of the current chunk. When source->limit reaches
265+
* source->capacity, we are decompressing the last chunk, so we can (and
266+
* need to) read every byte.
267+
*/
268+
last_source_chunk = source->limit == source->capacity;
269+
srcend = last_source_chunk ? source->limit : source->limit - 4;
270+
sp = source->position;
271+
dp = dest->limit;
272+
/* never allow the caller's bound to exceed the destination buffer */
if (destend > dest->capacity)
273+
destend = dest->capacity;
274+
275+
/* number of source bytes offered to the decompressor on this call */
slen = srcend - source->position;
276+
277+
/*
278+
* Decompress the data using the appropriate decompression routine.
279+
*/
280+
switch (iter->compression_method)
281+
{
282+
case TOAST_PGLZ_COMPRESSION_ID:
283+
/*
 * NOTE(review): slen is passed by address here; confirm this matches
 * the pglz_decompress_state() declaration, and that slen comes back
 * as the number of source bytes consumed (it is added to
 * source->position below).
 */
dlen = pglz_decompress_state(sp, &slen, dp, destend - dp,
284+
last_source_chunk && destend == dest->capacity,
285+
last_source_chunk,
286+
&iter->decompression_state);
287+
break;
288+
case TOAST_LZ4_COMPRESSION_ID:
289+
/*
 * In the partial-data branch slen keeps the value computed above, so
 * source->position is still advanced at the end of the function.
 * NOTE(review): presumably intended to trigger fetching more data;
 * confirm against the caller.
 */
if (source->limit < source->capacity)
290+
dlen = 0; /* LZ4 needs the full data to decompress */
291+
else
292+
{
293+
/* decompress the data */
294+
#ifndef USE_LZ4
295+
NO_LZ4_SUPPORT();
296+
dlen = 0;
297+
#else
298+
dlen = LZ4_decompress_safe(source->buf + VARHDRSZ_COMPRESSED,
299+
VARDATA(dest->buf),
300+
VARSIZE(source->buf) - VARHDRSZ_COMPRESSED,
301+
VARDATA_COMPRESSED_GET_EXTSIZE(source->buf));
302+
303+
if (dlen < 0)
304+
ereport(ERROR,
305+
(errcode(ERRCODE_DATA_CORRUPTED),
306+
errmsg_internal("compressed lz4 data is corrupt")));
307+
#endif
308+
/* do not advance source->position below in this branch */
slen = 0;
309+
}
310+
break;
311+
default:
312+
elog(ERROR, "invalid compression method id %d", iter->compression_method);
313+
return; /* keep compiler quiet */
314+
}
315+
316+
/* a negative length from either routine is reported as corrupt data */
if (dlen < 0)
317+
ereport(ERROR,
318+
(errcode(ERRCODE_DATA_CORRUPTED),
319+
errmsg_internal("compressed data is corrupt")));
320+
321+
/* record how much input was consumed and how much output was produced */
source->position += slen;
322+
dest->limit += dlen;
323+
}
324+
249325
/*
250326
* Extract compression ID from a varlena.
251327
*

src/backend/access/common/toast_internals.c

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,9 +1410,33 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
14101410
(source->limit == source->capacity ? source->limit : (source->limit - 4));
14111411
sp = (const unsigned char *) source->position;
14121412
dp = (unsigned char *) dest->limit;
1413-
if (destend < (unsigned char *) dest->capacity)
1413+
if (destend > (unsigned char *) dest->capacity)
14141414
destend = (unsigned char *) dest->capacity;
14151415

1416+
if (iter->len)
1417+
{
1418+
int32 len = iter->len;
1419+
int32 off = iter->off;
1420+
int32 copylen = Min(len, destend - dp);
1421+
int32 remlen = len - copylen;
1422+
1423+
while (copylen--)
1424+
{
1425+
*dp = dp[-off];
1426+
dp++;
1427+
}
1428+
1429+
iter->len = remlen;
1430+
1431+
if (dp >= destend)
1432+
{
1433+
dest->limit = (char *) dp;
1434+
return;
1435+
}
1436+
1437+
Assert(remlen == 0);
1438+
}
1439+
14161440
while (sp < srcend && dp < destend)
14171441
{
14181442
/*
@@ -1433,7 +1457,6 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
14331457
ctrlc = 0;
14341458
}
14351459

1436-
14371460
for (; ctrlc < INVALID_CTRLC && sp < srcend && dp < destend; ctrlc++)
14381461
{
14391462

@@ -1450,6 +1473,7 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
14501473
*/
14511474
int32 len;
14521475
int32 off;
1476+
int32 copylen;
14531477

14541478
len = (sp[0] & 0x0f) + 3;
14551479
off = ((sp[0] & 0xf0) << 4) | sp[1];
@@ -1463,17 +1487,21 @@ pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
14631487
* areas could overlap; to prevent possible uncertainty, we
14641488
* copy only non-overlapping regions.
14651489
*/
1466-
len = Min(len, destend - dp);
1467-
while (off < len)
1490+
copylen = Min(len, destend - dp);
1491+
iter->len = len - copylen;
1492+
1493+
while (off < copylen)
14681494
{
14691495
/* see comments in common/pg_lzcompress.c */
14701496
memcpy(dp, dp - off, off);
1471-
len -= off;
1497+
copylen -= off;
14721498
dp += off;
14731499
off += off;
14741500
}
1475-
memcpy(dp, dp - off, len);
1476-
dp += len;
1501+
memcpy(dp, dp - off, copylen);
1502+
dp += copylen;
1503+
1504+
iter->off = off;
14771505
}
14781506
else
14791507
{

src/common/pg_lzcompress.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,8 @@ typedef struct pglz_state
700700
*/
701701
int32
702702
pglz_decompress_state(const char *source, int32 slen, char *dest,
703-
int32 rawsize, bool check_complete, void **pstate)
703+
int32 dlen, bool check_complete, bool last_cource_chunk,
704+
void **pstate)
704705
{
705706
pglz_state *state = pstate ? *pstate : NULL;
706707
const unsigned char *sp;
@@ -800,7 +801,7 @@ pglz_decompress_state(const char *source, int32 slen, char *dest,
800801
* must check this, else we risk an infinite loop below in the
801802
* face of corrupt data.)
802803
*/
803-
if (unlikely(sp > srcend || off == 0))
804+
if (unlikely((sp > srcend && last_cource_chunk) || off == 0))
804805
return -1;
805806

806807
/*
@@ -947,5 +948,5 @@ int32
947948
pglz_decompress(const char *source, int32 slen, char *dest, int32 rawsize,
948949
bool check_complete)
949950
{
950-
return pglz_decompress_state(source, slen, dest, rawsize, check_complete, NULL);
951+
return pglz_decompress_state(source, slen, dest, rawsize, check_complete, true, NULL);
951952
}

src/include/access/detoast.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ typedef struct DetoastIteratorData
105105
unsigned char ctrl;
106106
int ctrlc;
107107
int nrefs;
108+
int32 len;
109+
int32 off;
108110
bool compressed; /* toast value is compressed? */
109111
bool done;
110112
} DetoastIteratorData;
@@ -156,16 +158,16 @@ detoast_iterate(DetoastIterator detoast_iter, const char *destend)
156158
if (!detoast_iter->compressed)
157159
destend = NULL;
158160

159-
if (destend)
161+
if (1 && destend)
160162
{
161163
const char *srcend = (const char *)
162164
(fetch_iter->buf->limit == fetch_iter->buf->capacity ?
163165
fetch_iter->buf->limit : fetch_iter->buf->limit - 4);
164166

165-
if (fetch_iter->buf->position >= srcend)
167+
if (fetch_iter->buf->position >= srcend && !fetch_iter->done)
166168
fetch_datum_iterate(fetch_iter);
167169
}
168-
else
170+
else if (!fetch_iter->done)
169171
fetch_datum_iterate(fetch_iter);
170172

171173
if (detoast_iter->compressed)

src/include/common/pg_lzcompress.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ extern int32 pglz_compress(const char *source, int32 slen, char *dest,
8787
const PGLZ_Strategy *strategy);
8888
extern int32 pglz_decompress(const char *source, int32 slen, char *dest,
8989
int32 rawsize, bool check_complete);
90-
extern int32 pglz_decompress_state(const char *source, int32 slen, char *dest,
91-
int32 rawsize, bool check_complete,
90+
extern int32 pglz_decompress_state(const char *source, int32 slen,
91+
char *dest, int32 dlen,
92+
bool check_complete, bool last_source_chunk,
9293
void **state);
9394
extern int32 pglz_maximum_compressed_size(int32 rawsize,
9495
int32 total_compressed_size);

0 commit comments

Comments
 (0)