rust/hedgewars-server/src/server/network.rs
changeset 14800 f43ab2bd76ae
parent 14718 f64e21f164a5
child 14801 65861ba8b4e8
equal deleted inserted replaced
14799:bbec6b28d072 14800:f43ab2bd76ae
    14     Poll, PollOpt, Ready, Token,
    14     Poll, PollOpt, Ready, Token,
    15 };
    15 };
    16 use netbuf;
    16 use netbuf;
    17 use slab::Slab;
    17 use slab::Slab;
    18 
    18 
    19 use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO};
    19 use super::{core::FileServerIO, core::HWServer, coretypes::ClientId, handlers};
    20 use crate::{
    20 use crate::{
    21     protocol::{messages::*, ProtocolDecoder},
    21     protocol::{messages::*, ProtocolDecoder},
    22     utils,
    22     utils,
    23 };
    23 };
       
    24 
       
    25 #[cfg(feature = "official-server")]
       
    26 use super::io::{IOThread, RequestId};
       
    27 
       
    28 use crate::server::handlers::{IoResult, IoTask};
    24 #[cfg(feature = "tls-connections")]
    29 #[cfg(feature = "tls-connections")]
    25 use openssl::{
    30 use openssl::{
    26     error::ErrorStack,
    31     error::ErrorStack,
    27     ssl::{
    32     ssl::{
    28         HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype,
    33         HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype,
   232 #[cfg(feature = "tls-connections")]
   237 #[cfg(feature = "tls-connections")]
   233 struct ServerSsl {
   238 struct ServerSsl {
   234     context: SslContext,
   239     context: SslContext,
   235 }
   240 }
   236 
   241 
       
   242 #[cfg(feature = "official-server")]
       
   243 pub struct IoLayer {
       
   244     next_request_id: RequestId,
       
   245     request_queue: Vec<(RequestId, ClientId)>,
       
   246     io_thread: IOThread,
       
   247 }
       
   248 
       
   249 #[cfg(feature = "official-server")]
       
   250 impl IoLayer {
       
   251     fn new() -> Self {
       
   252         Self {
       
   253             next_request_id: 0,
       
   254             request_queue: vec![],
       
   255             io_thread: IOThread::new(),
       
   256         }
       
   257     }
       
   258 
       
   259     fn send(&mut self, client_id: ClientId, task: IoTask) {
       
   260         let request_id = self.next_request_id;
       
   261         self.next_request_id += 1;
       
   262         self.request_queue.push((request_id, client_id));
       
   263         self.io_thread.send(request_id, task);
       
   264     }
       
   265 
       
   266     fn try_recv(&mut self) -> Option<(ClientId, IoResult)> {
       
   267         let (request_id, result) = self.io_thread.try_recv()?;
       
   268         if let Some(index) = self
       
   269             .request_queue
       
   270             .iter()
       
   271             .position(|(id, _)| *id == request_id)
       
   272         {
       
   273             let (_, client_id) = self.request_queue.swap_remove(index);
       
   274             Some((client_id, result))
       
   275         } else {
       
   276             None
       
   277         }
       
   278     }
       
   279 
       
   280     fn cancel(&mut self, client_id: ClientId) {
       
   281         let mut index = 0;
       
   282         while index < self.request_queue.len() {
       
   283             if self.request_queue[index].1 == client_id {
       
   284                 self.request_queue.swap_remove(index);
       
   285             } else {
       
   286                 index += 1;
       
   287             }
       
   288         }
       
   289     }
       
   290 }
       
   291 
   237 pub struct NetworkLayer {
   292 pub struct NetworkLayer {
   238     listener: TcpListener,
   293     listener: TcpListener,
   239     server: HWServer,
   294     server: HWServer,
   240     clients: Slab<NetworkClient>,
   295     clients: Slab<NetworkClient>,
   241     pending: HashSet<(ClientId, NetworkClientState)>,
   296     pending: HashSet<(ClientId, NetworkClientState)>,
   242     pending_cache: Vec<(ClientId, NetworkClientState)>,
   297     pending_cache: Vec<(ClientId, NetworkClientState)>,
   243     #[cfg(feature = "tls-connections")]
   298     #[cfg(feature = "tls-connections")]
   244     ssl: ServerSsl,
   299     ssl: ServerSsl,
       
   300     #[cfg(feature = "official-server")]
       
   301     io: IoLayer,
   245 }
   302 }
   246 
   303 
   247 impl NetworkLayer {
   304 impl NetworkLayer {
   248     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   305     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   249         let server = HWServer::new(clients_limit, rooms_limit, Box::new(FileServerIO::new()));
   306         let server = HWServer::new(clients_limit, rooms_limit, Box::new(FileServerIO::new()));
   257             clients,
   314             clients,
   258             pending,
   315             pending,
   259             pending_cache,
   316             pending_cache,
   260             #[cfg(feature = "tls-connections")]
   317             #[cfg(feature = "tls-connections")]
   261             ssl: NetworkLayer::create_ssl_context(),
   318             ssl: NetworkLayer::create_ssl_context(),
       
   319             #[cfg(feature = "official-server")]
       
   320             io: IoLayer::new(),
   262         }
   321         }
   263     }
   322     }
   264 
   323 
   265     #[cfg(feature = "tls-connections")]
   324     #[cfg(feature = "tls-connections")]
   266     fn create_ssl_context() -> ServerSsl {
   325     fn create_ssl_context() -> ServerSsl {
   281     }
   340     }
   282 
   341 
   283     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
   342     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
   284         poll.register(
   343         poll.register(
   285             &self.listener,
   344             &self.listener,
   286             utils::SERVER,
   345             utils::SERVER_TOKEN,
   287             Ready::readable(),
   346             Ready::readable(),
   288             PollOpt::edge(),
   347             PollOpt::edge(),
   289         )
   348         )?;
       
   349 
       
   350         #[cfg(feature = "official-server")]
       
   351         self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?;
       
   352 
       
   353         Ok(())
   290     }
   354     }
   291 
   355 
   292     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
   356     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
   293         let mut client_exists = false;
   357         let mut client_exists = false;
   294         if let Some(ref client) = self.clients.get(id) {
   358         if let Some(ref client) = self.clients.get(id) {
   297             info!("client {} ({}) removed", client.id, client.peer_addr);
   361             info!("client {} ({}) removed", client.id, client.peer_addr);
   298             client_exists = true;
   362             client_exists = true;
   299         }
   363         }
   300         if client_exists {
   364         if client_exists {
   301             self.clients.remove(id);
   365             self.clients.remove(id);
       
   366             #[cfg(feature = "official-server")]
       
   367             self.io.cancel(id);
   302         }
   368         }
   303     }
   369     }
   304 
   370 
   305     fn register_client(
   371     fn register_client(
   306         &mut self,
   372         &mut self,
   324         entry.insert(client);
   390         entry.insert(client);
   325 
   391 
   326         client_id
   392         client_id
   327     }
   393     }
   328 
   394 
   329     fn flush_server_messages(&mut self, mut response: handlers::Response, poll: &Poll) {
   395     fn handle_response(&mut self, mut response: handlers::Response, poll: &Poll) {
   330         debug!("{} pending server messages", response.len());
   396         debug!("{} pending server messages", response.len());
   331         let output = response.extract_messages(&mut self.server);
   397         let output = response.extract_messages(&mut self.server);
   332         for (clients, message) in output {
   398         for (clients, message) in output {
   333             debug!("Message {:?} to {:?}", message, clients);
   399             debug!("Message {:?} to {:?}", message, clients);
   334             let msg_string = message.to_raw_protocol();
   400             let msg_string = message.to_raw_protocol();
   342         }
   408         }
   343 
   409 
   344         for client_id in response.extract_removed_clients() {
   410         for client_id in response.extract_removed_clients() {
   345             self.deregister_client(poll, client_id);
   411             self.deregister_client(poll, client_id);
   346         }
   412         }
       
   413 
       
   414         #[cfg(feature = "official-server")]
       
   415         {
       
   416             let client_id = response.client_id();
       
   417             for task in response.extract_io_tasks() {
       
   418                 self.io.send(client_id, task);
       
   419             }
       
   420         }
       
   421     }
       
   422 
       
   423     #[cfg(feature = "official-server")]
       
   424     pub fn handle_io_result(&mut self) {
       
   425         if let Some((client_id, result)) = self.io.try_recv() {
       
   426             let mut response = handlers::Response::new(client_id);
       
   427             handlers::handle_io_result(&mut self.server, client_id, &mut response, result);
       
   428         }
   347     }
   429     }
   348 
   430 
   349     fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> {
   431     fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> {
   350         #[cfg(not(feature = "tls-connections"))]
   432         #[cfg(not(feature = "tls-connections"))]
   351         {
   433         {
   379         let mut response = handlers::Response::new(client_id);
   461         let mut response = handlers::Response::new(client_id);
   380 
   462 
   381         handlers::handle_client_accept(&mut self.server, client_id, &mut response);
   463         handlers::handle_client_accept(&mut self.server, client_id, &mut response);
   382 
   464 
   383         if !response.is_empty() {
   465         if !response.is_empty() {
   384             self.flush_server_messages(response, poll);
   466             self.handle_response(response, poll);
   385         }
   467         }
   386 
   468 
   387         Ok(())
   469         Ok(())
   388     }
   470     }
   389 
   471 
   436                 "Error while reading from client socket",
   518                 "Error while reading from client socket",
   437             )?,
   519             )?,
   438         }
   520         }
   439 
   521 
   440         if !response.is_empty() {
   522         if !response.is_empty() {
   441             self.flush_server_messages(response, poll);
   523             self.handle_response(response, poll);
   442         }
   524         }
   443 
   525 
   444         Ok(())
   526         Ok(())
   445     }
   527     }
   446 
   528 
   467 
   549 
   468     pub fn client_error(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> {
   550     pub fn client_error(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> {
   469         self.deregister_client(poll, client_id);
   551         self.deregister_client(poll, client_id);
   470         let mut response = handlers::Response::new(client_id);
   552         let mut response = handlers::Response::new(client_id);
   471         handlers::handle_client_loss(&mut self.server, client_id, &mut response);
   553         handlers::handle_client_loss(&mut self.server, client_id, &mut response);
   472         self.flush_server_messages(response, poll);
   554         self.handle_response(response, poll);
   473 
   555 
   474         Ok(())
   556         Ok(())
   475     }
   557     }
   476 
   558 
   477     pub fn has_pending_operations(&self) -> bool {
   559     pub fn has_pending_operations(&self) -> bool {