rust/hedgewars-server/src/server/network.rs
changeset 16019 c40f5e27aaf0
parent 16018 fb389df02e3e
child 16029 d9f1b239b6d7
equal deleted inserted replaced
16018:fb389df02e3e 16019:c40f5e27aaf0
    28     messages::HwServerMessage::Redirect, messages::*, parser::server_message,
    28     messages::HwServerMessage::Redirect, messages::*, parser::server_message,
    29 };
    29 };
    30 
    30 
    31 const PING_TIMEOUT: Duration = Duration::from_secs(15);
    31 const PING_TIMEOUT: Duration = Duration::from_secs(15);
    32 
    32 
       
    33 #[derive(Debug)]
    33 enum ClientUpdateData {
    34 enum ClientUpdateData {
    34     Message(HwProtocolMessage),
    35     Message(HwProtocolMessage),
    35     Error(String),
    36     Error(String),
    36 }
    37 }
    37 
    38 
       
    39 #[derive(Debug)]
    38 struct ClientUpdate {
    40 struct ClientUpdate {
    39     client_id: ClientId,
    41     client_id: ClientId,
    40     data: ClientUpdateData,
    42     data: ClientUpdateData,
    41 }
    43 }
    42 
    44 
   185                                 break;
   187                                 break;
   186                             }
   188                             }
   187                         }
   189                         }
   188                         Err(e) => {
   190                         Err(e) => {
   189                             //todo!("send cmdline errors");
   191                             //todo!("send cmdline errors");
       
   192                             //todo!("more graceful shutdown to prevent errors from explicitly closed clients")
   190                             sender.send(Error(format!("{}", e))).await;
   193                             sender.send(Error(format!("{}", e))).await;
   191                             if matches!(e, ProtocolError::Timeout) {
   194                             if matches!(e, ProtocolError::Timeout) {
   192                                 Self::write(&mut self.stream, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await;
   195                                 Self::write(&mut self.stream, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await;
   193                             }
   196                             }
   194                             break;
   197                             break;
   210     listener: TcpListener,
   213     listener: TcpListener,
   211     #[cfg(feature = "tls-connections")]
   214     #[cfg(feature = "tls-connections")]
   212     tls: TlsListener,
   215     tls: TlsListener,
   213     server_state: ServerState,
   216     server_state: ServerState,
   214     clients: Slab<Sender<Bytes>>,
   217     clients: Slab<Sender<Bytes>>,
       
   218     update_tx: Sender<ClientUpdate>,
       
   219     update_rx: Receiver<ClientUpdate>
   215 }
   220 }
   216 
   221 
   217 impl NetworkLayer {
   222 impl NetworkLayer {
   218     pub async fn run(&mut self) {
   223     pub async fn run(&mut self) {
   219         let (update_tx, mut update_rx) = channel(128);
       
   220 
       
   221         async fn accept_plain_branch(
   224         async fn accept_plain_branch(
   222             layer: &mut NetworkLayer,
   225             layer: &mut NetworkLayer,
   223             value: (TcpStream, SocketAddr),
   226             value: (TcpStream, SocketAddr),
   224             update_tx: Sender<ClientUpdate>,
   227             update_tx: Sender<ClientUpdate>,
   225         ) {
   228         ) {
   278         //todo!("add the DB task");
   281         //todo!("add the DB task");
   279         //todo!("add certfile watcher task");
   282         //todo!("add certfile watcher task");
   280         loop {
   283         loop {
   281             #[cfg(not(feature = "tls-connections"))]
   284             #[cfg(not(feature = "tls-connections"))]
   282             tokio::select! {
   285             tokio::select! {
   283                 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, update_tx.clone()).await,
   286                 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, self.update_tx.clone()).await,
   284                 client_message = update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await
   287                 client_message = self.update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await
   285             }
   288             }
   286 
   289 
   287             #[cfg(feature = "tls-connections")]
   290             #[cfg(feature = "tls-connections")]
   288             tokio::select! {
   291             tokio::select! {
   289                 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, update_tx.clone()).await,
   292                 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, self.update_tx.clone()).await,
   290                 Ok(value) = self.tls.listener.accept() => accept_tls_branch(self, value, update_tx.clone()).await,
   293                 Ok(value) = self.tls.listener.accept() => accept_tls_branch(self, value, self.update_tx.clone()).await,
   291                 client_message = update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await
   294                 client_message = self.update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await
   292             }
   295             }
   293         }
   296         }
   294     }
   297     }
   295 
   298 
   296     async fn create_client(
   299     async fn create_client(
   340     async fn handle_response(&mut self, mut response: handlers::Response) {
   343     async fn handle_response(&mut self, mut response: handlers::Response) {
   341         if response.is_empty() {
   344         if response.is_empty() {
   342             return;
   345             return;
   343         }
   346         }
   344 
   347 
       
   348         for client_id in response.extract_removed_clients() {
       
   349             if self.clients.contains(client_id) {
       
   350                 self.clients.remove(client_id);
       
   351                 if self.clients.is_empty() {
       
   352                     let (update_tx, update_rx) = channel(128);
       
   353                     self.update_rx = update_rx;
       
   354                     self.update_tx = update_tx;
       
   355                 }
       
   356             }
       
   357             info!("Client {} removed", client_id);
       
   358         }
       
   359 
   345         debug!("{} pending server messages", response.len());
   360         debug!("{} pending server messages", response.len());
   346         let output = response.extract_messages(&mut self.server_state.server);
   361         let output = response.extract_messages(&mut self.server_state.server);
   347         for (clients, message) in output {
   362         for (clients, message) in output {
   348             debug!("Message {:?} to {:?}", message, clients);
   363             debug!("Message {:?} to {:?}", message, clients);
   349             Self::send_message(&mut self.clients, message, clients.iter().cloned()).await;
   364             Self::send_message(&mut self.clients, message, clients.iter().cloned()).await;
   350         }
       
   351 
       
   352         for client_id in response.extract_removed_clients() {
       
   353             if self.clients.contains(client_id) {
       
   354                 self.clients.remove(client_id);
       
   355             }
       
   356             info!("Client {} removed", client_id);
       
   357         }
   365         }
   358     }
   366     }
   359 
   367 
   360     async fn send_message<I>(
   368     async fn send_message<I>(
   361         clients: &mut Slab<Sender<Bytes>>,
   369         clients: &mut Slab<Sender<Bytes>>,
   426 
   434 
   427     pub fn build(self) -> NetworkLayer {
   435     pub fn build(self) -> NetworkLayer {
   428         let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity);
   436         let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity);
   429 
   437 
   430         let clients = Slab::with_capacity(self.clients_capacity);
   438         let clients = Slab::with_capacity(self.clients_capacity);
       
   439         let (update_tx, update_rx) = channel(128);
   431 
   440 
   432         NetworkLayer {
   441         NetworkLayer {
   433             listener: self.listener.expect("No listener provided"),
   442             listener: self.listener.expect("No listener provided"),
   434             #[cfg(feature = "tls-connections")]
   443             #[cfg(feature = "tls-connections")]
   435             tls: TlsListener {
   444             tls: TlsListener {
   436                 listener: self.tls_listener.expect("No TLS listener provided"),
   445                 listener: self.tls_listener.expect("No TLS listener provided"),
   437                 acceptor: self.tls_acceptor.expect("No TLS acceptor provided"),
   446                 acceptor: self.tls_acceptor.expect("No TLS acceptor provided"),
   438             },
   447             },
   439             server_state,
   448             server_state,
   440             clients,
   449             clients,
   441         }
   450             update_tx,
   442     }
   451             update_rx
   443 }
   452         }
       
   453     }
       
   454 }