35
35
#include "utils/snapmgr.h"
36
36
#include "utils/jsonb.h"
37
37
38
+ typedef struct varatt_external_diff
39
+ {
40
+ int32 va_diff_offset ;
41
+ char va_diff_data [FLEXIBLE_ARRAY_MEMBER ];
42
+ } varatt_external_diff ;
43
+
38
44
char *
39
45
jsonxWriteCustomToastPointerHeader (char * ptr , Oid toasterid , uint32 header ,
40
46
int datalen , int rawsize )
@@ -935,6 +941,7 @@ jsonx_create_fetch_datum_iterator(struct varlena *attr, Oid toasterid,
935
941
uint32 header , char * inline_data , int inline_size )
936
942
{
937
943
JsonxFetchDatumIterator iter ;
944
+ uint32 type = header & JSONX_POINTER_TYPE_MASK ;
938
945
939
946
if (!VARATT_IS_EXTERNAL_ONDISK (attr ))
940
947
elog (ERROR , "create_fetch_datum_iterator shouldn't be called for non-ondisk datums" );
@@ -949,19 +956,19 @@ jsonx_create_fetch_datum_iterator(struct varlena *attr, Oid toasterid,
949
956
iter -> toasterid = toasterid ;
950
957
iter -> chunk_tids_inline_size = inline_size ;
951
958
952
- if (inline_size <= 0 )
959
+ if (inline_size <= 0 || type == JSONX_POINTER_DIFF )
953
960
{
954
961
iter -> nchunk_tids = 0 ;
955
962
iter -> chunk_tids = NULL ;
956
963
iter -> compressed_chunk_tids = NULL ;
957
- iter -> compressed_chunks = ( header & JSONX_POINTER_TYPE_MASK ) == JSONX_POINTER_COMPRESSED_CHUNKS ;
964
+ iter -> compressed_chunks = type == JSONX_POINTER_COMPRESSED_CHUNKS ;
958
965
}
959
966
else
960
967
{
961
968
iter -> nchunk_tids = header & ~JSONX_POINTER_TYPE_MASK ;
962
969
iter -> compressed_chunks = false;
963
970
964
- if (( header & JSONX_POINTER_TYPE_MASK ) == JSONX_POINTER_DIRECT_TIDS_COMP )
971
+ if (type == JSONX_POINTER_DIRECT_TIDS_COMP )
965
972
{
966
973
iter -> chunk_tids = palloc0 (sizeof (ItemPointerData ) * iter -> nchunk_tids );
967
974
iter -> compressed_chunk_tids = (char * ) inline_data ;
@@ -1503,6 +1510,9 @@ jsonx_fetch_datum_iterate_to(JsonxFetchDatumIterator iter, int32 chunkno, int32
1503
1510
static void
1504
1511
jsonx_free_detoast_iterator_internal (JsonxDetoastIterator iter )
1505
1512
{
1513
+ if (iter -> orig_buf && iter -> orig_buf != iter -> buf )
1514
+ free_toast_buffer (iter -> orig_buf );
1515
+
1506
1516
if (iter -> compressed && iter -> buf )
1507
1517
{
1508
1518
free_toast_buffer (iter -> buf );
@@ -1554,9 +1564,10 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1554
1564
if (VARATT_IS_CUSTOM (attr ))
1555
1565
{
1556
1566
uint32 header = JSONX_CUSTOM_PTR_GET_HEADER (attr );
1567
+ uint32 type = header & JSONX_POINTER_TYPE_MASK ;
1557
1568
char * data = (char * ) JSONX_CUSTOM_PTR_GET_DATA (attr );
1558
- char * inlineData ;
1559
- uint32 inlineSize ;
1569
+ char * inline_data ;
1570
+ uint32 inline_size ;
1560
1571
1561
1572
if (!VARATT_IS_EXTERNAL_ONDISK (data ))
1562
1573
return NULL ;
@@ -1568,13 +1579,13 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1568
1579
iter -> nrefs = 1 ;
1569
1580
iter -> gen .free_callback .func = (void (* )(void * )) jsonx_free_detoast_iterator_internal ;
1570
1581
1571
- inlineData = data + TOAST_POINTER_SIZE ;
1572
- inlineSize = JSONX_CUSTOM_PTR_GET_DATA_SIZE (attr ) - TOAST_POINTER_SIZE ;
1582
+ inline_data = data + TOAST_POINTER_SIZE ;
1583
+ inline_size = JSONX_CUSTOM_PTR_GET_DATA_SIZE (attr ) - TOAST_POINTER_SIZE ;
1573
1584
1574
1585
iter -> fetch_datum_iterator =
1575
1586
jsonx_create_fetch_datum_iterator ((struct varlena * ) data ,
1576
1587
VARATT_CUSTOM_GET_TOASTERID (attr ),
1577
- header , inlineData , inlineSize );
1588
+ header , inline_data , inline_size );
1578
1589
1579
1590
if (VARATT_EXTERNAL_IS_COMPRESSED (toast_pointer ))
1580
1591
{
@@ -1583,14 +1594,30 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1583
1594
1584
1595
/* prepare buffer to received decompressed data */
1585
1596
iter -> buf = create_toast_buffer (toast_pointer .va_rawsize , false);
1597
+
1598
+ if (type == JSONX_POINTER_DIFF )
1599
+ iter -> orig_buf = create_toast_buffer (toast_pointer .va_rawsize , false);
1600
+ else
1601
+ iter -> orig_buf = iter -> buf ;
1586
1602
}
1587
1603
else
1588
1604
{
1589
1605
iter -> compressed = false;
1590
1606
iter -> compression_method = TOAST_INVALID_COMPRESSION_ID ;
1591
1607
1592
1608
/* point the buffer directly at the raw data */
1593
- iter -> buf = iter -> fetch_datum_iterator -> buf ;
1609
+ iter -> buf = iter -> orig_buf = iter -> fetch_datum_iterator -> buf ;
1610
+ }
1611
+
1612
+ if (type == JSONX_POINTER_DIFF )
1613
+ {
1614
+ varatt_external_diff * diff = (varatt_external_diff * ) inline_data ;
1615
+
1616
+ iter -> diff .inline_data = inline_data ;
1617
+ iter -> diff .inline_size = inline_size ;
1618
+ iter -> diff .size = inline_size - offsetof(varatt_external_diff , va_diff_data );
1619
+ iter -> diff .offset = diff -> va_diff_offset ;
1620
+ iter -> diff .data = diff -> va_diff_data ; /* FIXME MemoryContext */
1594
1621
}
1595
1622
1596
1623
return iter ;
@@ -1605,7 +1632,8 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1605
1632
iter -> gen .free_callback .func = (void (* )(void * )) jsonx_free_detoast_iterator_internal ;
1606
1633
1607
1634
/* This is an externally stored datum --- initialize fetch datum iterator */
1608
- iter -> fetch_datum_iterator = fetch_iter = jsonx_create_fetch_datum_iterator (attr , InvalidOid , JSONX_PLAIN_JSONB , NULL , 0 );
1635
+ iter -> fetch_datum_iterator = fetch_iter =
1636
+ jsonx_create_fetch_datum_iterator (attr , InvalidOid , JSONX_PLAIN_JSONB , NULL , 0 );
1609
1637
VARATT_EXTERNAL_GET_POINTER (toast_pointer , attr );
1610
1638
1611
1639
if (VARATT_EXTERNAL_IS_COMPRESSED (toast_pointer ))
@@ -1614,15 +1642,15 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1614
1642
iter -> compression_method = VARATT_EXTERNAL_GET_COMPRESS_METHOD (toast_pointer );
1615
1643
1616
1644
/* prepare buffer to received decompressed data */
1617
- iter -> buf = create_toast_buffer (toast_pointer .va_rawsize , false);
1645
+ iter -> buf = iter -> orig_buf = create_toast_buffer (toast_pointer .va_rawsize , false);
1618
1646
}
1619
1647
else
1620
1648
{
1621
1649
iter -> compressed = false;
1622
1650
iter -> compression_method = TOAST_INVALID_COMPRESSION_ID ;
1623
1651
1624
1652
/* point the buffer directly at the raw data */
1625
- iter -> buf = fetch_iter -> buf ;
1653
+ iter -> buf = iter -> orig_buf = fetch_iter -> buf ;
1626
1654
}
1627
1655
return iter ;
1628
1656
}
@@ -1660,7 +1688,7 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1660
1688
buf -> limit = (char * ) buf -> capacity ;
1661
1689
1662
1690
/* prepare buffer to received decompressed data */
1663
- iter -> buf = create_toast_buffer (TOAST_COMPRESS_EXTSIZE (attr ) + VARHDRSZ , false);
1691
+ iter -> buf = iter -> orig_buf = create_toast_buffer (TOAST_COMPRESS_EXTSIZE (attr ) + VARHDRSZ , false);
1664
1692
1665
1693
return iter ;
1666
1694
}
@@ -1669,10 +1697,61 @@ jsonx_create_detoast_iterator(struct varlena *attr)
1669
1697
return NULL ;
1670
1698
}
1671
1699
1700
+ static void
1701
+ toast_apply_diff_internal (struct varlena * result , const char * diff_data ,
1702
+ int32 diff_offset , int32 diff_length ,
1703
+ int32 slice_offset , int32 slice_length )
1704
+ {
1705
+ if (diff_offset >= slice_offset )
1706
+ {
1707
+ if (diff_offset < slice_offset + slice_length )
1708
+ memcpy ((char * ) result /*VARDATA(result)*/ + diff_offset ,
1709
+ diff_data ,
1710
+ Min (diff_length , slice_offset + slice_length - diff_offset ));
1711
+ }
1712
+ else
1713
+ {
1714
+ if (slice_offset < diff_offset + diff_length )
1715
+ memcpy ((char * ) result /*VARDATA(result)*/ + slice_offset ,
1716
+ diff_data + slice_offset - diff_offset ,
1717
+ Min (slice_length , diff_offset + diff_length - slice_offset ));
1718
+ }
1719
+ }
1720
+
1721
+ #if 0
1722
+ static void
1723
+ toast_apply_diff (struct varlena * attr , struct varlena * result ,
1724
+ int32 sliceoffset , int32 slicelength )
1725
+ {
1726
+ if (VARATT_IS_EXTERNAL_ONDISK_INLINE_DIFF (attr ))
1727
+ {
1728
+ struct varatt_external_versioned toast_pointer ;
1729
+ struct varatt_external_diff diff ;
1730
+ const char * inline_data = VARDATA_EXTERNAL_INLINE (attr );
1731
+ /* Must copy to access aligned fields */
1732
+ int32 inline_size = VARATT_EXTERNAL_INLINE_GET_POINTER (toast_pointer , attr );
1733
+ int32 attrsize = VARATT_EXTERNAL_GET_EXTSIZE (toast_pointer .va_external );
1734
+ Size data_offset = offsetof(varatt_external_diff , va_diff_data );
1735
+ Size diff_size = inline_size - data_offset ;
1736
+ const char * diff_data = inline_data + data_offset ;
1737
+
1738
+ memcpy (& diff , inline_data , data_offset );
1739
+
1740
+ if (slicelength < 0 )
1741
+ slicelength = attrsize - sliceoffset ;
1742
+
1743
+ toast_apply_diff_internal (result , diff_data ,
1744
+ diff .va_diff_offset , diff_size ,
1745
+ sliceoffset , slicelength );
1746
+ }
1747
+ }
1748
+ #endif
1749
+
1672
1750
void
1673
1751
jsonx_detoast_iterate (JsonxDetoastIterator detoast_iter , const char * destend )
1674
1752
{
1675
1753
JsonxFetchDatumIterator fetch_iter = detoast_iter -> fetch_datum_iterator ;
1754
+ const char * old_limit = detoast_iter -> buf -> limit ;
1676
1755
1677
1756
Assert (detoast_iter != NULL && !detoast_iter -> done );
1678
1757
@@ -1692,10 +1771,40 @@ jsonx_detoast_iterate(JsonxDetoastIterator detoast_iter, const char *destend)
1692
1771
jsonx_fetch_datum_iterate (fetch_iter , -1 );
1693
1772
1694
1773
if (detoast_iter -> compressed )
1695
- toast_decompress_iterate (fetch_iter -> buf , detoast_iter -> buf ,
1774
+ toast_decompress_iterate (fetch_iter -> buf , detoast_iter -> orig_buf ,
1696
1775
detoast_iter -> compression_method ,
1697
1776
& detoast_iter -> decompression_state ,
1698
- destend );
1777
+ detoast_iter -> orig_buf -> buf + (destend - detoast_iter -> buf -> buf ));
1778
+
1779
+ if (detoast_iter -> diff .data )
1780
+ {
1781
+ int32 slice_offset ;
1782
+ int32 slice_length ;
1783
+
1784
+ /* copy original data to output buffer */
1785
+ if (detoast_iter -> compressed )
1786
+ {
1787
+ int dst_limit = detoast_iter -> buf -> limit - detoast_iter -> buf -> buf ;
1788
+ int src_limit = detoast_iter -> orig_buf -> limit - detoast_iter -> orig_buf -> buf ;
1789
+
1790
+ if (dst_limit < src_limit )
1791
+ {
1792
+ memcpy (detoast_iter -> buf -> limit ,
1793
+ detoast_iter -> orig_buf -> buf + dst_limit ,
1794
+ src_limit - dst_limit );
1795
+ detoast_iter -> buf -> limit += src_limit - dst_limit ;
1796
+ }
1797
+ }
1798
+
1799
+ slice_offset = old_limit - detoast_iter -> buf -> buf ;
1800
+ slice_length = detoast_iter -> buf -> limit - old_limit ;
1801
+
1802
+ toast_apply_diff_internal ((struct varlena * ) detoast_iter -> buf -> buf ,
1803
+ detoast_iter -> diff .data ,
1804
+ detoast_iter -> diff .offset ,
1805
+ detoast_iter -> diff .size ,
1806
+ slice_offset , slice_length );
1807
+ }
1699
1808
1700
1809
if (detoast_iter -> buf -> limit == detoast_iter -> buf -> capacity )
1701
1810
{
0 commit comments