14 Poll, PollOpt, Ready, Token, |
14 Poll, PollOpt, Ready, Token, |
15 }; |
15 }; |
16 use netbuf; |
16 use netbuf; |
17 use slab::Slab; |
17 use slab::Slab; |
18 |
18 |
19 use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO}; |
19 use super::{core::FileServerIO, core::HWServer, coretypes::ClientId, handlers}; |
20 use crate::{ |
20 use crate::{ |
21 protocol::{messages::*, ProtocolDecoder}, |
21 protocol::{messages::*, ProtocolDecoder}, |
22 utils, |
22 utils, |
23 }; |
23 }; |
|
24 |
|
25 #[cfg(feature = "official-server")] |
|
26 use super::io::{IOThread, RequestId}; |
|
27 |
|
28 use crate::server::handlers::{IoResult, IoTask}; |
24 #[cfg(feature = "tls-connections")] |
29 #[cfg(feature = "tls-connections")] |
25 use openssl::{ |
30 use openssl::{ |
26 error::ErrorStack, |
31 error::ErrorStack, |
27 ssl::{ |
32 ssl::{ |
28 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
33 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
// TLS state of the server; compiled only when the `tls-connections` feature is
// enabled. Its single field holds the OpenSSL `SslContext`.
232 #[cfg(feature = "tls-connections")] |
237 #[cfg(feature = "tls-connections")] |
233 struct ServerSsl { |
238 struct ServerSsl { |
234 context: SslContext, |
239 context: SslContext, |
235 } |
240 } |
236 |
241 |
|
/// Bookkeeping for I/O tasks handed over to the [`IOThread`].
///
/// Every task that is sent gets a fresh request id. `request_queue` remembers
/// which client each outstanding request belongs to, so that a result coming
/// back from the thread can be routed to that client.
#[cfg(feature = "official-server")]
pub struct IoLayer {
    /// Id that will be assigned to the next task passed to `send`.
    next_request_id: RequestId,
    /// Requests that were sent and have been neither answered nor cancelled.
    request_queue: Vec<(RequestId, ClientId)>,
    /// Handle used to submit tasks and to fetch their results.
    io_thread: IOThread,
}

#[cfg(feature = "official-server")]
impl IoLayer {
    /// Creates a layer with no outstanding requests and a newly constructed
    /// `IOThread`.
    fn new() -> Self {
        Self {
            next_request_id: 0,
            request_queue: vec![],
            io_thread: IOThread::new(),
        }
    }

    /// Submits `task` on behalf of `client_id`.
    ///
    /// The request is recorded as outstanding before it is forwarded to the
    /// I/O thread.
    fn send(&mut self, client_id: ClientId, task: IoTask) {
        let request_id = self.next_request_id;
        // Wrap around instead of overflowing: ids only have to be unique among
        // the requests that are outstanding at the same time, and a plain
        // `+= 1` would panic in debug builds once the counter is exhausted.
        self.next_request_id = self.next_request_id.wrapping_add(1);
        self.request_queue.push((request_id, client_id));
        self.io_thread.send(request_id, task);
    }

    /// Returns the next available result together with the client that
    /// requested it, or `None` once the I/O thread has nothing more to offer.
    ///
    /// Results whose request is no longer outstanding (see `cancel`) are
    /// discarded and the next one is tried, so a stale result can no longer
    /// be mistaken for "nothing available" and hide live results behind it.
    fn try_recv(&mut self) -> Option<(ClientId, IoResult)> {
        loop {
            // `?` ends the loop as soon as the thread reports no result.
            // NOTE(review): assumes `IOThread::try_recv` does not block — confirm.
            let (request_id, result) = self.io_thread.try_recv()?;
            let outstanding = self
                .request_queue
                .iter()
                .position(|(id, _)| *id == request_id);
            if let Some(index) = outstanding {
                // Order of the queue is irrelevant; lookups go by request id.
                let (_, client_id) = self.request_queue.swap_remove(index);
                return Some((client_id, result));
            }
        }
    }

    /// Forgets every outstanding request issued for `client_id`.
    ///
    /// Only the local bookkeeping is touched here; results that still arrive
    /// for the forgotten requests are dropped by `try_recv`.
    fn cancel(&mut self, client_id: ClientId) {
        self.request_queue
            .retain(|(_, owner)| *owner != client_id);
    }
}
|
291 |
// State of the network front end:
//   listener      - listening TCP socket
//   server        - `HWServer` instance handed to the `handlers::*` functions
//   clients       - slab of `NetworkClient`s, looked up by client id
//   pending,
//   pending_cache - (client id, client state) pairs; their exact role is not
//                   visible in this excerpt
//   ssl           - TLS context, only with the `tls-connections` feature
//   io            - `IoLayer`, only with the `official-server` feature; present
//                   only in the higher-numbered (newer) rows
237 pub struct NetworkLayer { |
292 pub struct NetworkLayer { |
238 listener: TcpListener, |
293 listener: TcpListener, |
239 server: HWServer, |
294 server: HWServer, |
240 clients: Slab<NetworkClient>, |
295 clients: Slab<NetworkClient>, |
241 pending: HashSet<(ClientId, NetworkClientState)>, |
296 pending: HashSet<(ClientId, NetworkClientState)>, |
242 pending_cache: Vec<(ClientId, NetworkClientState)>, |
297 pending_cache: Vec<(ClientId, NetworkClientState)>, |
243 #[cfg(feature = "tls-connections")] |
298 #[cfg(feature = "tls-connections")] |
244 ssl: ServerSsl, |
299 ssl: ServerSsl, |

300 #[cfg(feature = "official-server")] |

301 io: IoLayer, |
245 } |
302 } |
246 |
303 |
247 impl NetworkLayer { |
304 impl NetworkLayer { |
248 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
305 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
249 let server = HWServer::new(clients_limit, rooms_limit, Box::new(FileServerIO::new())); |
306 let server = HWServer::new(clients_limit, rooms_limit, Box::new(FileServerIO::new())); |
281 } |
340 } |
282 |
341 |
// Registers the listening socket with `poll` for readable readiness using
// `PollOpt::edge()`.
// The lower-numbered rows use the token `utils::SERVER` and return the result of
// `poll.register` directly. The higher-numbered rows use `utils::SERVER_TOKEN`,
// propagate a registration error with `?`, additionally call
// `io_thread.register_rx` with `utils::IO_TOKEN` when the `official-server`
// feature is enabled, and finish with `Ok(())`.
// NOTE(review): `register_rx` presumably registers the receiving side of the I/O
// thread's result channel — confirm against `IOThread`.
283 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
342 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
284 poll.register( |
343 poll.register( |
285 &self.listener, |
344 &self.listener, |
286 utils::SERVER, |
345 utils::SERVER_TOKEN, |
287 Ready::readable(), |
346 Ready::readable(), |
288 PollOpt::edge(), |
347 PollOpt::edge(), |
289 ) |
348 )?; |

349 |

350 #[cfg(feature = "official-server")] |

351 self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?; |

352 |

353 Ok(()) |
290 } |
354 } |
291 |
355 |
292 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
356 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
293 let mut client_exists = false; |
357 let mut client_exists = false; |
294 if let Some(ref client) = self.clients.get(id) { |
358 if let Some(ref client) = self.clients.get(id) { |
324 entry.insert(client); |
390 entry.insert(client); |
325 |
391 |
326 client_id |
392 client_id |
327 } |
393 } |
328 |
394 |
329 fn flush_server_messages(&mut self, mut response: handlers::Response, poll: &Poll) { |
395 fn handle_response(&mut self, mut response: handlers::Response, poll: &Poll) { |
330 debug!("{} pending server messages", response.len()); |
396 debug!("{} pending server messages", response.len()); |
331 let output = response.extract_messages(&mut self.server); |
397 let output = response.extract_messages(&mut self.server); |
332 for (clients, message) in output { |
398 for (clients, message) in output { |
333 debug!("Message {:?} to {:?}", message, clients); |
399 debug!("Message {:?} to {:?}", message, clients); |
334 let msg_string = message.to_raw_protocol(); |
400 let msg_string = message.to_raw_protocol(); |
342 } |
408 } |
343 |
409 |
344 for client_id in response.extract_removed_clients() { |
410 for client_id in response.extract_removed_clients() { |
345 self.deregister_client(poll, client_id); |
411 self.deregister_client(poll, client_id); |
346 } |
412 } |
|
413 |
|
414 #[cfg(feature = "official-server")] |
|
415 { |
|
416 let client_id = response.client_id(); |
|
417 for task in response.extract_io_tasks() { |
|
418 self.io.send(client_id, task); |
|
419 } |
|
420 } |
|
421 } |
|
422 |
|
// Official-server builds only: takes at most one result from `self.io.try_recv()`
// and passes it, together with a fresh `Response` for the owning client, to
// `handlers::handle_io_result`. Does nothing when `try_recv` yields `None`.
// NOTE(review): `response` goes out of scope right after the handler call; nothing
// visible here forwards it (e.g. to `handle_response`, which needs a `&Poll` that
// this function does not receive) — confirm whether the messages it collects are
// meant to be delivered to clients.
// NOTE(review): only a single result is consumed per call — confirm the caller
// invokes this again while further results are queued.
423 #[cfg(feature = "official-server")] |

424 pub fn handle_io_result(&mut self) { |

425 if let Some((client_id, result)) = self.io.try_recv() { |

426 let mut response = handlers::Response::new(client_id); |

427 handlers::handle_io_result(&mut self.server, client_id, &mut response, result); |

428 } |
347 } |
429 } |
348 |
430 |
349 fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> { |
431 fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> { |
350 #[cfg(not(feature = "tls-connections"))] |
432 #[cfg(not(feature = "tls-connections"))] |
351 { |
433 { |
467 |
549 |
// Handles a failed client connection: deregisters the client from `poll`, lets
// `handlers::handle_client_loss` fill a `Response` for it, and then processes
// that response (`flush_server_messages` in the lower-numbered rows,
// `handle_response` in the higher-numbered ones). Always returns `Ok(())`.
// NOTE(review): no call to `IoLayer::cancel` is visible on this path — confirm
// that outstanding I/O requests of a lost client are dropped elsewhere.
468 pub fn client_error(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
550 pub fn client_error(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
469 self.deregister_client(poll, client_id); |
551 self.deregister_client(poll, client_id); |
470 let mut response = handlers::Response::new(client_id); |
552 let mut response = handlers::Response::new(client_id); |
471 handlers::handle_client_loss(&mut self.server, client_id, &mut response); |
553 handlers::handle_client_loss(&mut self.server, client_id, &mut response); |
472 self.flush_server_messages(response, poll); |
554 self.handle_response(response, poll); |
473 |
555 |
474 Ok(()) |
556 Ok(()) |
475 } |
557 } |
476 |
558 |
477 pub fn has_pending_operations(&self) -> bool { |
559 pub fn has_pending_operations(&self) -> bool { |