Skip to content

Commit 5ad5adb

Browse files
Az107LoboGuardianmiky-rola
authored
Fix/shutdown (#35)
* Update Cargo.toml, README, and CONTRIBUTING link for better documentation and accessibility 🦜🕺 (#32) * Update the mod in src * Add shutdown logger * Add the handler for shutdown * Use libc to handle unix * Add shutdown.rs * Update the mod in src * Add the handler for shutdown * Use libc to handle unix * Fix shutdown module compilation error * Fix shared reference in unix * Fix shared reference in unix * Access the global without shared reference * Fix temporary value errors in shutdown.rs * Simplified exit logic in unix Add force quiting for repeatedly ctrl+c press * Move close logic to main thread. * Change signal handling and add safety checks to avoid UB * Basic implementation to force the awake of listener thread. this will attempt to connect to itself to wake the listener and check if the flag is set to false * Refactor to reach scalability and security in unix sign handler * Add setup for graceful shutdown --------- Co-authored-by: LoboGuardian 🐺 <[email protected]> Co-authored-by: miky-rola <[email protected]>
1 parent c5a3804 commit 5ad5adb

File tree

7 files changed

+330
-74
lines changed

7 files changed

+330
-74
lines changed

Cargo.lock

Lines changed: 10 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ documentation = "https://docs.rs/hteapot/"
1010
homepage = "https://github.com/az107/HTeaPot"
1111
repository = "https://github.com/az107/HTeaPot"
1212
keywords = ["http", "server", "web", "lightweight", "rust"]
13-
categories = ["network-programming", "web-programming", "command-line-utilities"]
13+
categories = [
14+
"network-programming",
15+
"web-programming",
16+
"command-line-utilities",
17+
]
1418
exclude = ["config.toml", "demo/", "README.md"]
1519

1620
[lib]
@@ -20,5 +24,9 @@ path = "src/hteapot/mod.rs"
2024
[[bin]]
2125
name = "hteapot"
2226

27+
[dependencies]
28+
libc = "0.2.172"
29+
30+
2331
[package.metadata.docs.rs]
24-
no-readme = true
32+
no-readme = true

src/hteapot/mod.rs

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Written by Alberto Ruiz 2024-03-08
2-
//
2+
//
33
// This is the HTTP server module, it will handle the requests and responses
44
// Also provides utilities to parse the requests and build the response
55

@@ -17,11 +17,11 @@
1717
//! ```
1818
1919
/// Submodules for HTTP functionality.
20-
pub mod brew; // HTTP client implementation
21-
mod methods; // HTTP method and status enums
22-
mod request; // Request parsing and builder
23-
mod response; // Response types and streaming
24-
mod status; // Status code mapping
20+
pub mod brew; // HTTP client implementation
21+
mod methods; // HTTP method and status enums
22+
mod request; // Request parsing and builder
23+
mod response; // Response types and streaming
24+
mod status; // Status code mapping
2525

2626
// Internal types used for connection management
2727
use self::response::{EmptyHttpResponse, HttpResponseCommon, IterError};
@@ -37,6 +37,7 @@ pub use self::status::HttpStatus;
3737
use std::collections::VecDeque;
3838
use std::io::{self, Read, Write};
3939
use std::net::{Shutdown, TcpListener, TcpStream};
40+
use std::sync::atomic::{AtomicBool, Ordering};
4041
use std::sync::{Arc, Condvar, Mutex};
4142
use std::thread;
4243
use std::time::{Duration, Instant};
@@ -76,6 +77,8 @@ pub struct Hteapot {
7677
port: u16,
7778
address: String,
7879
threads: u16,
80+
shutdown_signal: Option<Arc<AtomicBool>>,
81+
shutdown_hooks: Vec<Arc<dyn Fn() + Send + Sync + 'static>>,
7982
}
8083

8184
/// Represents the state of a connection's lifecycle.
@@ -95,12 +98,33 @@ struct SocketData {
9598
}
9699

97100
impl Hteapot {
101+
pub fn set_shutdown_signal(&mut self, signal: Arc<AtomicBool>) {
102+
self.shutdown_signal = Some(signal);
103+
}
104+
105+
pub fn get_shutdown_signal(&self) -> Option<Arc<AtomicBool>> {
106+
self.shutdown_signal.clone()
107+
}
108+
109+
pub fn add_shutdown_hook<F>(&mut self, hook: F)
110+
where
111+
F: Fn() + Send + Sync + 'static,
112+
{
113+
self.shutdown_hooks.push(Arc::new(hook));
114+
}
115+
116+
pub fn get_addr(&self) -> (String, u16) {
117+
return (self.address.clone(), self.port);
118+
}
119+
98120
// Constructor
99121
pub fn new(address: &str, port: u16) -> Self {
100122
Hteapot {
101123
port,
102124
address: address.to_string(),
103125
threads: 1,
126+
shutdown_signal: None,
127+
shutdown_hooks: Vec::new(),
104128
}
105129
}
106130

@@ -109,6 +133,8 @@ impl Hteapot {
109133
port,
110134
address: address.to_string(),
111135
threads: if threads == 0 { 1 } else { threads },
136+
shutdown_signal: None,
137+
shutdown_hooks: Vec::new(),
112138
}
113139
}
114140

@@ -132,23 +158,34 @@ impl Hteapot {
132158
Arc::new(Mutex::new(vec![0; self.threads as usize]));
133159
let arc_action = Arc::new(action);
134160

161+
// Clone shutdown_signal and share the shutdown_hooks via Arc
162+
let shutdown_signal = self.shutdown_signal.clone();
163+
let shutdown_hooks = Arc::new(self.shutdown_hooks.clone());
164+
135165
for thread_index in 0..self.threads {
136166
let pool_clone = pool.clone();
137167
let action_clone = arc_action.clone();
138168
let priority_list_clone = priority_list.clone();
169+
let shutdown_signal_clone = shutdown_signal.clone();
139170

140171
thread::spawn(move || {
141172
let mut streams_to_handle = Vec::new();
142173
loop {
143174
{
144175
let (lock, cvar) = &*pool_clone;
145176
let mut pool = lock.lock().expect("Error locking pool");
146-
147177
if streams_to_handle.is_empty() {
148178
// Store the returned guard back into pool
149-
pool = cvar.wait_while(pool, |pool| pool.is_empty())
179+
pool = cvar
180+
.wait_while(pool, |pool| pool.is_empty())
150181
.expect("Error waiting on cvar");
151182
}
183+
//TODO: move this to allow process the last request
184+
if let Some(signal) = &shutdown_signal_clone {
185+
if !signal.load(Ordering::SeqCst) {
186+
break; // Exit the server loop
187+
}
188+
}
152189

153190
while let Some(stream) = pool.pop_back() {
154191
let socket_status = SocketStatus {
@@ -185,6 +222,17 @@ impl Hteapot {
185222
}
186223

187224
loop {
225+
if let Some(signal) = &shutdown_signal {
226+
if !signal.load(Ordering::SeqCst) {
227+
let (lock, cvar) = &*pool;
228+
let _guard = lock.lock().unwrap();
229+
cvar.notify_all();
230+
for hook in shutdown_hooks.iter() {
231+
hook();
232+
}
233+
break;
234+
}
235+
}
188236
let stream = match listener.accept() {
189237
Ok((stream, _)) => stream,
190238
Err(_) => continue,
@@ -216,17 +264,14 @@ impl Hteapot {
216264
) -> Option<()> {
217265
let status = socket_data.status.as_mut()?;
218266

219-
// Fix by miky-rola 2025-04-08
220267
// Check if the TTL (time-to-live) for the connection has expired.
221-
// If the connection is idle for longer than `KEEP_ALIVE_TTL` and no data is being written,
222-
// the connection is gracefully shut down to free resources.
223268
if Instant::now().duration_since(status.ttl) > KEEP_ALIVE_TTL && !status.write {
224269
let _ = socket_data.stream.shutdown(Shutdown::Both);
225270
return None;
226271
}
272+
227273
// If the request is not yet complete, read data from the stream into a buffer.
228274
// This ensures that the server can handle partial or chunked requests.
229-
230275
if !status.request.done {
231276
let mut buffer = [0; BUFFER_SIZE];
232277
match socket_data.stream.read(&mut buffer) {
@@ -286,8 +331,7 @@ impl Hteapot {
286331
status.response = response;
287332
}
288333

289-
// Write the response to the client in chunks using the `peek` and `next` methods.
290-
// This ensures that large responses are sent incrementally without blocking the server.
334+
// Write the response to the client in chunks
291335
loop {
292336
match status.response.peek() {
293337
Ok(n) => match socket_data.stream.write(&n) {
@@ -390,4 +434,4 @@ mod tests {
390434
assert!(response_str.contains("Server: HTeaPot/"));
391435
assert!(second_response_str.contains("Second Request"));
392436
}
393-
}
437+
}

src/hteapot/request.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ impl HttpRequestBuilder {
190190
if parts.len() != 3 {
191191
return Err("Invalid method + path + version request");
192192
}
193-
194193
self.request.method = HttpMethod::from_str(parts[0]);
195194
let path_parts: Vec<&str> = parts[1].split('?').collect();
196195
self.request.path = path_parts[0].to_string();

src/logger.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
use std::fmt;
12
use std::io::Write;
2-
use std::sync::mpsc::{channel, Sender};
3+
use std::sync::Arc;
4+
use std::sync::mpsc::{Sender, channel};
35
use std::thread::{self, JoinHandle};
46
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5-
use std::fmt;
6-
use std::sync::Arc;
77

88
/// Differnt log levels
99
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Copy)]
@@ -86,7 +86,15 @@ impl SimpleTime {
8686
// calculate millisecs from nanosecs
8787
let millis = nanos / 1_000_000;
8888

89-
(year, month as u32 + 1, day as u32, hour, minute, second, millis)
89+
(
90+
year,
91+
month as u32 + 1,
92+
day as u32,
93+
hour,
94+
minute,
95+
second,
96+
millis,
97+
)
9098
}
9199

92100
/// Returns a formatted timestamp string for the current system time.
@@ -133,7 +141,7 @@ impl Logger {
133141
pub fn new<W: Sized + Write + Send + Sync + 'static>(
134142
mut writer: W,
135143
min_level: LogLevel,
136-
component: &str
144+
component: &str,
137145
) -> Logger {
138146
let (tx, rx) = channel::<LogMessage>();
139147
let thread = thread::spawn(move || {
@@ -151,7 +159,7 @@ impl Logger {
151159
msg.timestamp, msg.level, msg.component, msg.content
152160
);
153161
buff.push(formatted);
154-
},
162+
}
155163
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
156164
Err(_) => break,
157165
}
@@ -238,8 +246,8 @@ impl Logger {
238246
pub fn fatal(&self, content: String) {
239247
self.log(LogLevel::FATAL, content);
240248
}
241-
/// Log a message with TRACE level
242-
#[allow(dead_code)]
249+
/// Log a message with TRACE level
250+
#[allow(dead_code)]
243251
pub fn trace(&self, content: String) {
244252
self.log(LogLevel::TRACE, content);
245253
}
@@ -255,20 +263,19 @@ impl Clone for Logger {
255263
}
256264
}
257265

258-
259266
#[cfg(test)]
260267
mod tests {
261268
use super::*;
262269
use std::io::stdout;
263-
270+
264271
#[test]
265272
fn test_basic() {
266273
let logs = Logger::new(stdout(), LogLevel::DEBUG, "test");
267274
logs.info("test message".to_string());
268275
logs.debug("debug info".to_string());
269-
276+
270277
// Create a sub-logger with a different component
271278
let sub_logger = logs.with_component("sub-component");
272279
sub_logger.warn("warning from sub-component".to_string());
273280
}
274-
}
281+
}

0 commit comments

Comments
 (0)