Skip to content

Commit

Permalink
Extract "merge" method for SearchNode (#14736)
Browse files Browse the repository at this point in the history
* done && try to fix IoTDBPipeInclusionIT.testPureDeleteInclusion

done

* Tan review
  • Loading branch information
liyuheng55555 authored Jan 22, 2025
1 parent 9be5eeb commit 765cbb2
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
Expand All @@ -34,12 +33,6 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
Expand All @@ -50,15 +43,13 @@
import org.apache.iotdb.db.storageengine.dataregion.snapshot.SnapshotTaker;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class DataRegionStateMachine extends BaseStateMachine {

Expand Down Expand Up @@ -155,132 +146,37 @@ public void loadSnapshot(File latestSnapshotRootDir) {
}

protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
List<DeleteDataNode> deleteDataNodes = new ArrayList<>();
List<SearchNode> searchNodes = new ArrayList<>();
PlanNode onlyOne = null;
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
if (planNode instanceof SearchNode) {
((SearchNode) planNode).setSearchIndex(indexedRequest.getSearchIndex());
}
if (planNode instanceof InsertNode) {
insertNodes.add((InsertNode) planNode);
} else if (planNode instanceof DeleteDataNode) {
deleteDataNodes.add((DeleteDataNode) planNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
// contains one request
return planNode;
searchNodes.add((SearchNode) planNode);
} else {
throw new IllegalArgumentException(
"PlanNodes in IndexedConsensusRequest are not InsertNode and "
+ "the size of requests are larger than 1");
logger.warn("Unexpected PlanNode type {}, which is not SearchNode", planNode.getClass());
if (onlyOne == null) {
onlyOne = planNode;
} else {
throw new IllegalArgumentException(
String.format(
"There are two types of PlanNode in one request: %s and %s",
onlyOne.getClass(), planNode.getClass()));
}
}
}
if (!insertNodes.isEmpty()) {
if (!deleteDataNodes.isEmpty()) {
if (onlyOne != null) {
if (!searchNodes.isEmpty()) {
throw new IllegalArgumentException(
"One indexedRequest cannot contain InsertNode and DeleteDataNode at the same time");
String.format(
"There are two types of PlanNode in one request: %s and SearchNode",
onlyOne.getClass()));
}
return mergeInsertNodes(insertNodes);
}
return mergeDeleteDataNode(deleteDataNodes);
}

private DeleteDataNode mergeDeleteDataNode(List<DeleteDataNode> deleteDataNodes) {
int size = deleteDataNodes.size();
if (size == 0) {
throw new IllegalArgumentException("deleteDataNodes is empty");
}
DeleteDataNode firstOne = deleteDataNodes.get(0);
if (size == 1) {
return firstOne;
}
if (!deleteDataNodes.stream()
.allMatch(
deleteDataNode ->
firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
&& firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
throw new IllegalArgumentException(
"DeleteDataNodes which start time or end time are not same cannot be merged");
}
List<MeasurementPath> pathList =
deleteDataNodes.stream()
.flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
// Some time the deleteDataNode list contains a path for multiple times, so use
// distinct() to clear them
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
firstOne.getPlanNodeId(),
pathList,
firstOne.getDeleteStartTime(),
firstOne.getDeleteEndTime());
}

/**
* Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
* merged to one multi-tablet). <br>
* Notice: the continuity of insert nodes sharing same search index should be protected by the
* upper layer.
*
* @exception IllegalArgumentException when insertNodes is empty
*/
protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
throw new IllegalArgumentException("insertNodes should never be empty");
return onlyOne;
}
if (size == 1) {
return insertNodes.get(0);
}

InsertNode result;
List<Integer> index = new ArrayList<>();
int i = 0;
switch (insertNodes.get(0).getType()) {
case RELATIONAL_INSERT_TABLET:
case INSERT_TABLET:
// merge to InsertMultiTabletsNode
List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
for (InsertNode insertNode : insertNodes) {
insertTabletNodes.add((InsertTabletNode) insertNode);
index.add(i);
i++;
}
result =
new InsertMultiTabletsNode(
insertNodes.get(0).getPlanNodeId(), index, insertTabletNodes);
break;
case INSERT_ROW:
// merge to InsertRowsNode
List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
for (InsertNode insertNode : insertNodes) {
insertRowNodes.add((InsertRowNode) insertNode);
index.add(i);
i++;
}
result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, insertRowNodes);
break;
case INSERT_ROWS:
// merge to InsertRowsNode
List<InsertRowNode> list = new ArrayList<>();
for (InsertNode insertNode : insertNodes) {
for (InsertRowNode insertRowNode : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
list.add(insertRowNode);
index.add(i);
i++;
}
}
result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, list);
break;
default:
throw new UnSupportedDataTypeException(
"Unsupported node type " + insertNodes.get(0).getType());
}
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setTargetPath(insertNodes.get(0).getTargetPath());
return result;
// searchNodes should never be empty here
return searchNodes.get(0).merge(searchNodes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;

import java.io.DataOutputStream;
Expand Down Expand Up @@ -195,4 +196,16 @@ public void serializeToWAL(final IWALByteBufferView buffer) {
public int serializedSize() {
return deleteDataNode.serializedSize();
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
List<SearchNode> unrichedDeleteDataNodes =
searchNodes.stream()
.map(
searchNode ->
(SearchNode) ((PipeEnrichedDeleteDataNode) searchNode).getDeleteDataNode())
.collect(Collectors.toList());
return new PipeEnrichedDeleteDataNode(
(DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public List<WritePlanNode> splitByPartition(final IAnalysis analysis) {
.collect(Collectors.toList());
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
return insertNode.mergeInsertNode(insertNodes);
}

@Override
public TRegionReplicaSet getDataRegionReplicaSet() {
return insertNode.getDataRegionReplicaSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public ContinuousSameSearchIndexSeparatorNode(PlanNodeId id) {
this.searchIndex = -1;
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
throw new UnsupportedOperationException(
"ContinuousSameSearchIndexSeparatorNode not support merge");
}

@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(PlanNodeType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR.getNodeType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,40 @@ private void splitPathPatternByDevice(
.collect(Collectors.toList())));
}
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
List<DeleteDataNode> deleteDataNodes =
searchNodes.stream()
.map(searchNode -> (DeleteDataNode) searchNode)
.collect(Collectors.toList());
int size = deleteDataNodes.size();
if (size == 0) {
throw new IllegalArgumentException("deleteDataNodes is empty");
}
DeleteDataNode firstOne = deleteDataNodes.get(0);
if (size == 1) {
return firstOne;
}
if (!deleteDataNodes.stream()
.allMatch(
deleteDataNode ->
firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
&& firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
throw new IllegalArgumentException(
"DeleteDataNodes which start time or end time are not same cannot be merged");
}
List<MeasurementPath> pathList =
deleteDataNodes.stream()
.flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
// Some time the deleteDataNode list contains a path for multiple times, so use
// distinct() to clear them
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
firstOne.getPlanNodeId(),
pathList,
firstOne.getDeleteStartTime(),
firstOne.getDeleteEndTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public InsertMultiTabletsNode(PlanNodeId id) {
insertTabletNodeList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
throw new UnsupportedOperationException("InsertMultiTabletsNode not support merge");
}

public InsertMultiTabletsNode(
PlanNodeId id,
List<Integer> parentInsertTabletNodeIndexList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public abstract class InsertNode extends SearchNode {

Expand Down Expand Up @@ -94,6 +95,26 @@ protected InsertNode(PlanNodeId id) {
super(id);
}

@Override
public final SearchNode merge(List<SearchNode> searchNodes) {
if (searchNodes.isEmpty()) {
throw new IllegalArgumentException("insertNodes should never be empty");
}
if (searchNodes.size() == 1) {
return searchNodes.get(0);
}
List<InsertNode> insertNodes =
searchNodes.stream()
.map(searchNode -> (InsertNode) searchNode)
.collect(Collectors.toList());
InsertNode result = mergeInsertNode(insertNodes);
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setTargetPath(insertNodes.get(0).getTargetPath());
return result;
}

public abstract InsertNode mergeInsertNode(List<InsertNode> insertNodes);

protected InsertNode(
PlanNodeId id,
PartialPath devicePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -74,6 +75,17 @@ public InsertRowNode(PlanNodeId id) {
super(id);
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
List<Integer> index = new ArrayList<>();
List<InsertRowNode> insertRowNodes = new ArrayList<>();
for (int i = 0; i < insertNodes.size(); i++) {
insertRowNodes.add((InsertRowNode) insertNodes.get(i));
index.add(i);
}
return new InsertRowsNode(this.getPlanNodeId(), index, insertRowNodes);
}

@TestOnly
public InsertRowNode(
PlanNodeId id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ public InsertRowsNode(PlanNodeId id) {
insertRowNodeIndexList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
List<InsertRowNode> list = new ArrayList<>();
List<Integer> index = new ArrayList<>();
int i = 0;
for (InsertNode insertNode : insertNodes) {
for (InsertRowNode insertRowNode : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
list.add(insertRowNode);
index.add(i);
i++;
}
}
return new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, list);
}

public InsertRowsNode(
PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
super(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public InsertRowsOfOneDeviceNode(PlanNodeId id) {
insertRowNodeList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
throw new UnsupportedOperationException("InsertRowsOfOneDeviceNode not support merge");
}

public InsertRowsOfOneDeviceNode(
PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
super(id);
Expand Down
Loading

0 comments on commit 765cbb2

Please sign in to comment.