Skip to content

Commit

Permalink
Pipe: Fixed the bug that schema region listening queue is not cleared…
Browse files Browse the repository at this point in the history
… when the schema region is deleted / migrated (#12575)
  • Loading branch information
Caideyipi authored May 23, 2024
1 parent b2c5680 commit 911d7b6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,34 +527,29 @@ public TSStatus visitDeleteLogicalView(DeleteLogicalViewNode node, ISchemaRegion

@Override
public TSStatus visitPipeEnrichedWritePlanNode(
PipeEnrichedWritePlanNode node, ISchemaRegion schemaRegion) {
final PipeEnrichedWritePlanNode node, final ISchemaRegion schemaRegion) {
return node.getWritePlanNode().accept(this, schemaRegion);
}

@Override
public TSStatus visitPipeEnrichedNonWritePlanNode(
PipeEnrichedNonWritePlanNode node, ISchemaRegion schemaRegion) {
final PipeEnrichedNonWritePlanNode node, final ISchemaRegion schemaRegion) {
return node.getNonWritePlanNode().accept(this, schemaRegion);
}

@Override
public TSStatus visitPipeOperateSchemaQueueNode(
PipeOperateSchemaQueueNode node, ISchemaRegion schemaRegion) {
final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) {
final SchemaRegionId id = schemaRegion.getSchemaRegionId();
final SchemaRegionListeningQueue queue = PipeAgent.runtime().schemaListener(id);
try {
if (node.isOpen() && !queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
} else if (!node.isOpen() && queue.isOpened()) {
logger.info("Closed pipe listening queue on schema region {}", id);
queue.close();
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (IOException e) {
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("Failed to clear the queue, because " + e.getMessage());
if (node.isOpen() && !queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
} else if (!node.isOpen() && queue.isOpened()) {
logger.info("Closed pipe listening queue on schema region {}", id);
queue.close();
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -203,6 +204,7 @@ public TSStatus deleteDataRegion(DataRegionId dataRegionId) {
public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) {
try {
schemaEngine.deleteSchemaRegion(schemaRegionId);
PipeAgent.runtime().schemaListener(schemaRegionId).close();
schemaRegionLockMap.remove(schemaRegionId);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
} catch (MetadataException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -57,15 +56,15 @@ protected AbstractPipeListeningQueue() {

/////////////////////////////// Plan ///////////////////////////////

protected synchronized void tryListen(EnrichedEvent event) {
protected synchronized void tryListen(final EnrichedEvent event) {
if (super.tryListen(event)) {
event.increaseReferenceCount(AbstractPipeListeningQueue.class.getName());
}
}

/////////////////////////////// Snapshot Cache ///////////////////////////////

protected synchronized void tryListen(List<PipeSnapshotEvent> events) {
protected synchronized void tryListen(final List<PipeSnapshotEvent> events) {
if (!isClosed.get()) {
clearSnapshots();
queueTailIndex2SnapshotsCache.setLeft(queue.getTailIndex());
Expand All @@ -87,7 +86,7 @@ public synchronized Pair<Long, List<PipeSnapshotEvent>> findAvailableSnapshots()
}

@Override
public synchronized long removeBefore(long newFirstIndex) {
public synchronized long removeBefore(final long newFirstIndex) {
final long result = super.removeBefore(newFirstIndex);
if (queueTailIndex2SnapshotsCache.getLeft() < result) {
clearSnapshots();
Expand All @@ -108,13 +107,13 @@ private synchronized void clearSnapshots() {
/////////////////////////////// Close ///////////////////////////////

@Override
public synchronized void close() throws IOException {
public synchronized void close() {
clearSnapshots();
super.close();
}

@Override
protected void releaseResource(Event event) {
protected void releaseResource(final Event event) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
.decreaseReferenceCount(AbstractPipeListeningQueue.class.getName(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable

protected final AtomicBoolean isClosed = new AtomicBoolean(true);

protected AbstractSerializableListeningQueue(QueueSerializerType serializerType) {
protected AbstractSerializableListeningQueue(final QueueSerializerType serializerType) {
this.serializerType = serializerType;
serializers.put(QueueSerializerType.PLAIN, PlainQueueSerializer::new);
}

/////////////////////////////// Function ///////////////////////////////

protected synchronized boolean tryListen(E element) {
protected synchronized boolean tryListen(final E element) {
if (isClosed.get()) {
return false;
}
Expand All @@ -74,7 +74,7 @@ protected synchronized boolean tryListen(E element) {
}

// Caller should ensure that the "newFirstIndex" is less than every iterators.
public synchronized long removeBefore(long newFirstIndex) {
public synchronized long removeBefore(final long newFirstIndex) {
try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
queue.iterateFromEarliest()) {
while (iterator.getNextIndex() < newFirstIndex) {
Expand All @@ -88,23 +88,24 @@ public synchronized long removeBefore(long newFirstIndex) {
return queue.tryRemoveBefore(newFirstIndex);
}

public synchronized boolean isGivenNextIndexValid(long nextIndex) {
public synchronized boolean isGivenNextIndexValid(final long nextIndex) {
// The "tailIndex" is permitted to listen to the next incoming element
return queue.isNextIndexValid(nextIndex);
}

public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator newIterator(long nextIndex) {
public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator newIterator(
final long nextIndex) {
return queue.iterateFrom(nextIndex);
}

public synchronized void returnIterator(
ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
iterator.close();
}

/////////////////////////////// Snapshot ///////////////////////////////

public synchronized boolean serializeToFile(File snapshotName) throws IOException {
public synchronized boolean serializeToFile(final File snapshotName) throws IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (snapshotFile.exists() && snapshotFile.isFile()) {
LOGGER.error(
Expand All @@ -128,7 +129,7 @@ public synchronized boolean serializeToFile(File snapshotName) throws IOExceptio
}
}

public synchronized void deserializeFromFile(File snapshotName) throws IOException {
public synchronized void deserializeFromFile(final File snapshotName) throws IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
LOGGER.error(
Expand All @@ -155,15 +156,15 @@ public synchronized void deserializeFromFile(File snapshotName) throws IOExcepti

/////////////////////////////// Element Ser / De Method ////////////////////////////////

protected abstract ByteBuffer serializeToByteBuffer(E element);
protected abstract ByteBuffer serializeToByteBuffer(final E element);

/**
* Deserialize a single element from byteBuffer.
*
* @param byteBuffer the byteBuffer corresponding to an element
* @return The deserialized element or {@code null} if a failure is encountered.
*/
protected abstract E deserializeFromByteBuffer(ByteBuffer byteBuffer);
protected abstract E deserializeFromByteBuffer(final ByteBuffer byteBuffer);

/////////////////////////////// Open & Close ///////////////////////////////

Expand All @@ -172,7 +173,7 @@ public synchronized void open() {
}

@Override
public synchronized void close() throws IOException {
public synchronized void close() {
isClosed.set(true);

try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
Expand All @@ -188,7 +189,7 @@ public synchronized void close() throws IOException {
queue.clear();
}

protected abstract void releaseResource(E element);
protected abstract void releaseResource(final E element);

public synchronized boolean isOpened() {
return !isClosed.get();
Expand Down

0 comments on commit 911d7b6

Please sign in to comment.