Skip to content

Commit 4596c34

Browse files
committed
Optimized version of postgres_fdw: use 2pc only when needed
1 parent 0e1ff43 commit 4596c34

File tree

9 files changed

+1462
-37
lines changed

9 files changed

+1462
-37
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ static bool xact_got_connection = false;
6767
typedef long long csn_t;
6868
static csn_t currentGlobalTransactionId = 0;
6969
static int currentLocalTransactionId = 0;
70+
static PGconn* currentConnection = NULL;
71+
7072

7173
/* prototypes of private functions */
7274
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -406,6 +408,8 @@ static void
406408
begin_remote_xact(ConnCacheEntry *entry)
407409
{
408410
int curlevel = GetCurrentTransactionNestLevel();
411+
PGresult *res;
412+
409413

410414
/* Start main transaction if we haven't yet */
411415
if (entry->xact_depth <= 0)
@@ -419,8 +423,6 @@ begin_remote_xact(ConnCacheEntry *entry)
419423
if (TransactionIdIsValid(gxid))
420424
{
421425
char stmt[64];
422-
PGresult *res;
423-
424426
snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
425427
res = PQexec(entry->conn, stmt);
426428
PQclear(res);
@@ -434,26 +436,30 @@ begin_remote_xact(ConnCacheEntry *entry)
434436
entry->xact_depth = 1;
435437
if (UseTsDtmTransactions)
436438
{
437-
if (!currentGlobalTransactionId)
439+
if (currentConnection == NULL)
438440
{
439-
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_extend('%d.%d')",
440-
MyProcPid, ++currentLocalTransactionId));
441-
char *resp;
442-
443-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
444-
{
445-
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
446-
}
447-
resp = PQgetvalue(res, 0, 0);
448-
if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &currentGlobalTransactionId) != 1)
449-
{
450-
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
451-
}
452-
PQclear(res);
453-
}
454-
else
441+
currentConnection = entry->conn;
442+
}
443+
else if (entry->conn != currentConnection)
455444
{
456-
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_access(%llu, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId));
445+
if (!currentGlobalTransactionId)
446+
{
447+
char *resp;
448+
res = PQexec(currentConnection, psprintf("SELECT public.dtm_extend('%d.%d')",
449+
MyProcPid, ++currentLocalTransactionId));
450+
451+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
452+
{
453+
pgfdw_report_error(ERROR, res, currentConnection, true, sql);
454+
}
455+
resp = PQgetvalue(res, 0, 0);
456+
if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &currentGlobalTransactionId) != 1)
457+
{
458+
pgfdw_report_error(ERROR, res, currentConnection, true, sql);
459+
}
460+
PQclear(res);
461+
}
462+
res = PQexec(entry->conn, psprintf("SELECT public.dtm_access(%llu, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId));
457463

458464
if (PQresultStatus(res) != PGRES_TUPLES_OK)
459465
{
@@ -954,6 +960,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
954960
cursor_number = 0;
955961

956962
currentGlobalTransactionId = 0;
963+
currentConnection = NULL;
957964
}
958965
}
959966

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3179,22 +3179,29 @@ execute_dml_stmt(ForeignScanState *node)
31793179
/*
31803180
* Construct array of query parameter values in text format.
31813181
*/
3182-
if (numParams > 0)
3182+
if (numParams > 0)
3183+
{
31833184
process_query_params(econtext,
31843185
dmstate->param_flinfo,
31853186
dmstate->param_exprs,
31863187
values);
3187-
3188-
/*
3189-
* Notice that we pass NULL for paramTypes, thus forcing the remote server
3190-
* to infer types for all parameters. Since we explicitly cast every
3191-
* parameter (see deparse.c), the "inference" is trivial and will produce
3192-
* the desired result. This allows us to avoid assuming that the remote
3193-
* server has the same OIDs we do for the parameters' types.
3194-
*/
3195-
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3196-
NULL, values, NULL, NULL, 0))
3197-
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3188+
3189+
/*
3190+
* Notice that we pass NULL for paramTypes, thus forcing the remote server
3191+
* to infer types for all parameters. Since we explicitly cast every
3192+
* parameter (see deparse.c), the "inference" is trivial and will produce
3193+
* the desired result. This allows us to avoid assuming that the remote
3194+
* server has the same OIDs we do for the parameters' types.
3195+
*/
3196+
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
3197+
NULL, values, NULL, NULL, 0))
3198+
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3199+
}
3200+
else
3201+
{
3202+
if (!PQsendQuery(dmstate->conn, dmstate->query))
3203+
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
3204+
}
31983205

31993206
/*
32003207
* Get the result, and check for success.

contrib/postgres_fdw/tests/dtmbench.cpp

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ struct config
5353
int nShards;
5454
string connection;
5555
bool prepared;
56+
bool local;
57+
bool pathman_sharding;
5658

5759
config() {
5860
nShards = 1;
@@ -62,6 +64,8 @@ struct config
6264
nAccounts = 10000;
6365
updatePercent = 100;
6466
prepared = false;
67+
local = false;
68+
pathman_sharding = false;
6569
}
6670
};
6771

@@ -125,14 +129,17 @@ void* writer(void* arg)
125129
{
126130
thread& t = *(thread*)arg;
127131
connection conn(cfg.connection);
132+
128133
if (cfg.prepared) {
129134
conn.prepare("transfer", "update t set v = v + $1 where u=$2");
130135
}
136+
131137
for (int i = 0; i < cfg.nIterations; i++)
132138
{
133139
work txn(conn);
134140
int srcAcc = random() % cfg.nAccounts;
135-
int dstAcc = random() % cfg.nAccounts;
141+
int dstAcc = (cfg.local ? srcAcc + 1 : random()) % cfg.nAccounts;
142+
136143
try {
137144
if (random() % 100 < cfg.updatePercent) {
138145
int rc = cfg.prepared
@@ -180,8 +187,17 @@ void initializeDatabase()
180187
for (int i = 0; i < cfg.nShards; i++)
181188
{
182189
work txn(conn);
183-
exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
190+
191+
if (!cfg.pathman_sharding) {
192+
exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
193+
} else {
194+
//exec(txn, "SELECT add_range_partition('t', %d::int, %d, 't_fdw%i')",
195+
exec(txn, "SELECT add_foreign_range_partition('t', %d::int, %d, 't_fdw%i', 'shard%i')",
196+
accountsPerShard*i, accountsPerShard*(i+1), i+1, i % 3 + 1);
197+
}
198+
184199
exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
200+
185201
txn.commit();
186202
}
187203
}
@@ -221,9 +237,15 @@ int main (int argc, char* argv[])
221237
initialize = true;
222238
cfg.nShards = atoi(argv[++i]);
223239
continue;
240+
case 'l':
241+
cfg.local = true;
242+
continue;
224243
case 'P':
225244
cfg.prepared = true;
226245
continue;
246+
case 'S':
247+
cfg.pathman_sharding = true;
248+
continue;
227249
}
228250
}
229251
printf("Options:\n"
@@ -234,7 +256,9 @@ int main (int argc, char* argv[])
234256
"\t-p N\tupdate percent (100)\n"
235257
"\t-c STR\tdatabase connection string\n"
236258
"\t-i N\tinitialize N shards\n"
237-
"\t-P\tuse prepared statements\n");
259+
"\t-l\ttlocal tranfers\n"
260+
"\t-P\tuse prepared statements\n"
261+
"\t-S\tuse pathman_sharding\n");
238262
return 1;
239263
}
240264

@@ -282,7 +306,8 @@ int main (int argc, char* argv[])
282306
printf(
283307
"{\"tps\":%f, \"transactions\":%ld,"
284308
" \"selects\":%ld, \"updates\":%ld, \"aborts\":%ld, \"abort_percent\": %d,"
285-
" \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d ,\"shards\":%d, \"prepared\":%d}\n",
309+
" \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d,"
310+
" \"iterations\":%d ,\"shards\":%d, \"prepared\":%d, \"pathman_sharding\":%d, \"local\":%d}\n",
286311
(double)(nTransactions*USEC)/elapsed,
287312
nTransactions,
288313
nSelects,
@@ -295,7 +320,9 @@ int main (int argc, char* argv[])
295320
cfg.nAccounts,
296321
cfg.nIterations,
297322
cfg.nShards,
298-
cfg.prepared);
323+
cfg.prepared,
324+
cfg.pathman_sharding,
325+
cfg.local);
299326

300327
return 0;
301328
}

0 commit comments

Comments
 (0)