@@ -322,8 +322,9 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
322
322
LOCKMODE lockmode );
323
323
static ObjectAddress ATExecAlterConstraint (Relation rel , AlterTableCmd * cmd ,
324
324
bool recurse , bool recursing , LOCKMODE lockmode );
325
- static ObjectAddress ATExecValidateConstraint (Relation rel , char * constrName ,
326
- bool recurse , bool recursing , LOCKMODE lockmode );
325
+ static ObjectAddress ATExecValidateConstraint (List * * wqueue , Relation rel ,
326
+ char * constrName , bool recurse , bool recursing ,
327
+ LOCKMODE lockmode );
327
328
static int transformColumnNameList (Oid relId , List * colList ,
328
329
int16 * attnums , Oid * atttypids );
329
330
static int transformFkeyGetPrimaryKey (Relation pkrel , Oid * indexOid ,
@@ -336,7 +337,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
336
337
static void checkFkeyPermissions (Relation rel , int16 * attnums , int natts );
337
338
static CoercionPathType findFkeyCast (Oid targetTypeId , Oid sourceTypeId ,
338
339
Oid * funcid );
339
- static void validateCheckConstraint (Relation rel , HeapTuple constrtup );
340
340
static void validateForeignKeyConstraint (char * conname ,
341
341
Relation rel , Relation pkrel ,
342
342
Oid pkindOid , Oid constraintOid );
@@ -4187,13 +4187,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
4187
4187
address = ATExecAlterConstraint (rel , cmd , false, false, lockmode );
4188
4188
break ;
4189
4189
case AT_ValidateConstraint : /* VALIDATE CONSTRAINT */
4190
- address = ATExecValidateConstraint (rel , cmd -> name , false , false,
4191
- lockmode );
4190
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , false,
4191
+ false, lockmode );
4192
4192
break ;
4193
4193
case AT_ValidateConstraintRecurse : /* VALIDATE CONSTRAINT with
4194
4194
* recursion */
4195
- address = ATExecValidateConstraint (rel , cmd -> name , true, false ,
4196
- lockmode );
4195
+ address = ATExecValidateConstraint (wqueue , rel , cmd -> name , true,
4196
+ false, lockmode );
4197
4197
break ;
4198
4198
case AT_DropConstraint : /* DROP CONSTRAINT */
4199
4199
ATExecDropConstraint (rel , cmd -> name , cmd -> behavior ,
@@ -8474,8 +8474,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
8474
8474
* was already validated, InvalidObjectAddress is returned.
8475
8475
*/
8476
8476
static ObjectAddress
8477
- ATExecValidateConstraint (Relation rel , char * constrName , bool recurse ,
8478
- bool recursing , LOCKMODE lockmode )
8477
+ ATExecValidateConstraint (List * * wqueue , Relation rel , char * constrName ,
8478
+ bool recurse , bool recursing , LOCKMODE lockmode )
8479
8479
{
8480
8480
Relation conrel ;
8481
8481
SysScanDesc scan ;
@@ -8521,27 +8521,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
8521
8521
8522
8522
if (!con -> convalidated )
8523
8523
{
8524
+ AlteredTableInfo * tab ;
8524
8525
HeapTuple copyTuple ;
8525
8526
Form_pg_constraint copy_con ;
8526
8527
8527
8528
if (con -> contype == CONSTRAINT_FOREIGN )
8528
8529
{
8529
- Relation refrel ;
8530
+ NewConstraint * newcon ;
8531
+ Constraint * fkconstraint ;
8530
8532
8531
- /*
8532
- * Triggers are already in place on both tables, so a concurrent
8533
- * write that alters the result here is not possible. Normally we
8534
- * can run a query here to do the validation, which would only
8535
- * require AccessShareLock. In some cases, it is possible that we
8536
- * might need to fire triggers to perform the check, so we take a
8537
- * lock at RowShareLock level just in case.
8538
- */
8539
- refrel = heap_open (con -> confrelid , RowShareLock );
8533
+ /* Queue validation for phase 3 */
8534
+ fkconstraint = makeNode (Constraint );
8535
+ /* for now this is all we need */
8536
+ fkconstraint -> conname = constrName ;
8537
+
8538
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
8539
+ newcon -> name = constrName ;
8540
+ newcon -> contype = CONSTR_FOREIGN ;
8541
+ newcon -> refrelid = con -> confrelid ;
8542
+ newcon -> refindid = con -> conindid ;
8543
+ newcon -> conid = HeapTupleGetOid (tuple );
8544
+ newcon -> qual = (Node * ) fkconstraint ;
8540
8545
8541
- validateForeignKeyConstraint (constrName , rel , refrel ,
8542
- con -> conindid ,
8543
- HeapTupleGetOid (tuple ));
8544
- heap_close (refrel , NoLock );
8546
+ /* Find or create work queue entry for this table */
8547
+ tab = ATGetQueueEntry (wqueue , rel );
8548
+ tab -> constraints = lappend (tab -> constraints , newcon );
8545
8549
8546
8550
/*
8547
8551
* We disallow creating invalid foreign keys to or from
@@ -8552,6 +8556,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
8552
8556
{
8553
8557
List * children = NIL ;
8554
8558
ListCell * child ;
8559
+ NewConstraint * newcon ;
8560
+ bool isnull ;
8561
+ Datum val ;
8562
+ char * conbin ;
8555
8563
8556
8564
/*
8557
8565
* If we're recursing, the parent has already done this, so skip
@@ -8591,12 +8599,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
8591
8599
/* find_all_inheritors already got lock */
8592
8600
childrel = heap_open (childoid , NoLock );
8593
8601
8594
- ATExecValidateConstraint (childrel , constrName , false,
8602
+ ATExecValidateConstraint (wqueue , childrel , constrName , false,
8595
8603
true, lockmode );
8596
8604
heap_close (childrel , NoLock );
8597
8605
}
8598
8606
8599
- validateCheckConstraint (rel , tuple );
8607
+ /* Queue validation for phase 3 */
8608
+ newcon = (NewConstraint * ) palloc0 (sizeof (NewConstraint ));
8609
+ newcon -> name = constrName ;
8610
+ newcon -> contype = CONSTR_CHECK ;
8611
+ newcon -> refrelid = InvalidOid ;
8612
+ newcon -> refindid = InvalidOid ;
8613
+ newcon -> conid = HeapTupleGetOid (tuple );
8614
+
8615
+ val = SysCacheGetAttr (CONSTROID , tuple ,
8616
+ Anum_pg_constraint_conbin , & isnull );
8617
+ if (isnull )
8618
+ elog (ERROR , "null conbin for constraint %u" ,
8619
+ HeapTupleGetOid (tuple ));
8620
+
8621
+ conbin = TextDatumGetCString (val );
8622
+ newcon -> qual = (Node * ) stringToNode (conbin );
8623
+
8624
+ /* Find or create work queue entry for this table */
8625
+ tab = ATGetQueueEntry (wqueue , rel );
8626
+ tab -> constraints = lappend (tab -> constraints , newcon );
8600
8627
8601
8628
/*
8602
8629
* Invalidate relcache so that others see the new validated
@@ -8972,91 +8999,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
8972
8999
}
8973
9000
}
8974
9001
8975
- /*
8976
- * Scan the existing rows in a table to verify they meet a proposed
8977
- * CHECK constraint.
8978
- *
8979
- * The caller must have opened and locked the relation appropriately.
8980
- */
8981
- static void
8982
- validateCheckConstraint (Relation rel , HeapTuple constrtup )
8983
- {
8984
- EState * estate ;
8985
- Datum val ;
8986
- char * conbin ;
8987
- Expr * origexpr ;
8988
- ExprState * exprstate ;
8989
- TupleDesc tupdesc ;
8990
- HeapScanDesc scan ;
8991
- HeapTuple tuple ;
8992
- ExprContext * econtext ;
8993
- MemoryContext oldcxt ;
8994
- TupleTableSlot * slot ;
8995
- Form_pg_constraint constrForm ;
8996
- bool isnull ;
8997
- Snapshot snapshot ;
8998
-
8999
- /*
9000
- * VALIDATE CONSTRAINT is a no-op for foreign tables and partitioned
9001
- * tables.
9002
- */
9003
- if (rel -> rd_rel -> relkind == RELKIND_FOREIGN_TABLE ||
9004
- rel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
9005
- return ;
9006
-
9007
- constrForm = (Form_pg_constraint ) GETSTRUCT (constrtup );
9008
-
9009
- estate = CreateExecutorState ();
9010
-
9011
- /*
9012
- * XXX this tuple doesn't really come from a syscache, but this doesn't
9013
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
9014
- * the tupdesc
9015
- */
9016
- val = SysCacheGetAttr (CONSTROID , constrtup , Anum_pg_constraint_conbin ,
9017
- & isnull );
9018
- if (isnull )
9019
- elog (ERROR , "null conbin for constraint %u" ,
9020
- HeapTupleGetOid (constrtup ));
9021
- conbin = TextDatumGetCString (val );
9022
- origexpr = (Expr * ) stringToNode (conbin );
9023
- exprstate = ExecPrepareExpr (origexpr , estate );
9024
-
9025
- econtext = GetPerTupleExprContext (estate );
9026
- tupdesc = RelationGetDescr (rel );
9027
- slot = MakeSingleTupleTableSlot (tupdesc );
9028
- econtext -> ecxt_scantuple = slot ;
9029
-
9030
- snapshot = RegisterSnapshot (GetLatestSnapshot ());
9031
- scan = heap_beginscan (rel , snapshot , 0 , NULL );
9032
-
9033
- /*
9034
- * Switch to per-tuple memory context and reset it for each tuple
9035
- * produced, so we don't leak memory.
9036
- */
9037
- oldcxt = MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
9038
-
9039
- while ((tuple = heap_getnext (scan , ForwardScanDirection )) != NULL )
9040
- {
9041
- ExecStoreTuple (tuple , slot , InvalidBuffer , false);
9042
-
9043
- if (!ExecCheck (exprstate , econtext ))
9044
- ereport (ERROR ,
9045
- (errcode (ERRCODE_CHECK_VIOLATION ),
9046
- errmsg ("check constraint \"%s\" is violated by some row" ,
9047
- NameStr (constrForm -> conname )),
9048
- errtableconstraint (rel , NameStr (constrForm -> conname ))));
9049
-
9050
- ResetExprContext (econtext );
9051
- }
9052
-
9053
- MemoryContextSwitchTo (oldcxt );
9054
- heap_endscan (scan );
9055
- UnregisterSnapshot (snapshot );
9056
- ExecDropSingleTupleTableSlot (slot );
9057
- FreeExecutorState (estate );
9058
- }
9059
-
9060
9002
/*
9061
9003
* Scan the existing rows in a table to verify they meet a proposed FK
9062
9004
* constraint.
0 commit comments