diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 0fc7b8a49e8c6..0fccc0cbdbcb8 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -213,11 +213,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val lock = new ReentrantReadWriteLock private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _ private var currentConfig: KafkaConfig = _ - private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) { - maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) - } else { - Some(PasswordEncoder.NOOP) - } + private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP) private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) @@ -373,16 +369,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging }) } - private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { - secret.map { secret => - PasswordEncoder.encrypting(secret, - kafkaConfig.passwordEncoderKeyFactoryAlgorithm, - kafkaConfig.passwordEncoderCipherAlgorithm, - kafkaConfig.passwordEncoderKeyLength, - kafkaConfig.passwordEncoderIterations) - } - } - private def passwordEncoder: PasswordEncoder = { dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured")) } @@ -446,25 +432,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // encoded using the current secret. Ignore any errors during decoding since old secret may not // have been removed during broker restart. private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { - val props = persistentProps.clone().asInstanceOf[Properties] - if (props.asScala.keySet.exists(isPasswordConfig)) { - maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => - persistentProps.asScala.foreachEntry { (configName, value) => - if (isPasswordConfig(configName) && value != null) { - val decoded = try { - Some(passwordDecoder.decode(value).value) - } catch { - case _: Exception => - debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.") - None - } - decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value)))) - } - } - adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props) - } - } - props + persistentProps.clone().asInstanceOf[Properties] } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 513c1273d164c..9bd564140d2f8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -40,7 +40,6 @@ import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, Transacti import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AuthorizerUtils -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion @@ -591,14 +590,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG) val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG) - /** ********* Password encryption configuration for dynamic configs *********/ - def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)) - def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG)) - def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG) - def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG) - def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG) - def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG) - /** ********* Fetch Configuration **************/ val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG) val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index cc4d6a01dcc9f..5731692f98e1d 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -61,7 +61,6 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG; import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; @@ -182,7 +181,6 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); // Password encoder configs - configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); // Password config update at default cluster-level should fail assertThrows(ExecutionException.class, diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 9c2d95be45c60..262b0abdc194f 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -27,13 +27,11 @@ import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.utils.TestUtils import org.apache.kafka.common.{Endpoint, Reconfigurable} import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} -import org.apache.kafka.common.config.types.Password -import org.apache.kafka.common.config.{ConfigException, SaslConfigs, SslConfigs} +import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -369,7 +367,6 @@ class DynamicBrokerConfigTest { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { val configProps = TestUtils.createBrokerConfig(0, null, port = 8181) - configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret") val config = KafkaConfig(configProps) config.dynamicConfig.initialize(None, None) @@ -416,61 +413,6 @@ class DynamicBrokerConfigTest { } } - @Test - def testPasswordConfigNotEncryption(): Unit = { - val props = TestUtils.createBrokerConfig(0, null, port = 8181) - val configWithoutSecret = KafkaConfig(props) - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") - val configWithSecret = KafkaConfig(props) - val dynamicProps = new Properties - val password = "myLoginModule required;" - dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password) - - try { - configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - } catch { - case _: ConfigException => // expected exception - } - val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG)) - } - - @Test - def testPasswordConfigEncoderSecretChange(): Unit = { - val props = TestUtils.createBrokerConfig(0, null, port = 8181) - props.put(SaslConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;") - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret") - val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) - val dynamicProps = new Properties - val password = "dynamicLoginModule required;" - dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password) - - val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true) - assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG)) - config.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with same secret should use the dynamic password config - val newConfigWithSameSecret = KafkaConfig(props) - newConfigWithSameSecret.dynamicConfig.initialize(None, None) - newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret") - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret") - val newConfigWithNewAndOldSecret = KafkaConfig(props) - newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - - // New config with new secret alone should revert to static password config since dynamic config cannot be decoded - props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret") - val newConfigWithNewSecret = KafkaConfig(props) - newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) - assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) - } - @Test def testDynamicListenerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, null, port = 9092) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1be0dfce3342a..32810e078ec50 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs} @@ -1005,14 +1004,6 @@ class KafkaConfigTest { // Security config case SecurityConfig.SECURITY_PROVIDERS_CONFIG => - // Password encoder configs - case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG => - case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") - case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") - //delegation token configs case DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG => // ignore case DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") diff --git a/docs/upgrade.html b/docs/upgrade.html index 57cc1882ea5ac..3263fd853df4a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -63,6 +63,10 @@
log.message.format.version
and message.format.version
configs were removed.
password.encoder.secret
, password.encoder.old.secret
,
+ password.encoder.keyfactory.algorithm
, password.encoder.cipher.algorithm
,
+ password.encoder.key.length
, and password.encoder.iterations
) were removed.
+ onNewBatch
in org.apache.kafka.clients.producer.Partitioner
class was removed.