4 collections::HashSet, |
4 collections::HashSet, |
5 io, |
5 io, |
6 io::{Error, ErrorKind, Read, Write}, |
6 io::{Error, ErrorKind, Read, Write}, |
7 mem::{replace, swap}, |
7 mem::{replace, swap}, |
8 net::{IpAddr, Ipv4Addr, SocketAddr}, |
8 net::{IpAddr, Ipv4Addr, SocketAddr}, |
|
9 num::NonZeroU32, |
|
10 time::Duration, |
|
11 time::Instant, |
9 }; |
12 }; |
10 |
13 |
11 use log::*; |
14 use log::*; |
12 use mio::{ |
15 use mio::{ |
|
16 event::Source, |
13 net::{TcpListener, TcpStream}, |
17 net::{TcpListener, TcpStream}, |
14 Evented, Poll, PollOpt, Ready, Token, |
18 Interest, Poll, Token, Waker, |
15 }; |
19 }; |
16 use mio_extras::timer; |
|
17 use netbuf; |
20 use netbuf; |
18 use slab::Slab; |
21 use slab::Slab; |
19 |
22 |
20 use crate::{ |
23 use crate::{ |
21 core::types::ClientId, |
24 core::{ |
|
25 events::{TimedEvents, Timeout}, |
|
26 types::ClientId, |
|
27 }, |
22 handlers, |
28 handlers, |
23 handlers::{IoResult, IoTask, ServerState}, |
29 handlers::{IoResult, IoTask, ServerState}, |
24 protocol::{messages::HwServerMessage::Redirect, messages::*, ProtocolDecoder}, |
30 protocol::{messages::HwServerMessage::Redirect, messages::*, ProtocolDecoder}, |
25 utils, |
31 utils, |
26 }; |
32 }; |
34 ssl::{ |
40 ssl::{ |
35 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
41 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
36 SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode, |
42 SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode, |
37 }, |
43 }, |
38 }; |
44 }; |
39 use std::time::Duration; |
|
40 |
45 |
41 const MAX_BYTES_PER_READ: usize = 2048; |
46 const MAX_BYTES_PER_READ: usize = 2048; |
42 const SEND_PING_TIMEOUT: Duration = Duration::from_secs(30); |
47 const SEND_PING_TIMEOUT: Duration = Duration::from_secs(5); |
43 const DROP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30); |
48 const DROP_CLIENT_TIMEOUT: Duration = Duration::from_secs(5); |
|
49 const MAX_TIMEOUT: usize = DROP_CLIENT_TIMEOUT.as_secs() as usize; |
44 const PING_PROBES_COUNT: u8 = 2; |
50 const PING_PROBES_COUNT: u8 = 2; |
45 |
51 |
46 #[derive(Hash, Eq, PartialEq, Copy, Clone)] |
52 #[derive(Hash, Eq, PartialEq, Copy, Clone)] |
47 pub enum NetworkClientState { |
53 pub enum NetworkClientState { |
48 Idle, |
54 Idle, |
62 #[cfg(feature = "tls-connections")] |
68 #[cfg(feature = "tls-connections")] |
63 SslStream(SslStream<TcpStream>), |
69 SslStream(SslStream<TcpStream>), |
64 } |
70 } |
65 |
71 |
66 impl ClientSocket { |
72 impl ClientSocket { |
67 fn inner(&self) -> &TcpStream { |
73 fn inner_mut(&mut self) -> &mut TcpStream { |
68 match self { |
74 match self { |
69 ClientSocket::Plain(stream) => stream, |
75 ClientSocket::Plain(stream) => stream, |
70 #[cfg(feature = "tls-connections")] |
76 #[cfg(feature = "tls-connections")] |
71 ClientSocket::SslHandshake(Some(builder)) => builder.get_ref(), |
77 ClientSocket::SslHandshake(Some(builder)) => builder.get_mut(), |
72 #[cfg(feature = "tls-connections")] |
78 #[cfg(feature = "tls-connections")] |
73 ClientSocket::SslHandshake(None) => unreachable!(), |
79 ClientSocket::SslHandshake(None) => unreachable!(), |
74 #[cfg(feature = "tls-connections")] |
80 #[cfg(feature = "tls-connections")] |
75 ClientSocket::SslStream(ssl_stream) => ssl_stream.get_ref(), |
81 ClientSocket::SslStream(ssl_stream) => ssl_stream.get_mut(), |
76 } |
82 } |
77 } |
83 } |
78 } |
84 } |
79 |
85 |
80 pub struct NetworkClient { |
86 pub struct NetworkClient { |
81 id: ClientId, |
87 id: ClientId, |
82 socket: ClientSocket, |
88 socket: ClientSocket, |
83 peer_addr: SocketAddr, |
89 peer_addr: SocketAddr, |
84 decoder: ProtocolDecoder, |
90 decoder: ProtocolDecoder, |
85 buf_out: netbuf::Buf, |
91 buf_out: netbuf::Buf, |
86 timeout: timer::Timeout, |
|
87 pending_close: bool, |
92 pending_close: bool, |
|
93 timeout: Timeout, |
|
94 last_rx_time: Instant, |
88 } |
95 } |
89 |
96 |
90 impl NetworkClient { |
97 impl NetworkClient { |
91 pub fn new( |
98 pub fn new( |
92 id: ClientId, |
99 id: ClientId, |
93 socket: ClientSocket, |
100 socket: ClientSocket, |
94 peer_addr: SocketAddr, |
101 peer_addr: SocketAddr, |
95 timeout: timer::Timeout, |
102 timeout: Timeout, |
96 ) -> NetworkClient { |
103 ) -> NetworkClient { |
97 NetworkClient { |
104 NetworkClient { |
98 id, |
105 id, |
99 socket, |
106 socket, |
100 peer_addr, |
107 peer_addr, |
101 decoder: ProtocolDecoder::new(), |
108 decoder: ProtocolDecoder::new(), |
102 buf_out: netbuf::Buf::new(), |
109 buf_out: netbuf::Buf::new(), |
|
110 pending_close: false, |
103 timeout, |
111 timeout, |
104 pending_close: false, |
112 last_rx_time: Instant::now(), |
105 } |
113 } |
106 } |
114 } |
107 |
115 |
108 #[cfg(feature = "tls-connections")] |
116 #[cfg(feature = "tls-connections")] |
109 fn handshake_impl( |
117 fn handshake_impl( |
323 pending_cache: Vec<(ClientId, NetworkClientState)>, |
338 pending_cache: Vec<(ClientId, NetworkClientState)>, |
324 #[cfg(feature = "tls-connections")] |
339 #[cfg(feature = "tls-connections")] |
325 ssl: ServerSsl, |
340 ssl: ServerSsl, |
326 #[cfg(feature = "official-server")] |
341 #[cfg(feature = "official-server")] |
327 io: IoLayer, |
342 io: IoLayer, |
328 timer: timer::Timer<TimerData>, |
343 timeout_events: NetworkTimeoutEvents, |
329 } |
344 } |
330 |
345 |
331 fn register_read<E: Evented>(poll: &Poll, evented: &E, token: mio::Token) -> io::Result<()> { |
346 fn register_read<S: Source>(poll: &Poll, source: &mut S, token: mio::Token) -> io::Result<()> { |
332 poll.register(evented, token, Ready::readable(), PollOpt::edge()) |
347 poll.registry().register(source, token, Interest::READABLE) |
333 } |
348 } |
334 |
349 |
335 fn create_ping_timeout( |
350 fn create_ping_timeout( |
336 timer: &mut timer::Timer<TimerData>, |
351 timeout_events: &mut NetworkTimeoutEvents, |
337 probes_count: u8, |
352 probes_count: u8, |
338 client_id: ClientId, |
353 client_id: ClientId, |
339 ) -> timer::Timeout { |
354 ) -> Timeout { |
340 timer.set_timeout( |
355 timeout_events.set_timeout( |
341 SEND_PING_TIMEOUT, |
356 NonZeroU32::new(SEND_PING_TIMEOUT.as_secs() as u32).unwrap(), |
342 TimerData(TimeoutEvent::SendPing { probes_count }, client_id), |
357 TimerData(TimeoutEvent::SendPing { probes_count }, client_id), |
343 ) |
358 ) |
344 } |
359 } |
345 |
360 |
346 fn create_drop_timeout(timer: &mut timer::Timer<TimerData>, client_id: ClientId) -> timer::Timeout { |
361 fn create_drop_timeout(timeout_events: &mut NetworkTimeoutEvents, client_id: ClientId) -> Timeout { |
347 timer.set_timeout( |
362 timeout_events.set_timeout( |
348 DROP_CLIENT_TIMEOUT, |
363 NonZeroU32::new(DROP_CLIENT_TIMEOUT.as_secs() as u32).unwrap(), |
349 TimerData(TimeoutEvent::DropClient, client_id), |
364 TimerData(TimeoutEvent::DropClient, client_id), |
350 ) |
365 ) |
351 } |
366 } |
352 |
367 |
353 impl NetworkLayer { |
368 impl NetworkLayer { |
354 pub fn register(&self, poll: &Poll) -> io::Result<()> { |
369 pub fn register(&mut self, poll: &Poll) -> io::Result<()> { |
355 register_read(poll, &self.listener, utils::SERVER_TOKEN)?; |
370 register_read(poll, &mut self.listener, utils::SERVER_TOKEN)?; |
356 #[cfg(feature = "tls-connections")] |
371 #[cfg(feature = "tls-connections")] |
357 register_read(poll, &self.ssl.listener, utils::SECURE_SERVER_TOKEN)?; |
372 register_read(poll, &mut self.ssl.listener, utils::SECURE_SERVER_TOKEN)?; |
358 register_read(poll, &self.timer, utils::TIMER_TOKEN)?; |
|
359 |
|
360 #[cfg(feature = "official-server")] |
|
361 self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?; |
|
362 |
373 |
363 Ok(()) |
374 Ok(()) |
364 } |
375 } |
365 |
376 |
366 fn deregister_client(&mut self, poll: &Poll, id: ClientId, is_error: bool) { |
377 fn deregister_client(&mut self, poll: &Poll, id: ClientId, is_error: bool) { |
367 if let Some(ref mut client) = self.clients.get_mut(id) { |
378 if let Some(ref mut client) = self.clients.get_mut(id) { |
368 poll.deregister(client.socket.inner()) |
379 poll.registry() |
|
380 .deregister(client.socket.inner_mut()) |
369 .expect("could not deregister socket"); |
381 .expect("could not deregister socket"); |
370 if client.has_pending_sends() && !is_error { |
382 if client.has_pending_sends() && !is_error { |
371 info!( |
383 info!( |
372 "client {} ({}) pending removal", |
384 "client {} ({}) pending removal", |
373 client.id, client.peer_addr |
385 client.id, client.peer_addr |
374 ); |
386 ); |
375 client.pending_close = true; |
387 client.pending_close = true; |
376 poll.register( |
388 poll.registry() |
377 client.socket.inner(), |
389 .register(client.socket.inner_mut(), Token(id), Interest::WRITABLE) |
378 Token(id), |
390 .unwrap_or_else(|_| { |
379 Ready::writable(), |
391 self.clients.remove(id); |
380 PollOpt::edge(), |
392 }); |
381 ) |
|
382 .unwrap_or_else(|_| { |
|
383 self.clients.remove(id); |
|
384 }); |
|
385 } else { |
393 } else { |
386 info!("client {} ({}) removed", client.id, client.peer_addr); |
394 info!("client {} ({}) removed", client.id, client.peer_addr); |
387 self.clients.remove(id); |
395 self.clients.remove(id); |
388 } |
396 } |
389 #[cfg(feature = "official-server")] |
397 #[cfg(feature = "official-server")] |
392 } |
400 } |
393 |
401 |
394 fn register_client( |
402 fn register_client( |
395 &mut self, |
403 &mut self, |
396 poll: &Poll, |
404 poll: &Poll, |
397 client_socket: ClientSocket, |
405 mut client_socket: ClientSocket, |
398 addr: SocketAddr, |
406 addr: SocketAddr, |
399 ) -> io::Result<ClientId> { |
407 ) -> io::Result<ClientId> { |
400 let entry = self.clients.vacant_entry(); |
408 let entry = self.clients.vacant_entry(); |
401 let client_id = entry.key(); |
409 let client_id = entry.key(); |
402 |
410 |
403 poll.register( |
411 poll.registry().register( |
404 client_socket.inner(), |
412 client_socket.inner_mut(), |
405 Token(client_id), |
413 Token(client_id), |
406 Ready::readable() | Ready::writable(), |
414 Interest::READABLE | Interest::WRITABLE, |
407 PollOpt::edge(), |
|
408 )?; |
415 )?; |
409 |
416 |
410 let client = NetworkClient::new( |
417 let client = NetworkClient::new( |
411 client_id, |
418 client_id, |
412 client_socket, |
419 client_socket, |
413 addr, |
420 addr, |
414 create_ping_timeout(&mut self.timer, PING_PROBES_COUNT - 1, client_id), |
421 create_ping_timeout(&mut self.timeout_events, PING_PROBES_COUNT - 1, client_id), |
415 ); |
422 ); |
416 info!("client {} ({}) added", client.id, client.peer_addr); |
423 info!("client {} ({}) added", client.id, client.peer_addr); |
417 entry.insert(client); |
424 entry.insert(client); |
418 |
425 |
419 Ok(client_id) |
426 Ok(client_id) |
449 self.io.send(client_id, task); |
456 self.io.send(client_id, task); |
450 } |
457 } |
451 } |
458 } |
452 } |
459 } |
453 |
460 |
454 pub fn handle_timeout(&mut self, poll: &Poll) -> io::Result<()> { |
461 pub fn handle_timeout(&mut self, poll: &mut Poll) -> io::Result<()> { |
455 while let Some(TimerData(event, client_id)) = self.timer.poll() { |
462 for TimerData(event, client_id) in self.timeout_events.poll(Instant::now()) { |
456 match event { |
463 if let Some(client) = self.clients.get_mut(client_id) { |
457 TimeoutEvent::SendPing { probes_count } => { |
464 if client.last_rx_time.elapsed() > SEND_PING_TIMEOUT { |
458 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
465 match event { |
459 client.send_string(&HwServerMessage::Ping.to_raw_protocol()); |
466 TimeoutEvent::SendPing { probes_count } => { |
460 client.write()?; |
467 client.send_string(&HwServerMessage::Ping.to_raw_protocol()); |
461 let timeout = if probes_count != 0 { |
468 client.write()?; |
462 create_ping_timeout(&mut self.timer, probes_count - 1, client_id) |
469 let timeout = if probes_count != 0 { |
463 } else { |
470 create_ping_timeout( |
464 create_drop_timeout(&mut self.timer, client_id) |
471 &mut self.timeout_events, |
465 }; |
472 probes_count - 1, |
466 client.replace_timeout(timeout); |
473 client_id, |
|
474 ) |
|
475 } else { |
|
476 create_drop_timeout(&mut self.timeout_events, client_id) |
|
477 }; |
|
478 client.replace_timeout(timeout); |
|
479 } |
|
480 TimeoutEvent::DropClient => { |
|
481 client.send_string( |
|
482 &HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol(), |
|
483 ); |
|
484 let _res = client.write(); |
|
485 |
|
486 self.operation_failed( |
|
487 poll, |
|
488 client_id, |
|
489 &ErrorKind::TimedOut.into(), |
|
490 "No ping response", |
|
491 )?; |
|
492 } |
467 } |
493 } |
468 } |
494 } else { |
469 TimeoutEvent::DropClient => { |
495 client.replace_timeout(create_ping_timeout( |
470 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
496 &mut self.timeout_events, |
471 client.send_string( |
497 PING_PROBES_COUNT - 1, |
472 &HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol(), |
|
473 ); |
|
474 let _res = client.write(); |
|
475 } |
|
476 self.operation_failed( |
|
477 poll, |
|
478 client_id, |
498 client_id, |
479 &ErrorKind::TimedOut.into(), |
499 )); |
480 "No ping response", |
|
481 )?; |
|
482 } |
500 } |
483 } |
501 } |
484 } |
502 } |
485 Ok(()) |
503 Ok(()) |
486 } |
504 } |
729 listener, |
741 listener, |
730 context: builder.build(), |
742 context: builder.build(), |
731 } |
743 } |
732 } |
744 } |
733 |
745 |
734 pub fn build(self) -> NetworkLayer { |
746 pub fn build(self, poll: &Poll) -> NetworkLayer { |
735 let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
747 let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
736 |
748 |
737 let clients = Slab::with_capacity(self.clients_capacity); |
749 let clients = Slab::with_capacity(self.clients_capacity); |
738 let pending = HashSet::with_capacity(2 * self.clients_capacity); |
750 let pending = HashSet::with_capacity(2 * self.clients_capacity); |
739 let pending_cache = Vec::with_capacity(2 * self.clients_capacity); |
751 let pending_cache = Vec::with_capacity(2 * self.clients_capacity); |
740 let timer = timer::Builder::default().build(); |
752 let timeout_events = NetworkTimeoutEvents::new(); |
|
753 |
|
754 #[cfg(feature = "official-server")] |
|
755 let waker = Waker::new(poll.registry(), utils::IO_TOKEN) |
|
756 .expect("Unable to create a waker for the IO thread"); |
741 |
757 |
742 NetworkLayer { |
758 NetworkLayer { |
743 listener: self.listener.expect("No listener provided"), |
759 listener: self.listener.expect("No listener provided"), |
744 server_state, |
760 server_state, |
745 clients, |
761 clients, |