28 messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
28 messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
29 }; |
29 }; |
30 |
30 |
31 const PING_TIMEOUT: Duration = Duration::from_secs(15); |
31 const PING_TIMEOUT: Duration = Duration::from_secs(15); |
32 |
32 |
|
33 #[derive(Debug)] |
33 enum ClientUpdateData { |
34 enum ClientUpdateData { |
34 Message(HwProtocolMessage), |
35 Message(HwProtocolMessage), |
35 Error(String), |
36 Error(String), |
36 } |
37 } |
37 |
38 |
|
39 #[derive(Debug)] |
38 struct ClientUpdate { |
40 struct ClientUpdate { |
39 client_id: ClientId, |
41 client_id: ClientId, |
40 data: ClientUpdateData, |
42 data: ClientUpdateData, |
41 } |
43 } |
42 |
44 |
185 break; |
187 break; |
186 } |
188 } |
187 } |
189 } |
188 Err(e) => { |
190 Err(e) => { |
189 //todo!("send cmdline errors"); |
191 //todo!("send cmdline errors"); |
|
192 //todo!("more graceful shutdown to prevent errors from explicitly closed clients") |
190 sender.send(Error(format!("{}", e))).await; |
193 sender.send(Error(format!("{}", e))).await; |
191 if matches!(e, ProtocolError::Timeout) { |
194 if matches!(e, ProtocolError::Timeout) { |
192 Self::write(&mut self.stream, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await; |
195 Self::write(&mut self.stream, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await; |
193 } |
196 } |
194 break; |
197 break; |
210 listener: TcpListener, |
213 listener: TcpListener, |
211 #[cfg(feature = "tls-connections")] |
214 #[cfg(feature = "tls-connections")] |
212 tls: TlsListener, |
215 tls: TlsListener, |
213 server_state: ServerState, |
216 server_state: ServerState, |
214 clients: Slab<Sender<Bytes>>, |
217 clients: Slab<Sender<Bytes>>, |
|
218 update_tx: Sender<ClientUpdate>, |
|
219 update_rx: Receiver<ClientUpdate> |
215 } |
220 } |
216 |
221 |
217 impl NetworkLayer { |
222 impl NetworkLayer { |
218 pub async fn run(&mut self) { |
223 pub async fn run(&mut self) { |
219 let (update_tx, mut update_rx) = channel(128); |
|
220 |
|
221 async fn accept_plain_branch( |
224 async fn accept_plain_branch( |
222 layer: &mut NetworkLayer, |
225 layer: &mut NetworkLayer, |
223 value: (TcpStream, SocketAddr), |
226 value: (TcpStream, SocketAddr), |
224 update_tx: Sender<ClientUpdate>, |
227 update_tx: Sender<ClientUpdate>, |
225 ) { |
228 ) { |
278 //todo!("add the DB task"); |
281 //todo!("add the DB task"); |
279 //todo!("add certfile watcher task"); |
282 //todo!("add certfile watcher task"); |
280 loop { |
283 loop { |
281 #[cfg(not(feature = "tls-connections"))] |
284 #[cfg(not(feature = "tls-connections"))] |
282 tokio::select! { |
285 tokio::select! { |
283 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, update_tx.clone()).await, |
286 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, self.update_tx.clone()).await, |
284 client_message = update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await |
287 client_message = self.update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await |
285 } |
288 } |
286 |
289 |
287 #[cfg(feature = "tls-connections")] |
290 #[cfg(feature = "tls-connections")] |
288 tokio::select! { |
291 tokio::select! { |
289 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, update_tx.clone()).await, |
292 Ok(value) = self.listener.accept() => accept_plain_branch(self, value, self.update_tx.clone()).await, |
290 Ok(value) = self.tls.listener.accept() => accept_tls_branch(self, value, update_tx.clone()).await, |
293 Ok(value) = self.tls.listener.accept() => accept_tls_branch(self, value, self.update_tx.clone()).await, |
291 client_message = update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await |
294 client_message = self.update_rx.recv(), if !self.clients.is_empty() => client_message_branch(self, client_message).await |
292 } |
295 } |
293 } |
296 } |
294 } |
297 } |
295 |
298 |
296 async fn create_client( |
299 async fn create_client( |
340 async fn handle_response(&mut self, mut response: handlers::Response) { |
343 async fn handle_response(&mut self, mut response: handlers::Response) { |
341 if response.is_empty() { |
344 if response.is_empty() { |
342 return; |
345 return; |
343 } |
346 } |
344 |
347 |
|
348 for client_id in response.extract_removed_clients() { |
|
349 if self.clients.contains(client_id) { |
|
350 self.clients.remove(client_id); |
|
351 if self.clients.is_empty() { |
|
352 let (update_tx, update_rx) = channel(128); |
|
353 self.update_rx = update_rx; |
|
354 self.update_tx = update_tx; |
|
355 } |
|
356 } |
|
357 info!("Client {} removed", client_id); |
|
358 } |
|
359 |
345 debug!("{} pending server messages", response.len()); |
360 debug!("{} pending server messages", response.len()); |
346 let output = response.extract_messages(&mut self.server_state.server); |
361 let output = response.extract_messages(&mut self.server_state.server); |
347 for (clients, message) in output { |
362 for (clients, message) in output { |
348 debug!("Message {:?} to {:?}", message, clients); |
363 debug!("Message {:?} to {:?}", message, clients); |
349 Self::send_message(&mut self.clients, message, clients.iter().cloned()).await; |
364 Self::send_message(&mut self.clients, message, clients.iter().cloned()).await; |
350 } |
|
351 |
|
352 for client_id in response.extract_removed_clients() { |
|
353 if self.clients.contains(client_id) { |
|
354 self.clients.remove(client_id); |
|
355 } |
|
356 info!("Client {} removed", client_id); |
|
357 } |
365 } |
358 } |
366 } |
359 |
367 |
360 async fn send_message<I>( |
368 async fn send_message<I>( |
361 clients: &mut Slab<Sender<Bytes>>, |
369 clients: &mut Slab<Sender<Bytes>>, |
426 |
434 |
427 pub fn build(self) -> NetworkLayer { |
435 pub fn build(self) -> NetworkLayer { |
428 let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
436 let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
429 |
437 |
430 let clients = Slab::with_capacity(self.clients_capacity); |
438 let clients = Slab::with_capacity(self.clients_capacity); |
|
439 let (update_tx, update_rx) = channel(128); |
431 |
440 |
432 NetworkLayer { |
441 NetworkLayer { |
433 listener: self.listener.expect("No listener provided"), |
442 listener: self.listener.expect("No listener provided"), |
434 #[cfg(feature = "tls-connections")] |
443 #[cfg(feature = "tls-connections")] |
435 tls: TlsListener { |
444 tls: TlsListener { |
436 listener: self.tls_listener.expect("No TLS listener provided"), |
445 listener: self.tls_listener.expect("No TLS listener provided"), |
437 acceptor: self.tls_acceptor.expect("No TLS acceptor provided"), |
446 acceptor: self.tls_acceptor.expect("No TLS acceptor provided"), |
438 }, |
447 }, |
439 server_state, |
448 server_state, |
440 clients, |
449 clients, |
441 } |
450 update_tx, |
442 } |
451 update_rx |
443 } |
452 } |
|
453 } |
|
454 } |