Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix predicate pushdown when dealing with ProjectNode #14754

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) {
new IterativeOptimizer(
plannerContext, ruleStats, ImmutableSet.of(new PruneDistinctAggregation())),
simplifyOptimizer,
new PushPredicateIntoTableScan(),
new PushPredicateIntoTableScan(plannerContext, typeAnalyzer),
// Currently, Distinct is not supported, so we can't use this rule for now.
// new IterativeOptimizer(
// plannerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
import org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedAlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
import org.apache.iotdb.db.queryengine.plan.relational.planner.EqualityInference;
import org.apache.iotdb.db.queryengine.plan.relational.planner.IrTypeAnalyzer;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ReplaceSymbolInExpression;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
Expand Down Expand Up @@ -78,6 +81,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -90,6 +95,7 @@
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.TABLE_TYPE;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ExpressionSymbolInliner.inlineSymbols;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_FIRST;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor.extractUnique;
Expand All @@ -98,6 +104,9 @@
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.filterDeterministicConjuncts;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.isEffectivelyLiteral;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.CanonicalizeExpressionRewriter.canonicalizeExpression;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.FULL_JOIN_ONLY_SUPPORT_EQUI_JOIN;
Expand Down Expand Up @@ -136,14 +145,25 @@ public class PushPredicateIntoTableScan implements PlanOptimizer {

private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

private final PlannerContext plannerContext;

private final IrTypeAnalyzer typeAnalyzer;

/**
 * Creates the optimizer and stores the collaborators it later hands to the plan rewriter.
 *
 * @param plannerContext planner context kept for use by the rewriter
 * @param typeAnalyzer type analyzer kept for use by the rewriter
 */
public PushPredicateIntoTableScan(PlannerContext plannerContext, IrTypeAnalyzer typeAnalyzer) {
  this.typeAnalyzer = typeAnalyzer;
  this.plannerContext = plannerContext;
}

/**
 * Rewrites the given plan by visiting it with a {@code Rewriter}.
 *
 * <p>The rewriter is built from the query context, analysis, metadata and symbol allocator
 * exposed by {@code context}, together with this optimizer's planner context and type analyzer.
 * The traversal starts with {@code TRUE_LITERAL} as the inherited predicate.
 *
 * @param plan root of the plan to rewrite
 * @param context optimizer context supplying the per-query collaborators
 * @return whatever the visitor returns for {@code plan}
 */
@Override
public PlanNode optimize(PlanNode plan, Context context) {
return plan.accept(
new Rewriter(
context.getQueryContext(),
context.getAnalysis(),
context.getMetadata(),
context.getSymbolAllocator()),
context.getSymbolAllocator(),
plannerContext,
typeAnalyzer),
new RewriteContext(TRUE_LITERAL));
}

Expand All @@ -153,17 +173,23 @@ private static class Rewriter extends PlanVisitor<PlanNode, RewriteContext> {
private final Metadata metadata;
private final SymbolAllocator symbolAllocator;
private final QueryId queryId;
private final PlannerContext plannerContext;
private final IrTypeAnalyzer typeAnalyzer;

/**
 * Builds a rewriter for a single query.
 *
 * <p>All arguments are stored as-is; the query id is read from {@code queryContext} once and
 * cached in {@code queryId}.
 *
 * @param queryContext context of the query being planned
 * @param analysis analysis result stored for the visit methods
 * @param metadata metadata accessor stored for the visit methods
 * @param symbolAllocator symbol allocator stored for the visit methods
 * @param plannerContext planner context stored for the visit methods
 * @param typeAnalyzer type analyzer stored for the visit methods
 */
Rewriter(
MPPQueryContext queryContext,
Analysis analysis,
Metadata metadata,
SymbolAllocator symbolAllocator) {
SymbolAllocator symbolAllocator,
PlannerContext plannerContext,
IrTypeAnalyzer typeAnalyzer) {
this.queryContext = queryContext;
this.analysis = analysis;
this.metadata = metadata;
this.symbolAllocator = symbolAllocator;
// Cache the query id so it does not have to be read from the context on every use.
this.queryId = queryContext.getQueryId();
this.plannerContext = plannerContext;
this.typeAnalyzer = typeAnalyzer;
}

@Override
Expand Down Expand Up @@ -200,9 +226,98 @@ public PlanNode visitProject(ProjectNode node, RewriteContext context) {
}
}

// TODO(beyyes) in some situation, predicate can not be pushed down below ProjectNode
node.setChild(node.getChild().accept(this, context));
return node;
Set<Symbol> deterministicSymbols =
node.getAssignments().entrySet().stream()
.filter(entry -> isDeterministic(entry.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

Predicate<Expression> deterministic =
conjunct -> deterministicSymbols.containsAll(extractUnique(conjunct));

Expression inheritedPredicate =
context.inheritedPredicate != null ? context.inheritedPredicate : TRUE_LITERAL;
Map<Boolean, List<Expression>> conjuncts =
extractConjuncts(inheritedPredicate).stream()
.collect(Collectors.partitioningBy(deterministic));

// Push down conjuncts from the inherited predicate that only depend on deterministic
// assignments, with certain limitations.
List<Expression> deterministicConjuncts = conjuncts.get(true);

// We partition the expressions in deterministicConjuncts into two lists, and only inline
// the expressions that are in the inlining-targets list.
Map<Boolean, List<Expression>> inlineConjuncts =
deterministicConjuncts.stream()
.collect(
Collectors.partitioningBy(expression -> isInliningCandidate(expression, node)));

List<Expression> inlinedDeterministicConjuncts =
inlineConjuncts.get(true).stream()
.map(entry -> inlineSymbols(node.getAssignments().getMap(), entry))
.map(
conjunct ->
canonicalizeExpression(
conjunct,
typeAnalyzer,
queryContext.getTypeProvider(),
plannerContext,
queryContext
.getSession())) // normalize expressions to a form that unwrapCasts
// understands
// no need for now
// .map(conjunct -> unwrapCasts(session, plannerContext, typeAnalyzer, types,
// conjunct))
.collect(Collectors.toList());

PlanNode rewrittenChild =
node.getChild()
.accept(this, new RewriteContext(combineConjuncts(inlinedDeterministicConjuncts)));

PlanNode rewrittenNode = replaceChildren(node, ImmutableList.of(rewrittenChild));

// All deterministic conjuncts that contain non-inlining targets, and any non-deterministic
// conjuncts, will be placed in the filter node.
List<Expression> nonInliningConjuncts = inlineConjuncts.get(false);
nonInliningConjuncts.addAll(conjuncts.get(false));

if (!nonInliningConjuncts.isEmpty()) {
rewrittenNode =
new FilterNode(
queryId.genPlanNodeId(), rewrittenNode, combineConjuncts(nonInliningConjuncts));
}

return rewrittenNode;
}

/**
 * Decides whether a conjunct may have the project's assignments inlined into it.
 *
 * <p>Only symbols produced by {@code node} are considered. The conjunct qualifies when every
 * such symbol it references satisfies at least one of:
 *
 * <ul>
 *   <li>it is referenced exactly once in the conjunct;
 *   <li>its assignment is effectively a literal (as judged by {@code isEffectivelyLiteral});
 *   <li>its assignment is a plain {@code SymbolReference}.
 * </ul>
 *
 * <p>A conjunct that references none of the node's output symbols trivially qualifies.
 *
 * @param expression the conjunct being examined
 * @param node the project node whose assignments would be inlined
 * @return {@code true} if all referenced project outputs meet the rules above
 */
private boolean isInliningCandidate(Expression expression, ProjectNode node) {
  Set<Symbol> projectOutputs = ImmutableSet.copyOf(node.getOutputSymbols());

  // How many times each of the project's output symbols is referenced by the conjunct.
  Map<Symbol, Long> referenceCounts =
      SymbolsExtractor.extractAll(expression).stream()
          .filter(projectOutputs::contains)
          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

  for (Map.Entry<Symbol, Long> reference : referenceCounts.entrySet()) {
    if (reference.getValue() == 1) {
      // A single use never duplicates the assigned expression.
      continue;
    }
    Expression assigned = node.getAssignments().get(reference.getKey());
    if (isEffectivelyLiteral(assigned, plannerContext, queryContext.getSession())) {
      continue;
    }
    if (!(assigned instanceof SymbolReference)) {
      // Referenced more than once and neither a literal nor a bare symbol: do not inline.
      return false;
    }
  }
  return true;
}

@Override
Expand All @@ -212,7 +327,7 @@ public PlanNode visitFilter(FilterNode node, RewriteContext context) {
Expression predicate = combineConjuncts(node.getPredicate(), context.inheritedPredicate);

// when exist diff function, predicate can not be pushed down into DeviceTableScanNode
if (containsDiffFunction(predicate) || canNotPushDownBelowProjectNode(node, predicate)) {
if (containsDiffFunction(predicate)) {
node.setChild(node.getChild().accept(this, new RewriteContext()));
node.setPredicate(predicate);
context.inheritedPredicate = TRUE_LITERAL;
Expand All @@ -234,35 +349,6 @@ public PlanNode visitFilter(FilterNode node, RewriteContext context) {
return node;
}

/**
 * Reports whether {@code predicate} must stay above the filter's child.
 *
 * <p>This only applies when the filter's child is a {@code ProjectNode}. In that case the
 * predicate is blocked if it references any symbol that is not among the output symbols of the
 * project's own child.
 *
 * @param node the filter node whose child is inspected
 * @param predicate the predicate that would be pushed down
 * @return {@code true} if the child is a project and the predicate uses a symbol missing from
 *     the output of the project's child; {@code false} otherwise
 */
private boolean canNotPushDownBelowProjectNode(FilterNode node, Expression predicate) {
  PlanNode filterChild = node.getChild();
  if (!(filterChild instanceof ProjectNode)) {
    return false;
  }

  // Symbols available beneath the projection; anything else is not part of that output.
  PlanNode projectChild = ((ProjectNode) filterChild).getChild();
  Set<Symbol> availableBelowProject = new HashSet<>(projectChild.getOutputSymbols());
  return missingTermsInOutputSymbols(predicate, availableBelowProject);
}

/**
 * Recursively checks whether an expression references a symbol outside {@code outputSymbols}.
 *
 * <p>A {@code SymbolReference} is tested directly against the set. Any other expression is
 * missing a term if at least one of its children is; an expression with no children is never
 * missing anything.
 *
 * @param expression the expression tree to inspect
 * @param outputSymbols the symbols considered available
 * @return {@code true} if some symbol reference in the tree is not contained in
 *     {@code outputSymbols}
 */
private boolean missingTermsInOutputSymbols(Expression expression, Set<Symbol> outputSymbols) {
  if (expression instanceof SymbolReference) {
    return !outputSymbols.contains(Symbol.from(expression));
  }

  // Short-circuits on the first child with a missing term; yields false when there are none.
  return expression.getChildren().stream()
      .anyMatch(child -> missingTermsInOutputSymbols((Expression) child, outputSymbols));
}

// private boolean areExpressionsEquivalent(
// Expression leftExpression, Expression rightExpression) {
// return false;
Expand Down
Loading
Loading