From 3161115adae4de9f2b2ec6b87462ea10a8993456 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Tue, 31 Dec 2024 00:18:23 +0800 Subject: [PATCH] KAFKA-18361 Remove PasswordEncoderConfigs (#18347) Reviewers: Chia-Ping Tsai --- .../kafka/server/DynamicBrokerConfig.scala | 36 +---- .../main/scala/kafka/server/KafkaConfig.scala | 9 -- .../admin/ConfigCommandIntegrationTest.java | 2 - .../server/DynamicBrokerConfigTest.scala | 60 +-------- .../unit/kafka/server/KafkaConfigTest.scala | 9 -- docs/upgrade.html | 4 + .../security/PasswordEncoderConfigs.java | 60 --------- .../kafka/security/PasswordEncoderTest.java | 125 ------------------ .../server/config/AbstractKafkaConfig.java | 4 +- 9 files changed, 8 insertions(+), 301 deletions(-) delete mode 100644 server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java delete mode 100644 server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 0fc7b8a49e8c6..0fccc0cbdbcb8 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -213,11 +213,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val lock = new ReentrantReadWriteLock private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _ private var currentConfig: KafkaConfig = _ - private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) { - maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) - } else { - Some(PasswordEncoder.NOOP) - } + private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP) private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) @@ -373,16 +369,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging }) } - private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { - secret.map { secret => - PasswordEncoder.encrypting(secret, - kafkaConfig.passwordEncoderKeyFactoryAlgorithm, - kafkaConfig.passwordEncoderCipherAlgorithm, - kafkaConfig.passwordEncoderKeyLength, - kafkaConfig.passwordEncoderIterations) - } - } - private def passwordEncoder: PasswordEncoder = { dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured")) } @@ -446,25 +432,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // encoded using the current secret. Ignore any errors during decoding since old secret may not // have been removed during broker restart. private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { - val props = persistentProps.clone().asInstanceOf[Properties] - if (props.asScala.keySet.exists(isPasswordConfig)) { - maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => - persistentProps.asScala.foreachEntry { (configName, value) => - if (isPasswordConfig(configName) && value != null) { - val decoded = try { - Some(passwordDecoder.decode(value).value) - } catch { - case _: Exception => - debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.") - None - } - decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value)))) - } - } - adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props) - } - } - props + persistentProps.clone().asInstanceOf[Properties] } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 513c1273d164c..9bd564140d2f8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -40,7 +40,6 @@ import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, Transacti import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AuthorizerUtils -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion @@ -591,14 +590,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG) val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG) - /** ********* Password encryption configuration for dynamic configs *********/ - def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)) - def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG)) - def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG) - def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG) - def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG) - def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG) - /** ********* Fetch Configuration **************/ val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG) val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index cc4d6a01dcc9f..5731692f98e1d 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -61,7 +61,6 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG; import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; @@ -182,7 +181,6 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); // Password encoder configs - configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); // Password config update at default cluster-level should fail assertThrows(ExecutionException.class, diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 9c2d95be45c60..262b0abdc194f 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -27,13 +27,11 @@ import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.utils.TestUtils import org.apache.kafka.common.{Endpoint, Reconfigurable} import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} -import org.apache.kafka.common.config.types.Password -import org.apache.kafka.common.config.{ConfigException, SaslConfigs, SslConfigs} +import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -369,7 +367,6 @@ class DynamicBrokerConfigTest { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { val configProps = TestUtils.createBrokerConfig(0, null, port = 8181) - configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret") val config = KafkaConfig(configProps) config.dynamicConfig.initialize(None, None) @@ -416,61 +413,6 @@ class DynamicBrokerConfigTest { } } - @Test - def testPasswordConfigNotEncryption(): Unit = { - val props = TestUtils.createBrokerConfig(0, null, port = 8181) - val configWithoutSecret = KafkaConfig(props) - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") - val configWithSecret = KafkaConfig(props) - val dynamicProps = new Properties - val password = "myLoginModule required;" - dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password) - - try { - configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - } catch { - case _: ConfigException => // expected exception - } - val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG)) - } - - @Test - def testPasswordConfigEncoderSecretChange(): Unit = { - val props = TestUtils.createBrokerConfig(0, null, port = 8181) - props.put(SaslConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;") - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") - val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) - val dynamicProps = new Properties - val password = "dynamicLoginModule required;" - dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password) - - val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG)) - config.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with same secret should use the dynamic password config - val newConfigWithSameSecret = KafkaConfig(props) - newConfigWithSameSecret.dynamicConfig.initialize(None, None) - newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret") - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret") - val newConfigWithNewAndOldSecret = KafkaConfig(props) - newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with new secret alone should revert to static password config since dynamic config cannot be decoded - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret") - val newConfigWithNewSecret = KafkaConfig(props) - newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - } - @Test def testDynamicListenerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, null, port = 9092) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1be0dfce3342a..32810e078ec50 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs} @@ -1005,14 +1004,6 @@ class KafkaConfigTest { // Security config case SecurityConfig.SECURITY_PROVIDERS_CONFIG => - // Password encoder configs - case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") - case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") - //delegation token configs case DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG => // ignore case DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") diff --git a/docs/upgrade.html b/docs/upgrade.html index 57cc1882ea5ac..3263fd853df4a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -63,6 +63,10 @@
Notable changes in 4
  • The log.message.format.version and message.format.version configs were removed.
  • +
  • The password encoder related configs (password.encoder.secret, password.encoder.old.secret, + password.encoder.keyfactory.algorithm, password.encoder.cipher.algorithm, + password.encoder.key.length, and password.encoder.iterations) were removed. +
  • The function onNewBatch in org.apache.kafka.clients.producer.Partitioner class was removed.
  • diff --git a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java b/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java deleted file mode 100644 index ddb724420d73a..0000000000000 --- a/server-common/src/main/java/org/apache/kafka/security/PasswordEncoderConfigs.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.security; - -import org.apache.kafka.common.config.ConfigDef; - -import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; -import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; -import static org.apache.kafka.common.config.ConfigDef.Type.INT; -import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; -import static org.apache.kafka.common.config.ConfigDef.Type.STRING; - -public class PasswordEncoderConfigs { - - public static final String PASSWORD_ENCODER_SECRET_CONFIG = "password.encoder.secret"; - public static final String PASSWORD_ENCODER_SECRET_DOC = "The secret used for encoding dynamically configured passwords for this broker."; - - public static final String PASSWORD_ENCODER_OLD_SECRET_CONFIG = "password.encoder.old.secret"; - public static final String PASSWORD_ENCODER_OLD_SECRET_DOC = "The old secret that was used for encoding dynamically configured passwords. " + - "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " + - "decoded using this old secret and re-encoded using " + PASSWORD_ENCODER_SECRET_CONFIG + " when broker starts up."; - - public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG = "password.encoder.keyfactory.algorithm"; - public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " + - "Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."; - - public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG = "password.encoder.cipher.algorithm"; - public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC = "The Cipher algorithm used for encoding dynamically configured passwords."; - public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT = "AES/CBC/PKCS5Padding"; - - public static final String PASSWORD_ENCODER_KEY_LENGTH_CONFIG = "password.encoder.key.length"; - public static final String PASSWORD_ENCODER_KEY_LENGTH_DOC = "The key length used for encoding dynamically configured passwords."; - public static final int PASSWORD_ENCODER_KEY_LENGTH_DEFAULT = 128; - - public static final String PASSWORD_ENCODER_ITERATIONS_CONFIG = "password.encoder.iterations"; - public static final String PASSWORD_ENCODER_ITERATIONS_DOC = "The iteration count used for encoding dynamically configured passwords."; - public static final int PASSWORD_ENCODER_ITERATIONS_DEFAULT = 4096; - public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC) - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC) - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC) - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC) - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC) - .define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC); -} diff --git a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java b/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java deleted file mode 100644 index eaa8c3ec87de7..0000000000000 --- a/server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.security; - -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.server.util.Csv; - -import org.junit.jupiter.api.Test; - -import java.security.GeneralSecurityException; -import java.util.Map; - -import javax.crypto.SecretKeyFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -class PasswordEncoderTest { - - @Test - public void testEncodeDecode() throws GeneralSecurityException { - PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), - null, - PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, - PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, - PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT); - String password = "test-password"; - String encoded = encoder.encode(new Password(password)); - Map encodedMap = Csv.parseCsvMap(encoded); - assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS)); - assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH)); - String defaultKeyFactoryAlgorithm; - try { - SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512"); - defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512"; - - } catch (Exception e) { - defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1"; - } - assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM)); - assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM)); - verifyEncodedPassword(encoder, password, encoded); - } - - @Test - public void testEncoderConfigChange() throws GeneralSecurityException { - PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), - "PBKDF2WithHmacSHA1", - "DES/CBC/PKCS5Padding", - 64, - 1024); - String password = "test-password"; - String encoded = encoder.encode(new Password(password)); - Map encodedMap = Csv.parseCsvMap(encoded); - assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS)); - assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH)); - assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM)); - assertEquals("DES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM)); - - // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered - PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), - "PBKDF2WithHmacSHA1", - "AES/CBC/PKCS5Padding", - 128, - 2048); - assertEquals(password, decoder.decode(encoded).value()); - - // Test that decoding fails if secret is altered - PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"), - "PBKDF2WithHmacSHA1", - "AES/CBC/PKCS5Padding", - 128, - 1024); - assertThrows(ConfigException.class, () -> decoder2.decode(encoded)); - } - - @Test - public void testEncodeDecodeAlgorithms() throws GeneralSecurityException { - verifyEncodeDecode(null, "DES/CBC/PKCS5Padding", 64); - verifyEncodeDecode(null, "DESede/CBC/PKCS5Padding", 192); - verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128); - verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128); - verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128); - verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); - verifyEncodeDecode(null, "AES/GCM/NoPadding", 128); - verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); - verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128); - } - - private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException { - PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), - keyFactoryAlg, - cipherAlg, - keyLength, - PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT); - String password = "test-password"; - String encoded = encoder.encode(new Password(password)); - verifyEncodedPassword(encoder, password, encoded); - } - - private void verifyEncodedPassword(PasswordEncoder encoder, String password, String encoded) throws GeneralSecurityException { - Map encodedMap = Csv.parseCsvMap(encoded); - assertEquals(String.valueOf(password.length()), encodedMap.get(PasswordEncoder.PASSWORD_LENGTH)); - assertNotNull(PasswordEncoder.base64Decode(encodedMap.get("salt")), "Invalid salt"); - assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.INITIALIZATION_VECTOR)), "Invalid encoding parameters"); - assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.ENCRYPTED_PASSWORD)), "Invalid encoded password"); - assertEquals(password, encoder.decode(encoded).value()); - } -} diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index c32b89087a2b9..2e03cafe77a19 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -26,7 +26,6 @@ import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.raft.QuorumConfig; -import org.apache.kafka.security.PasswordEncoderConfigs; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.server.metrics.MetricConfigs; import org.apache.kafka.storage.internals.log.CleanerConfig; @@ -64,8 +63,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { MetricConfigs.CONFIG_DEF, QuotaConfig.CONFIG_DEF, BrokerSecurityConfigs.CONFIG_DEF, - DelegationTokenManagerConfigs.CONFIG_DEF, - PasswordEncoderConfigs.CONFIG_DEF + DelegationTokenManagerConfigs.CONFIG_DEF )); public AbstractKafkaConfig(ConfigDef definition, Map originals, Map configProviderProps, boolean doLog) {