Skip to content

Commit

Permalink
IoTConsensusV2: Support Table Model Replicate (#14169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna authored Nov 22, 2024
1 parent d82dffd commit a689726
Show file tree
Hide file tree
Showing 16 changed files with 292 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
Expand Down Expand Up @@ -103,6 +105,8 @@ public void createConsensusPipe(
EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
String.valueOf(consensusPipeName.getReceiverDataNodeId()))
.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
.put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
.put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
.build();
} else {
extractorParams =
Expand All @@ -119,6 +123,8 @@ public void createConsensusPipe(
EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
String.valueOf(consensusPipeName.getReceiverDataNodeId()))
.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
.put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
.put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
.put(
EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
String.valueOf(new ConsensusPipeName(senderPeer, regionMigrationCoordinatorPeer)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;

import org.apache.tsfile.utils.PublicBAOS;
Expand All @@ -39,20 +40,20 @@

public class PipeConsensusDeleteNodeReq extends TPipeConsensusTransferReq {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusDeleteNodeReq.class);
private transient DeleteDataNode deleteDataNode;
private transient AbstractDeleteDataNode deleteDataNode;

private PipeConsensusDeleteNodeReq() {
// Do nothing
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeConsensusDeleteNodeReq toTPipeConsensusTransferReq(
DeleteDataNode deleteDataNode,
AbstractDeleteDataNode deleteDataNode,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteNodeType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +48,7 @@ public class DeletionResource implements PersistentResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DeletionResource.class);
private final Consumer<DeletionResource> removeHook;
private final AtomicInteger pipeTaskReferenceCount;
private final DeleteDataNode deleteDataNode;
private final AbstractDeleteDataNode deleteDataNode;
private final ConsensusGroupId consensusGroupId;
private volatile Status currentStatus;

Expand All @@ -56,7 +57,9 @@ public class DeletionResource implements PersistentResource {
private volatile Exception cause;

public DeletionResource(
DeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, String regionId) {
AbstractDeleteDataNode deleteDataNode,
Consumer<DeletionResource> removeHook,
String regionId) {
this.deleteDataNode = deleteDataNode;
this.removeHook = removeHook;
this.currentStatus = Status.RUNNING;
Expand Down Expand Up @@ -135,7 +138,7 @@ public long getFileEndTime() {
return 0;
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}

Expand All @@ -149,11 +152,11 @@ public ByteBuffer serialize() {
public static DeletionResource deserialize(
final ByteBuffer buffer, final String regionId, final Consumer<DeletionResource> removeHook)
throws IOException {
DeleteDataNode node = DeleteDataNode.deserializeFromDAL(buffer);
AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
return new DeletionResource(node, removeHook, regionId);
}

public static boolean isDeleteNodeGeneratedInLocalByIoTV2(DeleteDataNode node) {
public static boolean isDeleteNodeGeneratedInLocalByIoTV2(AbstractDeleteDataNode node) {
if (node.getProgressIndex() instanceof RecoverProgressIndex) {
RecoverProgressIndex recoverProgressIndex = (RecoverProgressIndex) node.getProgressIndex();
return recoverProgressIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.iotdb.db.pipe.consensus.deletion.persist.DeletionBuffer;
import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
import org.apache.iotdb.db.pipe.consensus.deletion.recover.DeletionReader;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;

import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class DeletionResourceManager implements AutoCloseable {
private final String dataRegionId;
private final DeletionBuffer deletionBuffer;
private final File storageDir;
private final Map<DeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
private final Map<AbstractDeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
new ConcurrentHashMap<>();
private final Lock recoverLock = new ReentrantLock();
private final Condition recoveryReadyCondition = recoverLock.newCondition();
Expand Down Expand Up @@ -133,7 +133,7 @@ public void close() {
LOGGER.info("Deletion resource manager for {} has been successfully closed!", dataRegionId);
}

public DeletionResource registerDeletionResource(DeleteDataNode deleteDataNode) {
public DeletionResource registerDeletionResource(AbstractDeleteDataNode deleteDataNode) {
DeletionResource deletionResource =
deleteNode2ResourcesMap.computeIfAbsent(
deleteDataNode,
Expand All @@ -144,7 +144,7 @@ public DeletionResource registerDeletionResource(DeleteDataNode deleteDataNode)
return deletionResource;
}

public DeletionResource getDeletionResource(DeleteDataNode deleteDataNode) {
public DeletionResource getDeletionResource(AbstractDeleteDataNode deleteDataNode) {
return deleteNode2ResourcesMap.get(deleteDataNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.commons.pipe.event.SerializableEvent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;

import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand All @@ -36,7 +37,7 @@
import java.util.Optional;

public class PipeDeleteDataNodeEvent extends EnrichedEvent implements SerializableEvent {
private DeleteDataNode deleteDataNode;
private AbstractDeleteDataNode deleteDataNode;
private DeletionResource deletionResource;
private boolean isGeneratedByPipe;
private ProgressIndex progressIndex;
Expand All @@ -47,12 +48,12 @@ public PipeDeleteDataNodeEvent() {
}

public PipeDeleteDataNodeEvent(
final DeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
final AbstractDeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
this(deleteDataNode, null, 0, null, null, null, isGeneratedByPipe);
}

public PipeDeleteDataNodeEvent(
final DeleteDataNode deleteDataNode,
final AbstractDeleteDataNode deleteDataNode,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
Expand All @@ -73,7 +74,7 @@ public PipeDeleteDataNodeEvent(
.ifPresent(node -> this.progressIndex = deleteDataNode.getProgressIndex());
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
Expand Down Expand Up @@ -67,7 +67,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null, null);
}

public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode node) {
public static PipeRealtimeEvent createRealtimeEvent(final AbstractDeleteDataNode node) {
return new PipeRealtimeEvent(
new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe()), null, null, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeDataRegionAssigner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;

Expand Down Expand Up @@ -142,7 +141,8 @@ public void listenToInsertNode(
}

// TODO: record database name in enriched events?
public DeletionResource listenToDeleteData(final String regionId, final DeleteDataNode node) {
public DeletionResource listenToDeleteData(
final String regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
if (assigner == null) {
Expand All @@ -169,24 +169,13 @@ public DeletionResource listenToDeleteData(final String regionId, final DeleteDa
return deletionResource;
}

public DeletionResource listenToDeleteData(
final String regionId, final RelationalDeleteDataNode node) {
// TODO: implement
return null;
}

public void listenToHeartbeat(boolean shouldPrintMessage) {
dataRegionId2Assigner.forEach(
(key, value) ->
value.publishToAssign(
PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage)));
}

public void listenToDeleteData(DeleteDataNode node) {
dataRegionId2Assigner.forEach(
(key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node)));
}

/////////////////////////////// singleton ///////////////////////////////

private PipeInsertionDataNodeListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
Expand Down Expand Up @@ -314,7 +314,7 @@ private TPipeConsensusTransferResp handleTransferDeletion(final PipeConsensusDel
PipeConsensusServerImpl impl =
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(consensusGroupId));
final DeleteDataNode planNode = req.getDeleteDataNode();
final AbstractDeleteDataNode planNode = req.getDeleteDataNode();
planNode.markAsGeneratedByRemoteConsensusLeader();
planNode.setProgressIndex(
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
*
* <p>2.{@link DataExecutionVisitor}, to actually write data on data region and mark it as received
* from pipe.
*
* <p>TODO: support relational deleteNode
*/
public class PipeEnrichedDeleteDataNode extends DeleteDataNode {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

@SuppressWarnings({"java:S1854", "unused"})
public abstract class AbstractDeleteDataNode extends SearchNode implements WALEntryValue {
protected TRegionReplicaSet regionReplicaSet;
protected ProgressIndex progressIndex;

protected AbstractDeleteDataNode(PlanNodeId id) {
super(id);
}

public abstract ByteBuffer serializeToDAL();

public static AbstractDeleteDataNode deserializeFromDAL(
ByteBuffer byteBuffer, boolean isRelational) {
if (isRelational) {
return RelationalDeleteDataNode.deserializeFromDAL(byteBuffer);
} else {
return DeleteDataNode.deserializeFromDAL(byteBuffer);
}
}

@Override
public ProgressIndex getProgressIndex() {
return progressIndex;
}

@Override
public void setProgressIndex(ProgressIndex progressIndex) {
this.progressIndex = progressIndex;
}

@Override
public List<PlanNode> getChildren() {
return new ArrayList<>();
}

@Override
public void addChild(PlanNode child) {
throw new UnsupportedOperationException("Not supported.");
}

@Override
public int allowedChildCount() {
return NO_CHILD_ALLOWED;
}

@Override
public List<String> getOutputColumnNames() {
return null;
}
}
Loading

0 comments on commit a689726

Please sign in to comment.