diff --git a/.circleci/config.yml b/.circleci/config.yml index 35523f3..7fddbd1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -279,6 +279,7 @@ jobs: # Don't use the first node here since this is likely the controller and we want to ensure that we automatically # pick the controller for certain actions (e.g. topic creation) and don't just get lucky. KAFKA_CONNECT: "invalid:9093,kafka-1:9093" + KAFKA_SASL_CONNECT: kafka-1:9093 SOCKS_PROXY: "proxy:1080" steps: - checkout diff --git a/src/messenger.rs b/src/messenger.rs index 34fea00..06a8d60 100644 --- a/src/messenger.rs +++ b/src/messenger.rs @@ -610,7 +610,9 @@ where let authentication_response = self.sasl_authentication(to_sent.into_inner()).await?; data_received = Some(authentication_response.auth_bytes.0); - } else { + } + + if state.is_finished() { break; } } diff --git a/tests/client.rs b/tests/client.rs index 024dc7b..641a4ab 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -40,13 +40,17 @@ async fn test_sasl() { if test_cfg.broker_impl != BrokerImpl::Kafka { return; } - ClientBuilder::new(vec![env::var("KAFKA_SASL_CONNECT").unwrap()]) - .sasl_config(rskafka::client::SaslConfig::Plain( - rskafka::client::Credentials::new("admin".to_string(), "admin-secret".to_string()), - )) - .build() - .await - .unwrap(); + tokio::time::timeout( + Duration::from_secs(2), + ClientBuilder::new(vec![env::var("KAFKA_SASL_CONNECT").unwrap()]) + .sasl_config(rskafka::client::SaslConfig::Plain( + rskafka::client::Credentials::new("admin".to_string(), "admin-secret".to_string()), + )) + .build(), + ) + .await + .unwrap() + .unwrap(); } #[tokio::test]