From 3654bc451347d35f788296bb314a86d3902bdaac Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 29 Dec 2024 09:31:14 -0800 Subject: [PATCH] KAFKA-18339: Fix parseRequestHeader error handling (#18340) A minor refactoring just before merging #18295 introduced a regression and no test failed. Throw the correct exception and add test to verify it. Also refactor the code slightly to make that possible. Thanks to Chia-Ping for catching the issue. Reviewers: Chia-Ping Tsai --- .../kafka/common/requests/RequestContext.java | 2 +- .../kafka/common/requests/RequestHeader.java | 4 ++ .../SaslServerAuthenticator.java | 4 +- .../scala/kafka/network/SocketServer.scala | 24 ++++----- .../unit/kafka/network/ProcessorTest.scala | 54 +++++++++++++++++++ 5 files changed, 73 insertions(+), 15 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/network/ProcessorTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java index ece4a13fd88d0..c15fa960abf12 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java @@ -151,7 +151,7 @@ public ByteBuffer buildResponseEnvelopePayload(AbstractResponse body) { } private boolean isUnsupportedApiVersionsRequest() { - return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion()); + return header.apiKey() == API_VERSIONS && !header.isApiVersionSupported(); } public short apiVersion() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index f63c3d98e1e1e..45063b816bc4c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -108,6 +108,10 @@ public int size() { return size; } + public boolean isApiVersionSupported() { + return apiKey().isVersionSupported(apiVersion()); + } + public boolean isApiVersionDeprecated() { return apiKey().isVersionDeprecated(apiVersion()); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 9a71192c8c44d..e2ebaa31cd260 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -434,7 +434,6 @@ private void handleSaslToken(byte[] clientToken) throws IOException { ByteBuffer requestBuffer = ByteBuffer.wrap(clientToken); RequestHeader header = RequestHeader.parse(requestBuffer); ApiKeys apiKey = header.apiKey(); - short version = header.apiVersion(); RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()), KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false); RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); @@ -443,7 +442,8 @@ private void handleSaslToken(byte[] clientToken) throws IOException { buildResponseOnAuthenticateFailure(requestContext, requestAndSize.request.getErrorResponse(e)); throw e; } - if (!apiKey.isVersionSupported(version)) { + short version = header.apiVersion(); + if (!header.isApiVersionSupported()) { // We cannot create an error response if the request version of SaslAuthenticate is not supported // This should not normally occur since clients typically check supported versions using ApiVersionsRequest throw new UnsupportedVersionException("Version " + version + " is not supported for apiKey " + apiKey); diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 639fc9b73f82f..8f60861ec7f1a 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -888,6 +888,17 @@ private[kafka] object Processor { val NetworkProcessorMetricTag = "networkProcessor" val ListenerMetricTag = "listener" val ConnectionQueueSize = 20 + + private[network] def parseRequestHeader(apiVersionManager: ApiVersionManager, buffer: ByteBuffer): RequestHeader = { + val header = RequestHeader.parse(buffer) + if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) { + header + } else if (header.isApiVersionSupported()) { + throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled") + } else { + throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported") + } + } } /** @@ -1103,23 +1114,12 @@ private[kafka] class Processor( } } - private def parseRequestHeader(buffer: ByteBuffer): RequestHeader = { - val header = RequestHeader.parse(buffer) - if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) { - header - } else if (header.isApiVersionDeprecated()) { - throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled") - } else { - throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported") - } - } - private def processCompletedReceives(): Unit = { selector.completedReceives.forEach { receive => try { openOrClosingChannel(receive.source) match { case Some(channel) => - val header = parseRequestHeader(receive.payload) + val header = parseRequestHeader(apiVersionManager, receive.payload) if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, () => time.nanoseconds())) trace(s"Begin re-authentication: $channel") diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala new file mode 100644 index 0000000000000..d42ae11bae64f --- /dev/null +++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import kafka.server.SimpleApiVersionManager +import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} +import org.apache.kafka.common.message.ApiMessageType.ListenerType +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils} +import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.function.Executable + +import java.util.Collections + +class ProcessorTest { + + @Test + def testParseRequestHeaderWithDisabledApi(): Unit = { + val requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0)) + val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, + "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception") + } + + @Test + def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = { + val requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0)) + val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true)) + assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, + "PRODUCE v0 should throw UnsupportedVersionException exception") + } + +}