mod sigma; use std::{ collections::HashMap, fs, mem, path::PathBuf, ptr, sync::Arc, time::Duration, }; use aya::{maps::RingBuf, Ebpf}; use common_lib::{ EventExecve, event::{EngineSource, NormalizedEvent}, }; use tokio::{io::unix::AsyncFd, sync::RwLock, time}; use uuid::Uuid; use crate::alert::{Alert, AlertBus}; use crate::sinks::AuditLog; /// Lit les arguments depuis /proc//cmdline (bytes séparés par \0) dans le buffer fourni. /// Le buffer est laissé tel quel si le fichier est absent (process déjà terminé). fn read_cmdline(pid: u32, buf: &mut [u8]) { let path = format!("/proc/{}/cmdline", pid); if let Ok(bytes) = fs::read(&path) { let len = bytes.len().min(buf.len()); buf[..len].copy_from_slice(&bytes[..len]); } } /// Lit le PPID depuis /proc//status. /// Retourne 0 si le fichier est absent ou le champ introuvable (process déjà terminé). fn read_ppid(pid: u32) -> u32 { let path = format!("/proc/{}/status", pid); let content = match fs::read_to_string(&path) { Ok(s) => s, Err(_) => return 0, }; for line in content.lines() { if let Some(rest) = line.strip_prefix("PPid:") { return rest.trim().parse().unwrap_or(0); } } 0 } /// Lit le UID réel du processus depuis `/proc//status` (ligne `Uid:`). /// Retourne 0 si le fichier est absent (processus déjà terminé). fn read_uid(pid: u32) -> u32 { let path = format!("/proc/{}/status", pid); let content = match fs::read_to_string(&path) { Ok(s) => s, Err(_) => return 0, }; for line in content.lines() { if let Some(rest) = line.strip_prefix("Uid:") { // Format : "Uid:\treal\teffective\tsaved\tfilesystem" if let Some(real_uid) = rest.split_whitespace().next() { return real_uid.parse().unwrap_or(0); } } } 0 } /// Résout un UID en nom d'utilisateur via `/etc/passwd`. /// Retourne la représentation décimale du UID si la résolution échoue. fn uid_to_username(uid: u32) -> String { if let Ok(content) = fs::read_to_string("/etc/passwd") { for line in content.lines() { let mut fields = line.splitn(4, ':'); let username = fields.next().unwrap_or(""); fields.next(); // mot de passe (x) let line_uid: u32 = fields.next().unwrap_or("").parse().unwrap_or(u32::MAX); if line_uid == uid { return username.to_string(); } } } uid.to_string() } fn bytes_to_string(buf: &[u8]) -> String { let end = buf.iter().position(|&b| b == 0).unwrap_or(buf.len()); String::from_utf8_lossy(&buf[..end]).into_owned() } fn args_to_string(buf: &[u8]) -> String { let end = buf.iter().rposition(|&b| b != 0).map(|i| i + 1).unwrap_or(0); buf[..end] .split(|&b| b == 0) .filter(|s| !s.is_empty()) .map(|s| String::from_utf8_lossy(s)) .collect::>() .join(" ") } /// Convertit un `EventExecve` brut (issu du ring buffer) en `NormalizedEvent`. /// C'est ici que le couplage fort avec le type kernel s'arrête. fn normalize_execve(event: &EventExecve) -> NormalizedEvent { let mut fields = HashMap::new(); let uid = read_uid(event.pid); let user = uid_to_username(uid); fields.insert("Image".to_string(), bytes_to_string(&event.filename)); fields.insert("CommandLine".to_string(), args_to_string(&event.args)); fields.insert("ProcessId".to_string(), event.pid.to_string()); fields.insert("ParentProcessId".to_string(), event.ppid.to_string()); fields.insert("User".to_string(), user); fields.insert("UserId".to_string(), uid.to_string()); NormalizedEvent { source: EngineSource::Execve, category: "process_creation".to_string(), fields, } } /// Démarre deux tâches tokio : /// /// 1. **Ingestion** — lit le ring buffer via `AsyncFd` et évalue les règles Sigma. /// N'acquiert qu'un read lock sur le `RuleStore` : jamais bloquée par un reload. /// Dispatche les alertes vers tous les sinks via `AlertBus`. /// /// 2. **Reload** — toutes les 5 secondes, recharge les règles depuis le disque /// dans une tâche dédiée. Une I/O lente (NFS, disque sous charge) n'impacte /// pas le pipeline d'ingestion. pub fn start( mut ebpf: Ebpf, bus: AlertBus, audit_log: AuditLog, rules_dir: PathBuf, ) -> tokio::task::JoinHandle<()> { let rule_store = Arc::new(RwLock::new( sigma::RuleStore::load_from_dir(&rules_dir) )); // Tâche de reload : write lock exclusif, toutes les 5 secondes. // Isolée du pipeline d'ingestion — une I/O lente ne bloque que cette tâche. let store_for_reload = Arc::clone(&rule_store); tokio::spawn(async move { let mut ticker = time::interval(Duration::from_secs(5)); ticker.tick().await; // consomme le tick immédiat initial loop { ticker.tick().await; store_for_reload.write().await.reload(); } }); // Tâche d'ingestion : read lock partagé, jamais bloquée par le reload. tokio::spawn(async move { let map = ebpf .take_map("EXECVE_EVENTS") .expect("Map EXECVE_EVENTS introuvable"); let ring_buf = RingBuf::try_from(map) .expect("Impossible de créer le RingBuf"); let mut async_fd = AsyncFd::new(ring_buf) .expect("Impossible de créer l'AsyncFd pour le ring buffer"); loop { let mut guard = async_fd.readable_mut().await .expect("Erreur AsyncFd readable"); while let Some(item) = guard.get_inner_mut().next() { if item.len() >= mem::size_of::() { let mut raw: EventExecve = unsafe { ptr::read_unaligned(item.as_ptr() as *const EventExecve) }; raw.ppid = read_ppid(raw.pid); read_cmdline(raw.pid, &mut raw.args); let event = normalize_execve(&raw); let event_id = Uuid::new_v4().to_string(); // Audit log : tous les events, avant évaluation des règles. audit_log.write(&event, &event_id); if let Some((rule, severity)) = rule_store.read().await.first_match(&event) { bus.emit(Alert::new( rule.to_string(), severity.to_string(), event_id, event.fields, )); } } } guard.clear_ready(); } }) }