Skip to content

Commit

Permalink
Pipe: Fix insertNode.getDevicePath() is not handled correctly for ins…
Browse files Browse the repository at this point in the history
…ertRowsNode (NPE) (#12569)
  • Loading branch information
Caideyipi authored May 22, 2024
1 parent 8150f7b commit 7d4dd9e
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,17 @@ public void testNegativeTimestamp() throws Exception {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s1) values (-123, 3)",
"insert into root.db.d1(time, s1) values (now(), 3)",
"flush"))) {
// Test the correctness of insertRowsNode transmission
"insert into root.db.d1(time, s1) values (-122, 3)",
"insert into root.db.d1(time, s1) values (-123, 3), (now(), 3)"))) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.s1),",
Collections.singleton("5,"));
Collections.singleton("6,"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,7 @@ public void testHistoryAndRealtime() throws Exception {
"insert into root.db.d1 (time, at1) values (2, 11)",
"insert into root.db.d2 (time, at1) values (2, 21)",
"insert into root.db.d3 (time, at1) values (2, 31)",
"insert into root.db.d4 (time, at1) values (2, 41)",
"flush"))) {
"insert into root.db.d4 (time, at1) values (2, 41), (3, 51)"))) {
return;
}

Expand All @@ -646,7 +645,7 @@ public void testHistoryAndRealtime() throws Exception {
receiverEnv,
"select count(*) from root.** where time >= 2",
"count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
Collections.singleton("1,1,0,"));
Collections.singleton("2,1,0,"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final TabletInsertio
} else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
if (Objects.nonNull(insertNode)) {
// insertNode.getDevicePath() is null for InsertRowsNode
if (Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())) {
deviceId = insertNode.getDevicePath().getFullPath();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ private void transferWithoutCheck(final TabletInsertionEvent tabletInsertionEven
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);

transfer(
Objects.nonNull(insertNode) ? insertNode.getDevicePath().getFullPath() : null,
// insertNode.getDevicePath() is null for InsertRowsNode
Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath())
? insertNode.getDevicePath().getFullPath()
: null,
pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ protected void doTransfer(AsyncPipeDataTransferServiceClient client, TPipeTransf
protected void updateLeaderCache(TSStatus status) {
final InsertNode insertNode =
((PipeInsertNodeTabletInsertionEvent) event).getInsertNodeViaCacheIfPossible();
if (insertNode != null) {
// insertNode.getDevicePath() is null for InsertRowsNode
if (insertNode != null && insertNode.getDevicePath() != null) {
connector.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ private void doTransfer(
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();

if (insertNode != null) {
clientAndStatus = clientManager.getClient(insertNode.getDevicePath().getFullPath());
clientAndStatus =
// insertNode.getDevicePath() is null for InsertRowsNode
Objects.nonNull(insertNode.getDevicePath())
? clientManager.getClient(insertNode.getDevicePath().getFullPath())
: clientManager.getClient();
resp =
clientAndStatus
.getLeft()
Expand Down Expand Up @@ -277,7 +281,8 @@ private void doTransfer(
pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
pipeInsertNodeTabletInsertionEvent.toString());
}
if (insertNode != null && status.isSetRedirectNode()) {
// insertNode.getDevicePath() is null for InsertRowsNode
if (insertNode != null && insertNode.getDevicePath() != null && status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
Expand Down Expand Up @@ -303,7 +304,15 @@ public boolean shouldParsePattern() {
final InsertNode node = getInsertNodeViaCacheIfPossible();
return super.shouldParsePattern()
&& Objects.nonNull(pipePattern)
&& (Objects.isNull(node) || !pipePattern.coversDevice(node.getDevicePath().getFullPath()));
&& (Objects.isNull(node)
|| (node.getType() == PlanNodeType.INSERT_ROWS
? ((InsertRowsNode) node)
.getInsertRowNodeList().stream()
.anyMatch(
insertRowNode ->
!pipePattern.coversDevice(
insertRowNode.getDevicePath().getFullPath()))
: !pipePattern.coversDevice(node.getDevicePath().getFullPath())));
}

public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
Expand Down

0 comments on commit 7d4dd9e

Please sign in to comment.