Skip to content

Commit

Permalink
refactor: rename BatchFlushEndpoint->AutoBatchFlushEndpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent 3241d42 commit 205813a
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.protocol.CommandExpiryWriter;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.DefaultBatchFlushEndpoint;
import io.lettuce.core.protocol.DefaultAutoBatchFlushEndpoint;
import io.lettuce.core.protocol.DefaultEndpoint;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.PushHandler;
Expand Down Expand Up @@ -277,7 +277,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
logger.debug("Trying to get a Redis connection for: {}", redisURI);

Endpoint endpoint = getOptions().getAutoBatchFlushOptions().isAutoBatchFlushEnabled()
? new DefaultBatchFlushEndpoint(getOptions(), getResources())
? new DefaultAutoBatchFlushEndpoint(getOptions(), getResources())
: new DefaultEndpoint(getOptions(), getResources());
RedisChannelWriter writer = (RedisChannelWriter) endpoint;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisException;
import io.lettuce.core.protocol.DefaultBatchFlushEndpoint;
import io.lettuce.core.protocol.DefaultAutoBatchFlushEndpoint;
import io.lettuce.core.resource.ClientResources;

/**
Expand All @@ -28,7 +28,7 @@
*
* @author Mark Paluch
*/
public class ClusterNodeBatchFlushEndpoint extends DefaultBatchFlushEndpoint {
public class ClusterNodeAutoBatchFlushEndpoint extends DefaultAutoBatchFlushEndpoint {

/**
* Initialize a new instance that handles commands from the supplied queue.
Expand All @@ -37,7 +37,7 @@ public class ClusterNodeBatchFlushEndpoint extends DefaultBatchFlushEndpoint {
* @param clientResources client resources for this connection.
* @param clusterChannelWriter top-most channel writer.
*/
public ClusterNodeBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources,
public ClusterNodeAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources,
RedisChannelWriter clusterChannelWriter) {
super(clientOptions, clientResources, clusterChannelWriter != null ? cmd -> {
if (cmd.isDone()) {
Expand All @@ -49,7 +49,7 @@ public ClusterNodeBatchFlushEndpoint(ClientOptions clientOptions, ClientResource
} catch (RedisException e) {
cmd.completeExceptionally(e);
}
} : DefaultBatchFlushEndpoint::cancelCommandOnEndpointClose);
} : DefaultAutoBatchFlushEndpoint::cancelCommandOnEndpointClose);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");

Endpoint endpoint = getClusterClientOptions().getAutoBatchFlushOptions().isAutoBatchFlushEnabled()
? new ClusterNodeBatchFlushEndpoint(getClusterClientOptions(), getResources(), clusterWriter)
? new ClusterNodeAutoBatchFlushEndpoint(getClusterClientOptions(), getResources(), clusterWriter)
: new ClusterNodeEndpoint(getClusterClientOptions(), getResources(), clusterWriter);

RedisChannelWriter writer = (RedisChannelWriter) endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* @author chenxiaofan
*/
public interface BatchFlushEndpoint extends Endpoint {
public interface AutoBatchFlushEndpoint extends Endpoint {

@Override
default void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
this.clientOptions = clientOptions;
this.clientResources = clientResources;
this.endpoint = endpoint;
this.supportsBatchFlush = endpoint instanceof BatchFlushEndpoint;
this.supportsBatchFlush = endpoint instanceof AutoBatchFlushEndpoint;
this.commandLatencyRecorder = clientResources.commandLatencyRecorder();
this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled();
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
Expand Down Expand Up @@ -399,7 +399,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

if (supportsBatchFlush) {
// Needs decision of watchdog
((BatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(),
((AutoBatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(),
batchFlushRetryableDrainQueuedCommands);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo
this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI);
this.epid = endpoint.getId();
this.endpoint = endpoint;
this.useBatchFlushEndpoint = endpoint instanceof BatchFlushEndpoint;
this.useBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint;

Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
.onErrorResume(t -> {
Expand Down Expand Up @@ -308,7 +308,7 @@ private void notifyEndpointFailedToConnectIfNeeded() {

private void notifyEndpointFailedToConnectIfNeeded(Exception e) {
if (useBatchFlushEndpoint) {
((BatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@
*
* @author Mark Paluch
*/
public class DefaultBatchFlushEndpoint implements RedisChannelWriter, BatchFlushEndpoint, PushHandler {
public class DefaultAutoBatchFlushEndpoint implements RedisChannelWriter, AutoBatchFlushEndpoint, PushHandler {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndpoint.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndpoint.class);

private static final AtomicLong ENDPOINT_COUNTER = new AtomicLong();

private static final AtomicReferenceFieldUpdater<DefaultBatchFlushEndpoint, ContextualChannel> CHANNEL = AtomicReferenceFieldUpdater
.newUpdater(DefaultBatchFlushEndpoint.class, ContextualChannel.class, "channel");
private static final AtomicReferenceFieldUpdater<DefaultAutoBatchFlushEndpoint, ContextualChannel> CHANNEL = AtomicReferenceFieldUpdater
.newUpdater(DefaultAutoBatchFlushEndpoint.class, ContextualChannel.class, "channel");

private static final AtomicIntegerFieldUpdater<DefaultBatchFlushEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater
.newUpdater(DefaultBatchFlushEndpoint.class, "queueSize");
private static final AtomicIntegerFieldUpdater<DefaultAutoBatchFlushEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater
.newUpdater(DefaultAutoBatchFlushEndpoint.class, "queueSize");

private static final AtomicIntegerFieldUpdater<DefaultBatchFlushEndpoint> STATUS = AtomicIntegerFieldUpdater
.newUpdater(DefaultBatchFlushEndpoint.class, "status");
private static final AtomicIntegerFieldUpdater<DefaultAutoBatchFlushEndpoint> STATUS = AtomicIntegerFieldUpdater
.newUpdater(DefaultAutoBatchFlushEndpoint.class, "status");

private static final int ST_OPEN = 0;

Expand Down Expand Up @@ -170,16 +170,16 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
private final int batchSize;

/**
* Create a new {@link BatchFlushEndpoint}.
* Create a new {@link AutoBatchFlushEndpoint}.
*
* @param clientOptions client options for this connection, must not be {@code null}.
* @param clientResources client resources for this connection, must not be {@code null}.
*/
public DefaultBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources) {
this(clientOptions, clientResources, DefaultBatchFlushEndpoint::cancelCommandOnEndpointClose);
public DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources) {
this(clientOptions, clientResources, DefaultAutoBatchFlushEndpoint::cancelCommandOnEndpointClose);
}

protected DefaultBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources,
protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources,
Consumer<RedisCommand<?, ?, ?>> callbackOnClose) {

LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
Expand Down Expand Up @@ -976,7 +976,7 @@ protected WrittenToChannel newObject(Recycler.Handle<WrittenToChannel> handle) {

private final Recycler.Handle<WrittenToChannel> handle;

private DefaultBatchFlushEndpoint endpoint;
private DefaultAutoBatchFlushEndpoint endpoint;

private RedisCommand<?, ?, ?> command;

Expand All @@ -991,7 +991,7 @@ private WrittenToChannel(Recycler.Handle<WrittenToChannel> handle) {
*
* @return new instance
*/
static WrittenToChannel newInstance(DefaultBatchFlushEndpoint endpoint, ContextualChannel chan,
static WrittenToChannel newInstance(DefaultAutoBatchFlushEndpoint endpoint, ContextualChannel chan,
RedisCommand<?, ?, ?> command) {

WrittenToChannel entry = RECYCLER.get();
Expand Down

0 comments on commit 205813a

Please sign in to comment.