Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
fix: fix bugs for considering eviction in parallel and others (#66)
Browse files Browse the repository at this point in the history
* fix: fix bugs for considering eviction in parallel

* fix: later finding completed threads need pin

* add comments

* add debug log

---------

Co-authored-by: xx01cyx <caoyuanxin0531@outlook.com>
  • Loading branch information
lanlou1554 and xx01cyx authored Apr 30, 2024
1 parent e198865 commit 25f46af
Show file tree
Hide file tree
Showing 3 changed files with 376 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl DiskStore {
&self,
key: &str,
disk_replacer: Arc<Mutex<R>>,
key_replacer: String,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>>
where
R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue> + 'static,
Expand All @@ -68,7 +69,6 @@ impl DiskStore {
// FIXME: Shall we consider the situation where the data is not found?
let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?;
let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE);
let key_str = key.to_string().clone();
tokio::spawn(async move {
loop {
match disk_stream.next().await {
Expand All @@ -78,11 +78,13 @@ impl DiskStore {
.unwrap();
}
Some(Err(e)) => tx.send(Err(e)).await.unwrap(),
None => break,
None => {
// TODO(lanlou): when second read, so there is no need to unpin, how to improve?
disk_replacer.lock().await.unpin(&key_replacer);
break;
}
}
}
// TODO(lanlou): when second read, so there is no need to unpin, how to improve?
disk_replacer.lock().await.unpin(&key_str);
});
Ok(Some(rx))
}
Expand Down
Loading

0 comments on commit 25f46af

Please sign in to comment.