Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to dev/1.3] Extract "merge" method for SearchNode #14757

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
Expand All @@ -34,12 +33,6 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
Expand All @@ -50,15 +43,13 @@
import org.apache.iotdb.db.storageengine.dataregion.snapshot.SnapshotTaker;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class DataRegionStateMachine extends BaseStateMachine {

Expand Down Expand Up @@ -155,131 +146,37 @@ public void loadSnapshot(File latestSnapshotRootDir) {
}

protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
List<DeleteDataNode> deleteDataNodes = new ArrayList<>();
List<SearchNode> searchNodes = new ArrayList<>();
PlanNode onlyOne = null;
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
if (planNode instanceof SearchNode) {
((SearchNode) planNode).setSearchIndex(indexedRequest.getSearchIndex());
}
if (planNode instanceof InsertNode) {
insertNodes.add((InsertNode) planNode);
} else if (planNode instanceof DeleteDataNode) {
deleteDataNodes.add((DeleteDataNode) planNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
// contains one request
return planNode;
searchNodes.add((SearchNode) planNode);
} else {
throw new IllegalArgumentException(
"PlanNodes in IndexedConsensusRequest are not InsertNode and "
+ "the size of requests are larger than 1");
logger.warn("Unexpected PlanNode type {}, which is not SearchNode", planNode.getClass());
if (onlyOne == null) {
onlyOne = planNode;
} else {
throw new IllegalArgumentException(
String.format(
"There are two types of PlanNode in one request: %s and %s",
onlyOne.getClass(), planNode.getClass()));
}
}
}
if (!insertNodes.isEmpty()) {
if (!deleteDataNodes.isEmpty()) {
if (onlyOne != null) {
if (!searchNodes.isEmpty()) {
throw new IllegalArgumentException(
"One indexedRequest cannot contain InsertNode and DeleteDataNode at the same time");
String.format(
"There are two types of PlanNode in one request: %s and SearchNode",
onlyOne.getClass()));
}
return mergeInsertNodes(insertNodes);
}
return mergeDeleteDataNode(deleteDataNodes);
}

private DeleteDataNode mergeDeleteDataNode(List<DeleteDataNode> deleteDataNodes) {
int size = deleteDataNodes.size();
if (size == 0) {
throw new IllegalArgumentException("deleteDataNodes is empty");
}
DeleteDataNode firstOne = deleteDataNodes.get(0);
if (size == 1) {
return firstOne;
}
if (!deleteDataNodes.stream()
.allMatch(
deleteDataNode ->
firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
&& firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
throw new IllegalArgumentException(
"DeleteDataNodes which start time or end time are not same cannot be merged");
}
List<PartialPath> pathList =
deleteDataNodes.stream()
.flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
// Some time the deleteDataNode list contains a path for multiple times, so use
// distinct() to clear them
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
firstOne.getPlanNodeId(),
pathList,
firstOne.getDeleteStartTime(),
firstOne.getDeleteEndTime());
}

/**
* Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
* merged to one multi-tablet). <br>
* Notice: the continuity of insert nodes sharing same search index should be protected by the
* upper layer.
*
* @exception IllegalArgumentException when insertNodes is empty
*/
protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
throw new IllegalArgumentException("insertNodes should never be empty");
return onlyOne;
}
if (size == 1) {
return insertNodes.get(0);
}

InsertNode result;
List<Integer> index = new ArrayList<>();
int i = 0;
switch (insertNodes.get(0).getType()) {
case INSERT_TABLET:
// merge to InsertMultiTabletsNode
List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
for (InsertNode insertNode : insertNodes) {
insertTabletNodes.add((InsertTabletNode) insertNode);
index.add(i);
i++;
}
result =
new InsertMultiTabletsNode(
insertNodes.get(0).getPlanNodeId(), index, insertTabletNodes);
break;
case INSERT_ROW:
// merge to InsertRowsNode
List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
for (InsertNode insertNode : insertNodes) {
insertRowNodes.add((InsertRowNode) insertNode);
index.add(i);
i++;
}
result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, insertRowNodes);
break;
case INSERT_ROWS:
// merge to InsertRowsNode
List<InsertRowNode> list = new ArrayList<>();
for (InsertNode insertNode : insertNodes) {
for (InsertRowNode insertRowNode : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
list.add(insertRowNode);
index.add(i);
i++;
}
}
result = new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, list);
break;
default:
throw new UnSupportedDataTypeException(
"Unsupported node type " + insertNodes.get(0).getType());
}
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setDevicePath(insertNodes.get(0).getDevicePath());
return result;
// searchNodes should never be empty here
return searchNodes.get(0).merge(searchNodes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;

import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -171,4 +173,26 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
: new PipeEnrichedDeleteDataNode((DeleteDataNode) plan))
.collect(Collectors.toList());
}

@Override
public void serializeToWAL(final IWALByteBufferView buffer) {
deleteDataNode.serializeToWAL(buffer);
}

@Override
public int serializedSize() {
return deleteDataNode.serializedSize();
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
List<SearchNode> unrichedDeleteDataNodes =
searchNodes.stream()
.map(
searchNode ->
(SearchNode) ((PipeEnrichedDeleteDataNode) searchNode).getDeleteDataNode())
.collect(Collectors.toList());
return new PipeEnrichedDeleteDataNode(
(DeleteDataNode) deleteDataNode.merge(unrichedDeleteDataNodes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
.collect(Collectors.toList());
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
return insertNode.mergeInsertNode(insertNodes);
}

@Override
public TRegionReplicaSet getDataRegionReplicaSet() {
return insertNode.getDataRegionReplicaSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public ContinuousSameSearchIndexSeparatorNode(PlanNodeId id) {
this.searchIndex = -1;
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
throw new UnsupportedOperationException(
"ContinuousSameSearchIndexSeparatorNode not support merge");
}

@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(PlanNodeType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR.getNodeType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,40 @@ private void splitPathPatternByDevice(
.addAll(pathPattern.alterPrefixPath(devicePath)));
}
}

@Override
public SearchNode merge(List<SearchNode> searchNodes) {
List<DeleteDataNode> deleteDataNodes =
searchNodes.stream()
.map(searchNode -> (DeleteDataNode) searchNode)
.collect(Collectors.toList());
int size = deleteDataNodes.size();
if (size == 0) {
throw new IllegalArgumentException("deleteDataNodes is empty");
}
DeleteDataNode firstOne = deleteDataNodes.get(0);
if (size == 1) {
return firstOne;
}
if (!deleteDataNodes.stream()
.allMatch(
deleteDataNode ->
firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
&& firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
throw new IllegalArgumentException(
"DeleteDataNodes which start time or end time are not same cannot be merged");
}
List<PartialPath> pathList =
deleteDataNodes.stream()
.flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
// Some time the deleteDataNode list contains a path for multiple times, so use
// distinct() to clear them
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
firstOne.getPlanNodeId(),
pathList,
firstOne.getDeleteStartTime(),
firstOne.getDeleteEndTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public InsertMultiTabletsNode(PlanNodeId id) {
insertTabletNodeList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
throw new UnsupportedOperationException("InsertMultiTabletsNode not support merge");
}

public InsertMultiTabletsNode(
PlanNodeId id,
List<Integer> parentInsertTabletNodeIndexList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public abstract class InsertNode extends SearchNode implements ComparableConsensusRequest {

Expand Down Expand Up @@ -83,6 +85,26 @@ protected InsertNode(PlanNodeId id) {
super(id);
}

@Override
public final SearchNode merge(List<SearchNode> searchNodes) {
if (searchNodes.isEmpty()) {
throw new IllegalArgumentException("insertNodes should never be empty");
}
if (searchNodes.size() == 1) {
return searchNodes.get(0);
}
List<InsertNode> insertNodes =
searchNodes.stream()
.map(searchNode -> (InsertNode) searchNode)
.collect(Collectors.toList());
InsertNode result = mergeInsertNode(insertNodes);
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setDevicePath(insertNodes.get(0).getDevicePath());
return result;
}

public abstract InsertNode mergeInsertNode(List<InsertNode> insertNodes);

protected InsertNode(
PlanNodeId id,
PartialPath devicePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -73,6 +74,17 @@ public InsertRowNode(PlanNodeId id) {
super(id);
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
List<Integer> index = new ArrayList<>();
List<InsertRowNode> insertRowNodes = new ArrayList<>();
for (int i = 0; i < insertNodes.size(); i++) {
insertRowNodes.add((InsertRowNode) insertNodes.get(i));
index.add(i);
}
return new InsertRowsNode(this.getPlanNodeId(), index, insertRowNodes);
}

@TestOnly
public InsertRowNode(
PlanNodeId id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ public InsertRowsNode(PlanNodeId id) {
insertRowNodeIndexList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
List<InsertRowNode> list = new ArrayList<>();
List<Integer> index = new ArrayList<>();
int i = 0;
for (InsertNode insertNode : insertNodes) {
for (InsertRowNode insertRowNode : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
list.add(insertRowNode);
index.add(i);
i++;
}
}
return new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, list);
}

public InsertRowsNode(
PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
super(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public InsertRowsOfOneDeviceNode(PlanNodeId id) {
insertRowNodeList = new ArrayList<>();
}

@Override
public InsertNode mergeInsertNode(List<InsertNode> insertNodes) {
throw new UnsupportedOperationException("InsertRowsOfOneDeviceNode not support merge");
}

public InsertRowsOfOneDeviceNode(
PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
super(id);
Expand Down
Loading
Loading