Skip to content

Commit

Permalink
fix: backoff reset at sucess
Browse files Browse the repository at this point in the history
Signed-off-by: nabil salah <nabil.salah203@gmail.com>
  • Loading branch information
Nabil-Salah committed Dec 11, 2024
1 parent ab939c1 commit 3bddd0d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
36 changes: 23 additions & 13 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn set_limits() -> Result<()> {
Ok(())
}

async fn app(args: Args, tx: tokio::sync::oneshot::Sender<()>) -> Result<()> {
async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
if args.workers == 0 {
anyhow::bail!("number of workers cannot be zero");
}
Expand Down Expand Up @@ -173,29 +173,39 @@ async fn app(args: Args, tx: tokio::sync::oneshot::Sender<()>) -> Result<()> {

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
let max_retries = 7; // max wait is 2^7 = 128 seconds ( 2 minutes)
let max_retries = 9; // max wait is 2^9 = 512 seconds ( 5 minutes )
let mut attempt = 0;
let mut backoff = Duration::from_secs(1);
let mut got_hit = false;

loop {
match l.listen().await.context("failed to listen to chain events") {
match l
.listen(&mut got_hit)
.await
.context("failed to listen to chain events")
{
Ok(_) => break,
Err(e) => {
if got_hit {
log::warn!("Listener got a hit, but failed to listen to chain events before no attempts will be reset");
got_hit = false;
attempt = 0;
backoff = Duration::from_secs(1);
}
attempt += 1;
if attempt > max_retries {
log::error!("Listener failed after {} attempts: {:?}", attempt, e);
log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e);
let _ = tx.send(());
break;
} else {
log::warn!(
"Listener failed on attempt {}: {:?}. Retrying in {:?}...",
attempt,
e,
backoff
);
tokio::time::sleep(backoff).await;
backoff *= 2;
}
log::warn!(
"Listener failed on attempt {}: {:?}. Retrying in {:?}...",
attempt,
e,
backoff
);
tokio::time::sleep(backoff).await;
backoff *= 2;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
Expand All @@ -73,6 +73,8 @@ where
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
} else {
*got_hit = true
}
}
}
Expand Down

0 comments on commit 3bddd0d

Please sign in to comment.