Skip to content

Commit

Permalink
KAFKA-18339: Fix parseRequestHeader error handling (apache#18340)
Browse files Browse the repository at this point in the history
A minor refactoring just before merging apache#18295 introduced a regression and no test failed. Throw the correct exception and add test to verify it. Also refactor the code slightly to make that possible.

Thanks to Chia-Ping for catching the issue.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
ijuma authored Dec 29, 2024
1 parent 1156d5c commit 3654bc4
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public ByteBuffer buildResponseEnvelopePayload(AbstractResponse body) {
}

private boolean isUnsupportedApiVersionsRequest() {
return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion());
return header.apiKey() == API_VERSIONS && !header.isApiVersionSupported();
}

public short apiVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public int size() {
return size;
}

public boolean isApiVersionSupported() {
return apiKey().isVersionSupported(apiVersion());
}

public boolean isApiVersionDeprecated() {
return apiKey().isVersionDeprecated(apiVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
ByteBuffer requestBuffer = ByteBuffer.wrap(clientToken);
RequestHeader header = RequestHeader.parse(requestBuffer);
ApiKeys apiKey = header.apiKey();
short version = header.apiVersion();
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
Expand All @@ -443,7 +442,8 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
buildResponseOnAuthenticateFailure(requestContext, requestAndSize.request.getErrorResponse(e));
throw e;
}
if (!apiKey.isVersionSupported(version)) {
short version = header.apiVersion();
if (!header.isApiVersionSupported()) {
// We cannot create an error response if the request version of SaslAuthenticate is not supported
// This should not normally occur since clients typically check supported versions using ApiVersionsRequest
throw new UnsupportedVersionException("Version " + version + " is not supported for apiKey " + apiKey);
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,17 @@ private[kafka] object Processor {
val NetworkProcessorMetricTag = "networkProcessor"
val ListenerMetricTag = "listener"
val ConnectionQueueSize = 20

private[network] def parseRequestHeader(apiVersionManager: ApiVersionManager, buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header
} else if (header.isApiVersionSupported()) {
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
} else {
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
}
}
}

/**
Expand Down Expand Up @@ -1103,23 +1114,12 @@ private[kafka] class Processor(
}
}

private def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header
} else if (header.isApiVersionDeprecated()) {
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
} else {
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
}
}

private def processCompletedReceives(): Unit = {
selector.completedReceives.forEach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = parseRequestHeader(receive.payload)
val header = parseRequestHeader(apiVersionManager, receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
() => time.nanoseconds()))
trace(s"Begin re-authentication: $channel")
Expand Down
54 changes: 54 additions & 0 deletions core/src/test/scala/unit/kafka/network/ProcessorTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.network

import kafka.server.SimpleApiVersionManager
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

import java.util.Collections

class ProcessorTest {

@Test
def testParseRequestHeaderWithDisabledApi(): Unit = {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
}

@Test
def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"PRODUCE v0 should throw UnsupportedVersionException exception")
}

}

0 comments on commit 3654bc4

Please sign in to comment.