Skip to content

Commit

Permalink
Add some ITs for aggregation in Table Model
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Oct 17, 2024
1 parent 85e0e15 commit 2a28e10
Show file tree
Hide file tree
Showing 7 changed files with 925 additions and 98 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,6 @@ && isIntegerNumber(argumentTypes.get(2)))) {
switch (functionName.toLowerCase(Locale.ENGLISH)) {
case SqlConstant.AVG:
case SqlConstant.SUM:
case SqlConstant.EXTREME:
case SqlConstant.MIN:
case SqlConstant.MAX:
case SqlConstant.STDDEV:
case SqlConstant.STDDEV_POP:
case SqlConstant.STDDEV_SAMP:
Expand All @@ -543,6 +540,9 @@ && isIntegerNumber(argumentTypes.get(2)))) {
functionName));
}
break;
case SqlConstant.EXTREME:
case SqlConstant.MIN:
case SqlConstant.MAX:
case SqlConstant.MODE:
if (argumentTypes.size() != 1) {
throw new SemanticException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
Expand Down Expand Up @@ -68,6 +69,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -78,6 +80,7 @@
import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TransformSortToStreamSort.isOrderByAllIdsAndTime;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.split;
import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN;
import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
Expand Down Expand Up @@ -317,9 +320,13 @@ public List<PlanNode> visitSort(SortNode node, PlanContext context) {
new MergeSortNode(
queryId.genPlanNodeId(), node.getOrderingScheme(), node.getOutputSymbols());
for (PlanNode child : childrenNodes) {
SortNode subSortNode =
new SortNode(queryId.genPlanNodeId(), child, node.getOrderingScheme(), false, false);
mergeSortNode.addChild(subSortNode);
if (canSortEliminated(node.getOrderingScheme(), nodeOrderingMap.get(child.getPlanNodeId()))) {
mergeSortNode.addChild(child);
} else {
SortNode subSortNode =
new SortNode(queryId.genPlanNodeId(), child, node.getOrderingScheme(), false, false);
mergeSortNode.addChild(subSortNode);
}
}
nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), mergeSortNode.getOrderingScheme());

Expand Down Expand Up @@ -358,7 +365,8 @@ public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext context)

List<PlanNode> childrenNodes = node.getChild().accept(this, context);
if (childrenNodes.size() == 1) {
if (canSortEliminated(node.getOrderingScheme(), nodeOrderingMap.get(childrenNodes.get(0)))) {
if (canSortEliminated(
node.getOrderingScheme(), nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()))) {
return childrenNodes;
} else {
node.setChild(childrenNodes.get(0));
Expand All @@ -371,15 +379,19 @@ public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext context)
new MergeSortNode(
queryId.genPlanNodeId(), node.getOrderingScheme(), node.getOutputSymbols());
for (PlanNode child : childrenNodes) {
StreamSortNode subSortNode =
new StreamSortNode(
queryId.genPlanNodeId(),
child,
node.getOrderingScheme(),
false,
node.isOrderByAllIdsAndTime(),
node.getStreamCompareKeyEndIndex());
mergeSortNode.addChild(subSortNode);
if (canSortEliminated(node.getOrderingScheme(), nodeOrderingMap.get(child.getPlanNodeId()))) {
mergeSortNode.addChild(child);
} else {
StreamSortNode subSortNode =
new StreamSortNode(
queryId.genPlanNodeId(),
child,
node.getOrderingScheme(),
false,
node.isOrderByAllIdsAndTime(),
node.getStreamCompareKeyEndIndex());
mergeSortNode.addChild(subSortNode);
}
}
nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), mergeSortNode.getOrderingScheme());

Expand Down Expand Up @@ -502,14 +514,17 @@ public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) {

@Override
public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context) {
OrderingScheme expectedOrderingSchema = null;
if (node.isStreamable()) {
context.setExpectedOrderingScheme(constructOrderingSchema(node.getPreGroupedSymbols()));
expectedOrderingSchema = constructOrderingSchema(node.getPreGroupedSymbols());
context.setExpectedOrderingScheme(expectedOrderingSchema);
}
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
if (childOrdering != null) {
nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
}
// TODO add back while implementing StreamingAggregationOperator
// if (childOrdering != null) {
// nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
// }

if (childrenNodes.size() == 1) {
node.setChild(childrenNodes.get(0));
Expand All @@ -522,18 +537,28 @@ public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context
childrenNodes =
childrenNodes.stream()
.map(
child ->
new AggregationNode(
queryId.genPlanNodeId(),
child,
intermediate.getAggregations(),
intermediate.getGroupingSets(),
intermediate.getPreGroupedSymbols(),
intermediate.getStep(),
intermediate.getHashSymbol(),
intermediate.getGroupIdSymbol()))
child -> {
PlanNodeId planNodeId = queryId.genPlanNodeId();
AggregationNode aggregationNode =
new AggregationNode(
planNodeId,
child,
intermediate.getAggregations(),
intermediate.getGroupingSets(),
intermediate.getPreGroupedSymbols(),
intermediate.getStep(),
intermediate.getHashSymbol(),
intermediate.getGroupIdSymbol());
// TODO add back while implementing StreamingAggregationOperator
// if (node.isStreamable()) {
// nodeOrderingMap.put(planNodeId, childOrdering);
// }
return aggregationNode;
})
.collect(Collectors.toList());
splitResult.left.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, childrenNodes));
splitResult.left.setChild(
mergeChildrenViaCollectOrMergeSort(
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()), childrenNodes));
return Collections.singletonList(splitResult.left);
}

Expand Down Expand Up @@ -707,6 +732,7 @@ private void processSortProperty(
final List<SortOrder> newSortOrders = new ArrayList<>();
final OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme;

boolean lastIsTimeRelated = false;
for (final Symbol symbol : expectedOrderingScheme.getOrderBy()) {
if (timeRelatedSymbol(symbol)) {
if (!expectedOrderingScheme.getOrderings().get(symbol).isAscending()) {
Expand All @@ -716,6 +742,7 @@ private void processSortProperty(
}
newOrderingSymbols.add(symbol);
newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
lastIsTimeRelated = true;
break;
} else if (!tableScanNode.getIdAndAttributeIndexMap().containsKey(symbol)) {
break;
Expand Down Expand Up @@ -788,21 +815,62 @@ private void processSortProperty(
}
}

final OrderingScheme newOrderingScheme =
new OrderingScheme(
final Optional<OrderingScheme> newOrderingScheme =
tableScanOrderingSchema(
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName()),
newOrderingSymbols,
IntStream.range(0, newOrderingSymbols.size())
.boxed()
.collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get)));
newSortOrders,
lastIsTimeRelated,
tableScanNode.getDeviceEntries().size() == 1);
for (final PlanNode planNode : resultTableScanNodeList) {
final TableScanNode scanNode = (TableScanNode) planNode;
nodeOrderingMap.put(scanNode.getPlanNodeId(), newOrderingScheme);
newOrderingScheme.ifPresent(
orderingScheme -> nodeOrderingMap.put(scanNode.getPlanNodeId(), orderingScheme));
if (comparator != null) {
scanNode.getDeviceEntries().sort(comparator);
}
}
}

/**
 * Works out which ordering, if any, should be recorded for the table scan nodes.
 *
 * <ul>
 *   <li>For a single device, or when the last ordering symbol is not time related, the complete
 *       ordering is returned.
 *   <li>Otherwise (several devices and a trailing time-related symbol): if that symbol is the only
 *       one, nothing is returned; if the leading symbols pass {@code isOrderByAllIdsAndTime}, the
 *       complete ordering is returned; if not, only the leading symbols are returned.
 * </ul>
 *
 * @param tableColumnSchema column schemas of the scanned table, keyed by symbol
 * @param newOrderingSymbols ordering symbols, in order
 * @param newSortOrders sort orders, positionally aligned with {@code newOrderingSymbols}
 * @param lastIsTimeRelated whether the last ordering symbol is time related
 * @param isSingleDevice whether the table scan covers exactly one device
 * @return the ordering scheme to record, or empty when none applies
 */
private Optional<OrderingScheme> tableScanOrderingSchema(
    Map<Symbol, ColumnSchema> tableColumnSchema,
    List<Symbol> newOrderingSymbols,
    List<SortOrder> newSortOrders,
    boolean lastIsTimeRelated,
    boolean isSingleDevice) {

  final boolean multiDeviceEndingWithTime = !isSingleDevice && lastIsTimeRelated;
  final int symbolCount = newOrderingSymbols.size();

  // several devices and the time-related symbol is the only ordering key
  if (multiDeviceEndingWithTime && symbolCount == 1) {
    return Optional.empty();
  }

  if (multiDeviceEndingWithTime) {
    // build the ordering without its trailing time-related symbol
    final int prefixLength = symbolCount - 1;
    final List<Symbol> prefixSymbols = newOrderingSymbols.subList(0, prefixLength);
    final Map<Symbol, SortOrder> prefixOrderings =
        IntStream.range(0, prefixLength)
            .boxed()
            .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get));
    final OrderingScheme withoutTime = new OrderingScheme(prefixSymbols, prefixOrderings);

    // not all id columns are included: drop the trailing time-related symbol
    if (!isOrderByAllIdsAndTime(tableColumnSchema, withoutTime, prefixLength - 1)) {
      return Optional.of(withoutTime);
    }
  }

  // single device, no trailing time-related symbol, or all id columns included
  final Map<Symbol, SortOrder> allOrderings =
      IntStream.range(0, symbolCount)
          .boxed()
          .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get));
  return Optional.of(new OrderingScheme(newOrderingSymbols, allOrderings));
}

// Matches the time column, or the date_bin function call pushed down into an aggregation; there
// should be only one such column.
private boolean timeRelatedSymbol(Symbol symbol) {
return TIMESTAMP_STR.equalsIgnoreCase(symbol.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
Expand All @@ -44,20 +43,7 @@
*
* <pre>
* - TopK (limit = x, order by a, b)
* </pre>
*
* <pre>
* - Limit (limit = x)
* - MergeSort (order by a, b)
* - StreamSort (order by a, b)
* </pre>
*
* Into:
*
* <pre>
* - TopK (limit = x, order by a, b)
* - Limit (limit = x)
* - StreamSort (order by a, b)
* </pre>
*
* Applies to LimitNode without ties only.
Expand Down Expand Up @@ -85,36 +71,19 @@ public Result apply(LimitNode parent, Captures captures, Context context) {

/**
 * Builds the {@link TopKNode} that takes over the plan node id of {@code parent}, using the
 * ordering scheme of {@code mergeSortNode}, the count of {@code parent} and the output symbols of
 * {@code childOfMergeSort}.
 *
 * <p>NOTE(review): this span comes from a diff view without +/- markers, so the earlier body (an
 * {@code instanceof StreamSortNode} branch with an {@code else}) and the later body (one
 * unconditional construction followed by a loop) appear interleaved. Confirm against the real file
 * which lines are current before relying on this text.
 *
 * @param parent the limit node whose id and count are reused
 * @param mergeSortNode the merge sort node supplying the ordering scheme and the children
 * @param childOfMergeSort a child of the merge sort node, read for its output symbols
 * @param context rule context, used to allocate ids for the new limit nodes
 * @return the newly built top-k node
 */
static TopKNode transformByMergeSortNode(
LimitNode parent, MergeSortNode mergeSortNode, PlanNode childOfMergeSort, Context context) {
TopKNode topKNode;
if (childOfMergeSort instanceof StreamSortNode) {
topKNode =
new TopKNode(
parent.getPlanNodeId(),
mergeSortNode.getOrderingScheme(),
parent.getCount(),
childOfMergeSort.getOutputSymbols(),
true);
// wrap every child of the merge sort in its own limit node carrying the parent's count
for (PlanNode child : mergeSortNode.getChildren()) {
LimitNode limitNode =
new LimitNode(
context.getIdAllocator().genPlanNodeId(),
child,
parent.getCount(),
Optional.empty());
topKNode.addChild(limitNode);
}

} else {
// here the merge sort children are handed to the top-k node directly, without limit nodes
topKNode =
new TopKNode(
parent.getPlanNodeId(),
mergeSortNode.getChildren(),
mergeSortNode.getOrderingScheme(),
parent.getCount(),
childOfMergeSort.getOutputSymbols(),
true);
TopKNode topKNode =
new TopKNode(
parent.getPlanNodeId(),
mergeSortNode.getOrderingScheme(),
parent.getCount(),
childOfMergeSort.getOutputSymbols(),
true);
// wrap every child of the merge sort in its own limit node carrying the parent's count
for (PlanNode child : mergeSortNode.getChildren()) {
LimitNode limitNode =
new LimitNode(
context.getIdAllocator().genPlanNodeId(), child, parent.getCount(), Optional.empty());
topKNode.addChild(limitNode);
}

return topKNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,6 @@ public PlanNode visitSort(SortNode node, Context context) {
return node;
}

/**
 * Checks whether the ordering covers every ID column of the table and, past them, either ends or
 * continues with the timestamp expression.
 *
 * @param tableColumnSchema column schemas of the table, keyed by symbol
 * @param orderingScheme the ordering to inspect
 * @param streamSortIndex index such that position {@code streamSortIndex + 1} is the first one
 *     examined after the covered columns
 * @return {@code false} if some ID-category column is not a key of the orderings; otherwise
 *     {@code true} when the orderings hold exactly {@code streamSortIndex + 1} entries or the
 *     order-by symbol at {@code streamSortIndex + 1} is named like the timestamp expression
 *     (case-insensitive)
 */
private boolean isOrderByAllIdsAndTime(
    Map<Symbol, ColumnSchema> tableColumnSchema,
    OrderingScheme orderingScheme,
    int streamSortIndex) {
  for (Map.Entry<Symbol, ColumnSchema> column : tableColumnSchema.entrySet()) {
    if (column.getValue().getColumnCategory() != TsTableColumnCategory.ID) {
      continue;
    }
    if (orderingScheme.getOrderings().containsKey(column.getKey())) {
      continue;
    }
    // an ID column is missing from the ordering
    return false;
  }

  final int followingIndex = streamSortIndex + 1;
  if (orderingScheme.getOrderings().size() == followingIndex) {
    return true;
  }
  final String followingName = orderingScheme.getOrderBy().get(followingIndex).getName();
  return TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(followingName);
}

@Override
public PlanNode visitTableScan(TableScanNode node, Context context) {
context.setTableScanNode(node);
Expand All @@ -154,6 +139,21 @@ public PlanNode visitAggregationTableScan(AggregationTableScanNode node, Context
}
}

/**
 * Tells whether {@code orderingScheme} orders by all ID columns of the table, optionally followed
 * by the timestamp expression.
 *
 * @param tableColumnSchema column schemas of the table, keyed by symbol
 * @param orderingScheme the ordering to inspect
 * @param streamSortIndex index such that position {@code streamSortIndex + 1} is the first one
 *     examined after the covered columns
 * @return {@code false} if any ID-category column is absent from the orderings; otherwise {@code
 *     true} when the orderings contain exactly {@code streamSortIndex + 1} entries or the order-by
 *     symbol at {@code streamSortIndex + 1} carries the timestamp expression name
 *     (case-insensitive)
 */
public static boolean isOrderByAllIdsAndTime(
    Map<Symbol, ColumnSchema> tableColumnSchema,
    OrderingScheme orderingScheme,
    int streamSortIndex) {
  final boolean someIdColumnUnordered =
      tableColumnSchema.entrySet().stream()
          .anyMatch(
              column ->
                  column.getValue().getColumnCategory() == TsTableColumnCategory.ID
                      && !orderingScheme.getOrderings().containsKey(column.getKey()));
  if (someIdColumnUnordered) {
    return false;
  }

  final int nextPosition = streamSortIndex + 1;
  if (orderingScheme.getOrderings().size() == nextPosition) {
    return true;
  }
  return TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(
      orderingScheme.getOrderBy().get(nextPosition).getName());
}

private static class Context {
private TableScanNode tableScanNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.expression;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
Expand Down Expand Up @@ -167,7 +166,7 @@ public void noPushDownTest() {
ImmutableList.of("tag1", "tag2", "tag3"), // Streamable
Optional.empty(),
FINAL,
mergeSort(
collect(
exchange(),
aggregation(
singleGroupingSet("s1", "tag1", "tag2", "tag3", "time"),
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
<tsfile.version>1.2.0-53d247f-SNAPSHOT</tsfile.version>
<tsfile.version>1.2.0-d28fef17-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
Expand Down

0 comments on commit 2a28e10

Please sign in to comment.