Skip to content

Commit

Permalink
server: sender into the worker structure
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaobo Liu <cppcoffee@gmail.com>
  • Loading branch information
cppcoffee committed Jan 7, 2024
1 parent c6e117a commit d956980
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions server/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct Worker {
config: Arc<Config>,
conntrack_map: Arc<ConntrackMap>,
verifying_key: VerifyingKey<Sha256>,
sender: Sender,
}

impl Worker {
Expand All @@ -33,16 +34,18 @@ impl Worker {
) -> Result<Worker> {
let public_key = RsaPublicKey::read_pkcs1_pem_file(&config.auth.key)?;
let verifying_key = VerifyingKey::<Sha256>::new(public_key);
let sender = Sender::new()?;

Ok(Worker {
config,
queue_num,
conntrack_map,
verifying_key,
sender,
})
}

pub fn start(self) -> Result<()> {
pub fn start(mut self) -> Result<()> {
let queue_num = self.queue_num;

let mut queue = Queue::open()?;
Expand All @@ -53,16 +56,14 @@ impl Worker {
queue.set_recv_security_context(queue_num, true)?;
queue.set_recv_uid_gid(queue_num, true)?;

let mut sender = Sender::new()?;

thread::spawn(move || {
if let Err(e) = util::set_thread_priority() {
error!("nfq {queue_num} failed to set thread priority: {e}");
return;
}

loop {
if let Err(e) = self.event_handler(&mut sender, &mut queue) {
if let Err(e) = self.event_handler(&mut queue) {
error!("nfq {queue_num} failed handle event: {e}");
continue;
}
Expand All @@ -72,16 +73,15 @@ impl Worker {
Ok(())
}

fn event_handler(&self, sender: &mut Sender, queue: &mut Queue) -> Result<()> {
fn event_handler(&mut self, queue: &mut Queue) -> Result<()> {
let mut verdict = Verdict::Drop;
let mut msg = queue.recv()?;
let payload = msg.get_payload();
let version = (payload[0] >> 4) & 0xF;

match version {
4 => verdict = self.ipv4_packet_handler(sender, payload)?,
// TODO: implement ipv6
//6 => verdict = self.handle_ipv6_packet(sender, payload)?,
4 => verdict = self.ipv4_packet_handler(payload)?,
//6 => verdict = self.ipv6_packet_handler( payload)?,
x => error!("nfq {} received unknown IP version: {x}", self.queue_num),
}

Expand All @@ -91,7 +91,7 @@ impl Worker {
Ok(())
}

fn ipv4_packet_handler(&self, sender: &mut Sender, payload: &[u8]) -> Result<Verdict> {
fn ipv4_packet_handler(&mut self, payload: &[u8]) -> Result<Verdict> {
let ip_header = Ipv4Packet::new(payload).ok_or(anyhow!("Malformed IPv4 packet"))?;

let source = IpAddr::V4(ip_header.get_source());
Expand All @@ -101,7 +101,6 @@ impl Worker {
let ip_packet = util::packet_header(&ip_header);

let verdict = self.transport_protocol_handler(
sender,
source,
destination,
protocol,
Expand All @@ -113,8 +112,7 @@ impl Worker {
}

fn transport_protocol_handler(
&self,
sender: &mut Sender,
&mut self,
src_ip: IpAddr,
dst_ip: IpAddr,
protocol: IpNextHeaderProtocol,
Expand All @@ -123,12 +121,11 @@ impl Worker {
) -> Result<Verdict> {
match protocol {
IpNextHeaderProtocols::Udp => {
let verdict =
self.udp_packet_handler(sender, src_ip, dst_ip, ip_packet, payload)?;
let verdict = self.udp_packet_handler(src_ip, dst_ip, ip_packet, payload)?;
return Ok(verdict);
}
IpNextHeaderProtocols::Tcp => {
let verdict = self.tcp_packet_handler(sender, src_ip, dst_ip, payload)?;
let verdict = self.tcp_packet_handler(src_ip, dst_ip, payload)?;
return Ok(verdict);
}
_ => {
Expand All @@ -143,8 +140,7 @@ impl Worker {
}

fn udp_packet_handler(
&self,
sender: &mut Sender,
&mut self,
src_ip: IpAddr,
dst_ip: IpAddr,
ip_packet: &[u8],
Expand All @@ -160,16 +156,17 @@ impl Worker {

let verdict =
self.verify_packet(src_ip, src_port, dst_ip, dst_port, Protocol::Udp, payload)?;

if verdict != Verdict::Accept {
sender.emit_icmpv4_unreachable(&src_ip, ip_packet, &udp_header)?;
self.sender
.emit_icmpv4_unreachable(&src_ip, ip_packet, &udp_header)?;
}

Ok(verdict)
}

fn tcp_packet_handler(
&self,
sender: &mut Sender,
&mut self,
src_ip: IpAddr,
dst_ip: IpAddr,
payload: &[u8],
Expand All @@ -184,8 +181,9 @@ impl Worker {

let verdict =
self.verify_packet(src_ip, src_port, dst_ip, dst_port, Protocol::Tcp, payload)?;

if verdict != Verdict::Accept {
sender.emit_tcp_rst(&src_ip, &dst_ip, &tcp_header)?;
self.sender.emit_tcp_rst(&src_ip, &dst_ip, &tcp_header)?;
}

Ok(verdict)
Expand Down

0 comments on commit d956980

Please sign in to comment.