Skip to content

Commit 78c9dbc

Browse files
author
Nikita Glukhov
committed
TMP: in-place updates
1 parent 3200771 commit 78c9dbc

File tree

6 files changed

+319
-95
lines changed

6 files changed

+319
-95
lines changed

contrib/jsonb_toaster/jsonb_toast_internals.c

Lines changed: 124 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
#include "utils/snapmgr.h"
3636
#include "utils/jsonb.h"
3737

38+
typedef struct varatt_external_diff
39+
{
40+
int32 va_diff_offset;
41+
char va_diff_data[FLEXIBLE_ARRAY_MEMBER];
42+
} varatt_external_diff;
43+
3844
char *
3945
jsonxWriteCustomToastPointerHeader(char *ptr, Oid toasterid, uint32 header,
4046
int datalen, int rawsize)
@@ -935,6 +941,7 @@ jsonx_create_fetch_datum_iterator(struct varlena *attr, Oid toasterid,
935941
uint32 header, char *inline_data, int inline_size)
936942
{
937943
JsonxFetchDatumIterator iter;
944+
uint32 type = header & JSONX_POINTER_TYPE_MASK;
938945

939946
if (!VARATT_IS_EXTERNAL_ONDISK(attr))
940947
elog(ERROR, "create_fetch_datum_iterator shouldn't be called for non-ondisk datums");
@@ -949,19 +956,19 @@ jsonx_create_fetch_datum_iterator(struct varlena *attr, Oid toasterid,
949956
iter->toasterid = toasterid;
950957
iter->chunk_tids_inline_size = inline_size;
951958

952-
if (inline_size <= 0)
959+
if (inline_size <= 0 || type == JSONX_POINTER_DIFF)
953960
{
954961
iter->nchunk_tids = 0;
955962
iter->chunk_tids = NULL;
956963
iter->compressed_chunk_tids = NULL;
957-
iter->compressed_chunks = (header & JSONX_POINTER_TYPE_MASK) == JSONX_POINTER_COMPRESSED_CHUNKS;
964+
iter->compressed_chunks = type == JSONX_POINTER_COMPRESSED_CHUNKS;
958965
}
959966
else
960967
{
961968
iter->nchunk_tids = header & ~JSONX_POINTER_TYPE_MASK;
962969
iter->compressed_chunks = false;
963970

964-
if ((header & JSONX_POINTER_TYPE_MASK) == JSONX_POINTER_DIRECT_TIDS_COMP)
971+
if (type == JSONX_POINTER_DIRECT_TIDS_COMP)
965972
{
966973
iter->chunk_tids = palloc0(sizeof(ItemPointerData) * iter->nchunk_tids);
967974
iter->compressed_chunk_tids = (char *) inline_data;
@@ -1503,6 +1510,9 @@ jsonx_fetch_datum_iterate_to(JsonxFetchDatumIterator iter, int32 chunkno, int32
15031510
static void
15041511
jsonx_free_detoast_iterator_internal(JsonxDetoastIterator iter)
15051512
{
1513+
if (iter->orig_buf && iter->orig_buf != iter->buf)
1514+
free_toast_buffer(iter->orig_buf);
1515+
15061516
if (iter->compressed && iter->buf)
15071517
{
15081518
free_toast_buffer(iter->buf);
@@ -1554,9 +1564,10 @@ jsonx_create_detoast_iterator(struct varlena *attr)
15541564
if (VARATT_IS_CUSTOM(attr))
15551565
{
15561566
uint32 header = JSONX_CUSTOM_PTR_GET_HEADER(attr);
1567+
uint32 type = header & JSONX_POINTER_TYPE_MASK;
15571568
char *data = (char *) JSONX_CUSTOM_PTR_GET_DATA(attr);
1558-
char *inlineData;
1559-
uint32 inlineSize;
1569+
char *inline_data;
1570+
uint32 inline_size;
15601571

15611572
if (!VARATT_IS_EXTERNAL_ONDISK(data))
15621573
return NULL;
@@ -1568,13 +1579,13 @@ jsonx_create_detoast_iterator(struct varlena *attr)
15681579
iter->nrefs = 1;
15691580
iter->gen.free_callback.func = (void (*)(void *)) jsonx_free_detoast_iterator_internal;
15701581

1571-
inlineData = data + TOAST_POINTER_SIZE;
1572-
inlineSize = JSONX_CUSTOM_PTR_GET_DATA_SIZE(attr) - TOAST_POINTER_SIZE;
1582+
inline_data = data + TOAST_POINTER_SIZE;
1583+
inline_size = JSONX_CUSTOM_PTR_GET_DATA_SIZE(attr) - TOAST_POINTER_SIZE;
15731584

15741585
iter->fetch_datum_iterator =
15751586
jsonx_create_fetch_datum_iterator((struct varlena *) data,
15761587
VARATT_CUSTOM_GET_TOASTERID(attr),
1577-
header, inlineData, inlineSize);
1588+
header, inline_data, inline_size);
15781589

15791590
if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
15801591
{
@@ -1583,14 +1594,30 @@ jsonx_create_detoast_iterator(struct varlena *attr)
15831594

15841595
/* prepare buffer to received decompressed data */
15851596
iter->buf = create_toast_buffer(toast_pointer.va_rawsize, false);
1597+
1598+
if (type == JSONX_POINTER_DIFF)
1599+
iter->orig_buf = create_toast_buffer(toast_pointer.va_rawsize, false);
1600+
else
1601+
iter->orig_buf = iter->buf;
15861602
}
15871603
else
15881604
{
15891605
iter->compressed = false;
15901606
iter->compression_method = TOAST_INVALID_COMPRESSION_ID;
15911607

15921608
/* point the buffer directly at the raw data */
1593-
iter->buf = iter->fetch_datum_iterator->buf;
1609+
iter->buf = iter->orig_buf = iter->fetch_datum_iterator->buf;
1610+
}
1611+
1612+
if (type == JSONX_POINTER_DIFF)
1613+
{
1614+
varatt_external_diff *diff = (varatt_external_diff *) inline_data;
1615+
1616+
iter->diff.inline_data = inline_data;
1617+
iter->diff.inline_size = inline_size;
1618+
iter->diff.size = inline_size - offsetof(varatt_external_diff, va_diff_data);
1619+
iter->diff.offset = diff->va_diff_offset;
1620+
iter->diff.data = diff->va_diff_data; /* FIXME MemoryContext */
15941621
}
15951622

15961623
return iter;
@@ -1605,7 +1632,8 @@ jsonx_create_detoast_iterator(struct varlena *attr)
16051632
iter->gen.free_callback.func = (void (*)(void *)) jsonx_free_detoast_iterator_internal;
16061633

16071634
/* This is an externally stored datum --- initialize fetch datum iterator */
1608-
iter->fetch_datum_iterator = fetch_iter = jsonx_create_fetch_datum_iterator(attr, InvalidOid, JSONX_PLAIN_JSONB, NULL, 0);
1635+
iter->fetch_datum_iterator = fetch_iter =
1636+
jsonx_create_fetch_datum_iterator(attr, InvalidOid, JSONX_PLAIN_JSONB, NULL, 0);
16091637
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
16101638

16111639
if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
@@ -1614,15 +1642,15 @@ jsonx_create_detoast_iterator(struct varlena *attr)
16141642
iter->compression_method = VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer);
16151643

16161644
/* prepare buffer to received decompressed data */
1617-
iter->buf = create_toast_buffer(toast_pointer.va_rawsize, false);
1645+
iter->buf = iter->orig_buf = create_toast_buffer(toast_pointer.va_rawsize, false);
16181646
}
16191647
else
16201648
{
16211649
iter->compressed = false;
16221650
iter->compression_method = TOAST_INVALID_COMPRESSION_ID;
16231651

16241652
/* point the buffer directly at the raw data */
1625-
iter->buf = fetch_iter->buf;
1653+
iter->buf = iter->orig_buf = fetch_iter->buf;
16261654
}
16271655
return iter;
16281656
}
@@ -1660,7 +1688,7 @@ jsonx_create_detoast_iterator(struct varlena *attr)
16601688
buf->limit = (char *) buf->capacity;
16611689

16621690
/* prepare buffer to received decompressed data */
1663-
iter->buf = create_toast_buffer(TOAST_COMPRESS_EXTSIZE(attr) + VARHDRSZ, false);
1691+
iter->buf = iter->orig_buf = create_toast_buffer(TOAST_COMPRESS_EXTSIZE(attr) + VARHDRSZ, false);
16641692

16651693
return iter;
16661694
}
@@ -1669,10 +1697,61 @@ jsonx_create_detoast_iterator(struct varlena *attr)
16691697
return NULL;
16701698
}
16711699

1700+
static void
1701+
toast_apply_diff_internal(struct varlena *result, const char *diff_data,
1702+
int32 diff_offset, int32 diff_length,
1703+
int32 slice_offset, int32 slice_length)
1704+
{
1705+
if (diff_offset >= slice_offset)
1706+
{
1707+
if (diff_offset < slice_offset + slice_length)
1708+
memcpy((char *) result /*VARDATA(result)*/ + diff_offset,
1709+
diff_data,
1710+
Min(diff_length, slice_offset + slice_length - diff_offset));
1711+
}
1712+
else
1713+
{
1714+
if (slice_offset < diff_offset + diff_length)
1715+
memcpy((char *) result /*VARDATA(result)*/ + slice_offset,
1716+
diff_data + slice_offset - diff_offset,
1717+
Min(slice_length, diff_offset + diff_length - slice_offset));
1718+
}
1719+
}
1720+
1721+
#if 0
1722+
static void
1723+
toast_apply_diff(struct varlena *attr, struct varlena *result,
1724+
int32 sliceoffset, int32 slicelength)
1725+
{
1726+
if (VARATT_IS_EXTERNAL_ONDISK_INLINE_DIFF(attr))
1727+
{
1728+
struct varatt_external_versioned toast_pointer;
1729+
struct varatt_external_diff diff;
1730+
const char *inline_data = VARDATA_EXTERNAL_INLINE(attr);
1731+
/* Must copy to access aligned fields */
1732+
int32 inline_size = VARATT_EXTERNAL_INLINE_GET_POINTER(toast_pointer, attr);
1733+
int32 attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer.va_external);
1734+
Size data_offset = offsetof(varatt_external_diff, va_diff_data);
1735+
Size diff_size = inline_size - data_offset;
1736+
const char *diff_data = inline_data + data_offset;
1737+
1738+
memcpy(&diff, inline_data, data_offset);
1739+
1740+
if (slicelength < 0)
1741+
slicelength = attrsize - sliceoffset;
1742+
1743+
toast_apply_diff_internal(result, diff_data,
1744+
diff.va_diff_offset, diff_size,
1745+
sliceoffset, slicelength);
1746+
}
1747+
}
1748+
#endif
1749+
16721750
void
16731751
jsonx_detoast_iterate(JsonxDetoastIterator detoast_iter, const char *destend)
16741752
{
16751753
JsonxFetchDatumIterator fetch_iter = detoast_iter->fetch_datum_iterator;
1754+
const char *old_limit = detoast_iter->buf->limit;
16761755

16771756
Assert(detoast_iter != NULL && !detoast_iter->done);
16781757

@@ -1692,10 +1771,40 @@ jsonx_detoast_iterate(JsonxDetoastIterator detoast_iter, const char *destend)
16921771
jsonx_fetch_datum_iterate(fetch_iter, -1);
16931772

16941773
if (detoast_iter->compressed)
1695-
toast_decompress_iterate(fetch_iter->buf, detoast_iter->buf,
1774+
toast_decompress_iterate(fetch_iter->buf, detoast_iter->orig_buf,
16961775
detoast_iter->compression_method,
16971776
&detoast_iter->decompression_state,
1698-
destend);
1777+
detoast_iter->orig_buf->buf + (destend - detoast_iter->buf->buf));
1778+
1779+
if (detoast_iter->diff.data)
1780+
{
1781+
int32 slice_offset;
1782+
int32 slice_length;
1783+
1784+
/* copy original data to output buffer */
1785+
if (detoast_iter->compressed)
1786+
{
1787+
int dst_limit = detoast_iter->buf->limit - detoast_iter->buf->buf;
1788+
int src_limit = detoast_iter->orig_buf->limit - detoast_iter->orig_buf->buf;
1789+
1790+
if (dst_limit < src_limit)
1791+
{
1792+
memcpy(detoast_iter->buf->limit,
1793+
detoast_iter->orig_buf->buf + dst_limit,
1794+
src_limit - dst_limit);
1795+
detoast_iter->buf->limit += src_limit - dst_limit;
1796+
}
1797+
}
1798+
1799+
slice_offset = old_limit - detoast_iter->buf->buf;
1800+
slice_length = detoast_iter->buf->limit - old_limit;
1801+
1802+
toast_apply_diff_internal((struct varlena *) detoast_iter->buf->buf,
1803+
detoast_iter->diff.data,
1804+
detoast_iter->diff.offset,
1805+
detoast_iter->diff.size,
1806+
slice_offset, slice_length);
1807+
}
16991808

17001809
if (detoast_iter->buf->limit == detoast_iter->buf->capacity)
17011810
{

0 commit comments

Comments
 (0)