Merge pull request #51 from Power2All/v4.0.7
v4.0.7
Power2All authored Jan 9, 2025
2 parents 52677cb + 0d4076e commit f3491b2
Showing 11 changed files with 149 additions and 132 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "torrust-actix"
version = "4.0.6"
version = "4.0.7"
edition = "2021"
license = "AGPL-3.0"
authors = [
5 changes: 5 additions & 0 deletions README.md
@@ -59,6 +59,11 @@ Sentry.io support is introduced, you can enable it in the configuration and the

### ChangeLog

+#### v4.0.7
+* Peers cleanup was still broken; after a big rewrite and testing, it now works as expected
+* Set up tokio threading correctly for the core threads
+* Added a new configuration key to set the number of peers cleanup threads; the default is one thread per shard (256), but it can be changed

#### v4.0.6
* Fixed some clippy issues
* Found a performance issue on peers cleanup
2 changes: 1 addition & 1 deletion docker/Dockerfile
@@ -2,7 +2,7 @@ FROM rust:alpine

RUN apk add git musl-dev curl pkgconfig openssl-dev openssl-libs-static
RUN git clone https://github.com/Power2All/torrust-actix.git /tmp/torrust-actix
-RUN cd /tmp/torrust-actix && git checkout tags/v4.0.6
+RUN cd /tmp/torrust-actix && git checkout tags/v4.0.7
WORKDIR /tmp/torrust-actix
RUN cd /tmp/torrust-actix
RUN cargo build --release && rm -Rf target/release/.fingerprint target/release/build target/release/deps target/release/examples target/release/incremental
1 change: 1 addition & 0 deletions src/config/impls/configuration.rs
@@ -37,6 +37,7 @@ impl Configuration {
request_interval_minimum: 1800,
peers_timeout: 2700,
peers_cleanup_interval: 900,
+peers_cleanup_threads: 256,
total_downloads: 0,
swagger: false,
prometheus_id: String::from("torrust_actix")
1 change: 1 addition & 0 deletions src/config/structs/tracker_config.rs
@@ -12,6 +12,7 @@ pub struct TrackerConfig {
pub request_interval_minimum: u64,
pub peers_timeout: u64,
pub peers_cleanup_interval: u64,
+pub peers_cleanup_threads: u64,
pub total_downloads: u64,
pub swagger: bool,
pub prometheus_id: String,
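The new field corresponds to a key in the tracker's configuration file. Below is a minimal sketch of how it might look in TOML; the `[tracker_config]` section name and the neighboring keys are inferred from the struct above and the defaults in `configuration.rs`, not copied from a real config file:

```toml
[tracker_config]
request_interval_minimum = 1800
peers_timeout = 2700
peers_cleanup_interval = 900
# New in v4.0.7: worker threads for the peers cleanup runtime.
# The default of 256 gives one thread per shard; 0 selects a
# single-threaded (current-thread) runtime instead.
peers_cleanup_threads = 256
```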
46 changes: 17 additions & 29 deletions src/main.rs
@@ -1,3 +1,4 @@
+use std::mem;
use std::net::SocketAddr;
use std::process::exit;
use std::sync::Arc;
@@ -8,6 +9,7 @@ use futures_util::future::{try_join_all, TryJoinAll};
use log::{error, info};
use parking_lot::deadlock;
use sentry::ClientInitGuard;
+use tokio::runtime::Builder;
use tokio_shutdown::Shutdown;
use torrust_actix::api::api::api_service;
use torrust_actix::common::common::{setup_logging, shutdown_waiting, udp_check_host_and_port_used};
@@ -82,10 +84,12 @@ fn main() -> std::io::Result<()>

if args.import { tracker.import(&args, tracker.clone()).await; }

+let tokio_core = Builder::new_multi_thread().thread_name("core").worker_threads(9).enable_all().build()?;

let tokio_shutdown = Shutdown::new().expect("shutdown creation works on first call");

let deadlocks_handler = tokio_shutdown.clone();
-tokio::spawn(async move {
+tokio_core.spawn(async move {
info!("[BOOT] Starting thread for deadlocks...");
loop {
if shutdown_waiting(Duration::from_secs(10), deadlocks_handler.clone()).await {
@@ -136,12 +140,12 @@ fn main() -> std::io::Result<()>
}
}
if !api_futures.is_empty() {
-tokio::spawn(async move {
+tokio_core.spawn(async move {
let _ = try_join_all(api_futures).await;
});
}
if !apis_futures.is_empty() {
-tokio::spawn(async move {
+tokio_core.spawn(async move {
let _ = try_join_all(apis_futures).await;
});
}
@@ -174,12 +178,12 @@ fn main() -> std::io::Result<()>
}
}
if !http_futures.is_empty() {
-tokio::spawn(async move {
+tokio_core.spawn(async move {
let _ = try_join_all(http_futures).await;
});
}
if !https_futures.is_empty() {
-tokio::spawn(async move {
+tokio_core.spawn(async move {
let _ = try_join_all(https_futures).await;
});
}
@@ -199,7 +203,7 @@ fn main() -> std::io::Result<()>
let stats_handler = tokio_shutdown.clone();
let tracker_spawn_stats = tracker.clone();
info!("[BOOT] Starting thread for console updates with {} seconds delay...", tracker_spawn_stats.config.log_console_interval);
-tokio::spawn(async move {
+tokio_core.spawn(async move {
loop {
tracker_spawn_stats.set_stats(StatsEvent::TimestampSave, chrono::Utc::now().timestamp() + 60i64);
if shutdown_waiting(Duration::from_secs(tracker_spawn_stats.config.log_console_interval), stats_handler.clone()).await {
@@ -217,34 +221,17 @@ fn main() -> std::io::Result<()>
}
});

-let cleanup_peers_handler = tokio_shutdown.clone();
-let tracker_spawn_cleanup_peers = tracker.clone();
-info!("[BOOT] Starting thread for peers cleanup with {} seconds delay...", tracker_spawn_cleanup_peers.config.tracker_config.clone().peers_cleanup_interval);
-tokio::spawn(async move {
-loop {
-tracker_spawn_cleanup_peers.set_stats(StatsEvent::TimestampTimeout, chrono::Utc::now().timestamp() + tracker_spawn_cleanup_peers.config.tracker_config.clone().peers_cleanup_interval as i64);
-if shutdown_waiting(Duration::from_secs(tracker_spawn_cleanup_peers.config.tracker_config.clone().peers_cleanup_interval), cleanup_peers_handler.clone()).await {
-info!("[BOOT] Shutting down thread for peers cleanup...");
-return;
-}
-
-info!("[PEERS] Checking now for dead peers.");
-let _ = tracker_spawn_cleanup_peers.torrent_peers_cleanup(tracker_spawn_cleanup_peers.clone(), Duration::from_secs(tracker_spawn_cleanup_peers.config.tracker_config.clone().peers_timeout), tracker_spawn_cleanup_peers.config.database.clone().persistent).await;
-info!("[PEERS] Peers cleaned up.");
-
-if tracker_spawn_cleanup_peers.config.tracker_config.clone().users_enabled {
-info!("[USERS] Checking now for inactive torrents in users.");
-tracker_spawn_cleanup_peers.clean_user_active_torrents(Duration::from_secs(tracker_spawn_cleanup_peers.config.tracker_config.clone().peers_timeout));
-info!("[USERS] Inactive torrents in users cleaned up.");
-}
-}
+let (tracker_cleanup_clone, tokio_shutdown_cleanup_clone) = (tracker.clone(), tokio_shutdown.clone());
+info!("[BOOT] Starting thread for peers cleanup with {} seconds delay...", tracker_cleanup_clone.config.tracker_config.clone().peers_cleanup_interval);
+tokio_core.spawn(async move {
+tracker_cleanup_clone.clone().torrents_sharding.cleanup_threads(tracker_cleanup_clone.clone(), tokio_shutdown_cleanup_clone, Duration::from_secs(tracker_cleanup_clone.config.tracker_config.clone().peers_timeout), tracker_cleanup_clone.config.database.clone().persistent).await;
});

if tracker.config.tracker_config.clone().keys_enabled {
let cleanup_keys_handler = tokio_shutdown.clone();
let tracker_spawn_cleanup_keys = tracker.clone();
info!("[BOOT] Starting thread for keys cleanup with {} seconds delay...", tracker_spawn_cleanup_keys.config.tracker_config.clone().keys_cleanup_interval);
-tokio::spawn(async move {
+tokio_core.spawn(async move {
loop {
tracker_spawn_cleanup_keys.set_stats(StatsEvent::TimestampKeysTimeout, chrono::Utc::now().timestamp() + tracker_spawn_cleanup_keys.config.tracker_config.clone().keys_cleanup_interval as i64);
if shutdown_waiting(Duration::from_secs(tracker_spawn_cleanup_keys.config.tracker_config.clone().keys_cleanup_interval), cleanup_keys_handler.clone()).await {
Expand All @@ -263,7 +250,7 @@ fn main() -> std::io::Result<()>
let updates_handler = tokio_shutdown.clone();
let tracker_spawn_updates = tracker.clone();
info!("[BOOT] Starting thread for database updates with {} seconds delay...", tracker_spawn_updates.config.database.clone().persistent_interval);
-tokio::spawn(async move {
+tokio_core.spawn(async move {
loop {
tracker_spawn_updates.set_stats(StatsEvent::TimestampSave, chrono::Utc::now().timestamp() + tracker_spawn_updates.config.database.clone().persistent_interval as i64);
if shutdown_waiting(Duration::from_secs(tracker_spawn_updates.config.database.clone().persistent_interval), updates_handler.clone()).await {
@@ -357,6 +344,7 @@ fn main() -> std::io::Result<()>
task::sleep(Duration::from_secs(1)).await;

info!("Server shutting down completed");
+mem::forget(tokio_core);
Ok(())
}
}
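The changes above move the long-running tasks off ad-hoc `tokio::spawn` calls onto a dedicated, explicitly sized runtime, and `mem::forget(tokio_core)` at the end skips the runtime's blocking shutdown so the process can exit promptly. A minimal standalone sketch of that pattern, with the runtime name and worker count taken from the diff and the task body purely illustrative:

```rust
use std::mem;
use std::time::Duration;
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // Dedicated multi-threaded runtime with named worker threads,
    // mirroring the `tokio_core` runtime built in main.rs above.
    let tokio_core = Builder::new_multi_thread()
        .thread_name("core")
        .worker_threads(9)
        .enable_all()
        .build()?;

    // Background work is spawned onto the dedicated runtime; its worker
    // threads poll the task without any call to block_on.
    tokio_core.spawn(async {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            // periodic work would run here
        }
    });

    // ... the rest of the application runs here ...

    // Dropping a Runtime blocks until its tasks shut down; forgetting it
    // lets the process exit immediately once cleanup is done.
    mem::forget(tokio_core);
    Ok(())
}
```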
111 changes: 110 additions & 1 deletion src/tracker/impls/torrent_sharding.rs
@@ -1,9 +1,19 @@
+use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
+use std::mem;
use std::sync::Arc;
+use std::time::Duration;
+use log::info;
use parking_lot::RwLock;
+use tokio::runtime::Builder;
+use tokio_shutdown::Shutdown;
+use crate::common::common::shutdown_waiting;
+use crate::stats::enums::stats_event::StatsEvent;
use crate::tracker::structs::info_hash::InfoHash;
+use crate::tracker::structs::peer_id::PeerId;
use crate::tracker::structs::torrent_entry::TorrentEntry;
use crate::tracker::structs::torrent_sharding::TorrentSharding;
+use crate::tracker::structs::torrent_tracker::TorrentTracker;

#[allow(dead_code)]
impl TorrentSharding {
@@ -274,6 +284,105 @@ impl TorrentSharding {
}
}

    pub async fn cleanup_threads(&self, torrent_tracker: Arc<TorrentTracker>, shutdown: Shutdown, peer_timeout: Duration, persistent: bool)
    {
        let tokio_threading = match torrent_tracker.clone().config.tracker_config.peers_cleanup_threads {
            0 => {
                Builder::new_current_thread().thread_name("sharding").enable_all().build().unwrap()
            }
            _ => {
                Builder::new_multi_thread().thread_name("sharding").worker_threads(torrent_tracker.clone().config.tracker_config.peers_cleanup_threads as usize).enable_all().build().unwrap()
            }
        };
        for shard in 0u8..=255u8 {
            let torrent_tracker_clone = torrent_tracker.clone();
            let shutdown_clone = shutdown.clone();
            tokio_threading.spawn(async move {
                loop {
                    if shutdown_waiting(Duration::from_secs(torrent_tracker_clone.clone().config.tracker_config.peers_cleanup_interval), shutdown_clone.clone()).await {
                        return;
                    }

                    let (mut torrents, mut seeds, mut peers) = (0u64, 0u64, 0u64);
                    let shard_data = torrent_tracker_clone.clone().torrents_sharding.get_shard_content(shard);
                    for (info_hash, torrent_entry) in shard_data.iter() {
                        for (peer_id, torrent_peer) in torrent_entry.seeds.iter() {
                            if torrent_peer.updated.elapsed() > peer_timeout {
                                let shard = torrent_tracker_clone.clone().torrents_sharding.get_shard(shard).unwrap();
                                let mut lock = shard.write();
                                match lock.entry(*info_hash) {
                                    Entry::Vacant(_) => {}
                                    Entry::Occupied(mut o) => {
                                        if o.get_mut().seeds.remove(&peer_id).is_some() {
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Seeds, -1);
                                            seeds += 1;
                                        };
                                        if o.get_mut().peers.remove(&peer_id).is_some() {
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Peers, -1);
                                            peers += 1;
                                        };
                                        if !persistent && o.get().seeds.is_empty() && o.get().peers.is_empty() {
                                            lock.remove(info_hash);
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Torrents, -1);
                                            torrents += 1;
                                        }
                                    }
                                }
                            }
                        }
                        for (peer_id, torrent_peer) in torrent_entry.peers.iter() {
                            if torrent_peer.updated.elapsed() > peer_timeout {
                                let shard = torrent_tracker_clone.clone().torrents_sharding.get_shard(shard).unwrap();
                                let mut lock = shard.write();
                                match lock.entry(*info_hash) {
                                    Entry::Vacant(_) => {}
                                    Entry::Occupied(mut o) => {
                                        if o.get_mut().seeds.remove(&peer_id).is_some() {
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Seeds, -1);
                                            seeds += 1;
                                        };
                                        if o.get_mut().peers.remove(&peer_id).is_some() {
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Peers, -1);
                                            peers += 1;
                                        };
                                        if !persistent && o.get().seeds.is_empty() && o.get().peers.is_empty() {
                                            lock.remove(info_hash);
                                            torrent_tracker_clone.clone().update_stats(StatsEvent::Torrents, -1);
                                            torrents += 1;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    info!("[PEERS] Shard: {} - Torrents: {} - Seeds: {} - Peers: {}", shard, torrents, seeds, peers);
                }
            });
        }
        shutdown.clone().handle().await;
        mem::forget(tokio_threading);
    }

    #[tracing::instrument(level = "debug")]
    pub fn contains_torrent(&self, info_hash: InfoHash) -> bool
    {
        self.get_shard_content(info_hash.0[0]).contains_key(&info_hash)
    }

    #[tracing::instrument(level = "debug")]
    pub fn contains_peer(&self, info_hash: InfoHash, peer_id: PeerId) -> bool
    {
        match self.get_shard_content(info_hash.0[0]).get(&info_hash) {
            None => { false }
            Some(torrent_entry) => {
                if torrent_entry.seeds.contains_key(&peer_id) || torrent_entry.peers.contains_key(&peer_id) {
                    return true;
                }
                false
            }
        }
    }

    #[tracing::instrument(level = "debug")]
    #[allow(unreachable_patterns)]
    pub fn get_shard(&self, shard: u8) -> Option<Arc<RwLock<BTreeMap<InfoHash, TorrentEntry>>>>
Expand Down Expand Up @@ -542,7 +651,7 @@ impl TorrentSharding {
    #[tracing::instrument(level = "debug")]
    pub fn get_shard_content(&self, shard: u8) -> BTreeMap<InfoHash, TorrentEntry>
    {
-        self.get_shard(shard).unwrap().read().clone()
+        self.get_shard(shard).unwrap().read_recursive().clone()
    }

    #[tracing::instrument(level = "debug")]
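The one-line change from `read()` to `read_recursive()` in `get_shard_content` matters because parking_lot's `read()` blocks new readers while a writer is queued (to avoid writer starvation), so a thread that already holds a read lock and takes another can deadlock against a waiting writer; `read_recursive()` only waits for a writer that actually holds the lock. A minimal sketch of the distinction, assuming only the `parking_lot` crate:

```rust
use parking_lot::RwLock;

fn main() {
    let lock = RwLock::new(0u32);

    // First shared lock is held for the rest of this scope.
    let first = lock.read();

    // If another thread were queued for write() right now, a plain
    // lock.read() here could block behind it and deadlock this thread.
    // read_recursive() succeeds as long as no writer currently holds
    // the lock, making re-entrant reads like this one safe.
    let second = lock.read_recursive();

    println!("{} {}", *first, *second);
}
```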
3 changes: 2 additions & 1 deletion src/tracker/impls/torrent_tracker_handlers.rs
@@ -302,7 +302,8 @@ impl TorrentTracker {
let torrent_entry = match data.remove_torrent_peer(
announce_query.info_hash,
announce_query.peer_id,
-data.config.database.clone().persistent
+data.config.database.clone().persistent,
+false
) {
(Some(_), None) => {
TorrentEntry::new()
Expand Down
(Diffs for the remaining 2 changed files are not shown.)