Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: convert to insert tablet on region replica set changes #14717

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,16 @@ protected void executeTabletConversion(final IAnalysis analysis, final LoadAnaly
: null;

if (status == null) {
LOGGER.warn(
"Load: Failed to convert to tablets from statement {}. Status is null.",
isTableModelStatement ? loadTsFileTableStatement : loadTsFileTreeStatement);
analysis.setFailStatus(
new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
} else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
LOGGER.warn(
"Load: Failed to convert to tablets from statement {}. Status: {}",
isTableModelStatement ? loadTsFileTableStatement : loadTsFileTreeStatement,
status);
analysis.setFailStatus(status);
}
analysis.setFinishQueryAfterAnalyze(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ public static PlanNode deserialize(ByteBuffer buffer) {
InputStream stream = new ByteArrayInputStream(buffer.array());
try {
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
File tsFile = new File(ReadWriteIOUtils.readString(stream));
LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile);
int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
final File tsFile = new File(ReadWriteIOUtils.readString(stream));
final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile);
final int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < tsFileDataSize; i++) {
TsFileData tsFileData = TsFileData.deserialize(stream);
pieceNode.addTsFileData(tsFileData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ public void start() {
long startTime = System.nanoTime();
final boolean isFirstPhaseSuccess;
try {
isFirstPhaseSuccess =
firstPhaseWithRetry(node, CONFIG.getLoadTsFileRetryCountOnRegionChange());
isFirstPhaseSuccess = firstPhase(node);
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime);
Expand Down Expand Up @@ -272,6 +271,7 @@ public void start() {
final long startTime = System.nanoTime();
try {
// if failed to load some TsFiles, then try to convert the TsFiles to Tablets
LOGGER.info("Load TsFile failed, will try to convert to tablets and insert.");
convertFailedTsFilesToTabletsAndRetry();
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
Expand All @@ -283,30 +283,7 @@ public void start() {
}
}

private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int retryCountOnRegionChange) {
retryCountOnRegionChange = Math.max(0, retryCountOnRegionChange);
while (true) {
try {
return firstPhase(node);
} catch (RegionReplicaSetChangedException e) {
if (retryCountOnRegionChange > 0) {
LOGGER.warn(
"Region replica set changed during loading TsFile {}, maybe due to region migration, will retry for {} times.",
node.getTsFileResource(),
retryCountOnRegionChange);
retryCountOnRegionChange--;
} else {
stateMachine.transitionToFailed(e);
LOGGER.warn(
"Region replica set changed during loading TsFile {} after retry.",
node.getTsFileResource());
return false;
}
}
}
}

private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetChangedException {
private boolean firstPhase(LoadSingleTsFileNode node) {
final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block);
try {
new TsFileSplitter(
Expand All @@ -316,8 +293,6 @@ private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetCha
stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (RegionReplicaSetChangedException e) {
throw e;
} catch (IllegalStateException e) {
stateMachine.transitionToFailed(e);
LOGGER.warn(
Expand Down Expand Up @@ -695,7 +670,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException

dataSize -= pieceNode.getDataSize();
block.reduceMemoryUsage(pieceNode.getDataSize());
regionId2ReplicaSetAndNode.put(
regionId2ReplicaSetAndNode.replace(
sortedRegionId,
new Pair<>(
replicaSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,16 @@ public TSStatus writeLoadTsFileNode(
return status;
}

final DataRegion dataRegion = getDataRegion(dataRegionId);
if (dataRegion == null) {
LOGGER.warn(
"DataRegion {} not found on this DataNode when writing piece node"
+ "of TsFile {} (maybe due to region migration), will skip.",
dataRegionId,
pieceNode.getTsFile());
return RpcUtils.SUCCESS_STATUS;
}

LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());

try {
Expand Down Expand Up @@ -936,26 +946,18 @@ public TSStatus executeLoadCommand(
try {
switch (loadCommand) {
case EXECUTE:
if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
"No load TsFile uuid %s recorded for execute load command %s.",
uuid, loadCommand));
if (!loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
LOGGER.warn(
"No load TsFile uuid {} recorded for execute load command {}.", uuid, loadCommand);
}
status = RpcUtils.SUCCESS_STATUS;
break;
case ROLLBACK:
if (loadTsFileManager.deleteAll(uuid)) {
status = RpcUtils.SUCCESS_STATUS;
} else {
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(
String.format(
"No load TsFile uuid %s recorded for execute load command %s.",
uuid, loadCommand));
if (!loadTsFileManager.deleteAll(uuid)) {
LOGGER.warn(
"No load TsFile uuid {} recorded for rollback load command {}.", uuid, loadCommand);
}
status = RpcUtils.SUCCESS_STATUS;
break;
default:
status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode()
== TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
loadTsFileStatement,
result.getCode());
return Optional.empty();
}
}
Expand Down
Loading