diff --git a/.circleci/config.yml b/.circleci/config.yml index 35523f3..d55635d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -226,7 +226,7 @@ jobs: name: zookeeper environment: - ALLOW_ANONYMOUS_LOGIN=yes - - image: docker.io/bitnami/kafka:3 + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-0 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 @@ -237,18 +237,19 @@ jobs: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://kafka-0:9093 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false - - image: docker.io/bitnami/kafka:3 + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-1 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_BROKER_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,SASL_PLAIN:SASL_PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,SASL_PLAIN://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093,SASL_PLAIN://kafka-1:9094 + - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false - - image: docker.io/bitnami/kafka:3 + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-2 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 @@ -279,6 +280,7 @@ jobs: # Don't use the first node here since this is likely the controller and we want to ensure that we automatically # pick the controller for certain actions (e.g. topic creation) and don't just get lucky. KAFKA_CONNECT: "invalid:9093,kafka-1:9093" + KAFKA_SASL_CONNECT: kafka-1:9094 SOCKS_PROXY: "proxy:1080" steps: - checkout diff --git a/src/messenger.rs b/src/messenger.rs index 34fea00..06a8d60 100644 --- a/src/messenger.rs +++ b/src/messenger.rs @@ -610,7 +610,9 @@ where let authentication_response = self.sasl_authentication(to_sent.into_inner()).await?; data_received = Some(authentication_response.auth_bytes.0); - } else { + } + + if state.is_finished() { break; } } diff --git a/tests/client.rs b/tests/client.rs index 024dc7b..b87d4bb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -44,6 +44,10 @@ async fn test_sasl() { .sasl_config(rskafka::client::SaslConfig::Plain( rskafka::client::Credentials::new("admin".to_string(), "admin-secret".to_string()), )) + .backoff_config(BackoffConfig { + deadline: Some(Duration::from_secs(1)), + ..Default::default() + }) .build() .await .unwrap();