1 use bytes::{Buf, Bytes}; |
1 use bytes::{Buf, Bytes}; |
2 use log::*; |
2 use log::*; |
3 use slab::Slab; |
3 use slab::Slab; |
4 use std::{ |
4 use std::{ |
5 collections::HashSet, |
|
6 io, |
|
7 io::{Error, ErrorKind, Read, Write}, |
|
8 iter::Iterator, |
5 iter::Iterator, |
9 mem::{replace, swap}, |
6 net::{IpAddr, SocketAddr}, |
10 net::{IpAddr, Ipv4Addr, SocketAddr}, |
|
11 num::NonZeroU32, |
|
12 time::Duration, |
7 time::Duration, |
13 time::Instant, |
|
14 }; |
8 }; |
15 use tokio::{ |
9 use tokio::{ |
16 io::AsyncReadExt, |
10 io::AsyncReadExt, |
17 net::{TcpListener, TcpStream}, |
11 net::{TcpListener, TcpStream}, |
18 sync::mpsc::{channel, Receiver, Sender}, |
12 sync::mpsc::{channel, Receiver, Sender}, |
23 events::{TimedEvents, Timeout}, |
17 events::{TimedEvents, Timeout}, |
24 types::ClientId, |
18 types::ClientId, |
25 }, |
19 }, |
26 handlers, |
20 handlers, |
27 handlers::{IoResult, IoTask, ServerState}, |
21 handlers::{IoResult, IoTask, ServerState}, |
28 protocol::ProtocolDecoder, |
22 protocol::{self, ProtocolDecoder, ProtocolError}, |
29 utils, |
23 utils, |
30 }; |
24 }; |
31 use hedgewars_network_protocol::{ |
25 use hedgewars_network_protocol::{ |
32 messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
26 messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
33 }; |
27 }; |
34 use tokio::io::AsyncWriteExt; |
28 use tokio::io::AsyncWriteExt; |
|
29 |
|
30 const PING_TIMEOUT: Duration = Duration::from_secs(15); |
35 |
31 |
36 enum ClientUpdateData { |
32 enum ClientUpdateData { |
37 Message(HwProtocolMessage), |
33 Message(HwProtocolMessage), |
38 Error(String), |
34 Error(String), |
39 } |
35 } |
78 Self { |
74 Self { |
79 id, |
75 id, |
80 socket, |
76 socket, |
81 peer_addr, |
77 peer_addr, |
82 receiver, |
78 receiver, |
83 decoder: ProtocolDecoder::new(), |
79 decoder: ProtocolDecoder::new(PING_TIMEOUT), |
84 } |
80 } |
85 } |
81 } |
86 |
82 |
87 async fn read(&mut self) -> Option<HwProtocolMessage> { |
83 async fn read( |
88 self.decoder.read_from(&mut self.socket).await |
84 socket: &mut TcpStream, |
89 } |
85 decoder: &mut ProtocolDecoder, |
90 |
86 ) -> protocol::Result<HwProtocolMessage> { |
91 async fn write(&mut self, mut data: Bytes) -> bool { |
87 let result = decoder.read_from(socket).await; |
92 !data.has_remaining() || matches!(self.socket.write_buf(&mut data).await, Ok(n) if n > 0) |
88 if matches!(result, Err(ProtocolError::Timeout)) { |
|
89 if Self::write(socket, Bytes::from(HwServerMessage::Ping.to_raw_protocol())).await { |
|
90 decoder.read_from(socket).await |
|
91 } else { |
|
92 Err(ProtocolError::Eof) |
|
93 } |
|
94 } else { |
|
95 result |
|
96 } |
|
97 } |
|
98 |
|
99 async fn write(socket: &mut TcpStream, mut data: Bytes) -> bool { |
|
100 !data.has_remaining() || matches!(socket.write_buf(&mut data).await, Ok(n) if n > 0) |
93 } |
101 } |
94 |
102 |
95 async fn run(mut self, sender: Sender<ClientUpdate>) { |
103 async fn run(mut self, sender: Sender<ClientUpdate>) { |
96 use ClientUpdateData::*; |
104 use ClientUpdateData::*; |
97 let mut sender = ClientUpdateSender { |
105 let mut sender = ClientUpdateSender { |
101 |
109 |
102 loop { |
110 loop { |
103 tokio::select! { |
111 tokio::select! { |
104 server_message = self.receiver.recv() => { |
112 server_message = self.receiver.recv() => { |
105 match server_message { |
113 match server_message { |
106 Some(message) => if !self.write(message).await { |
114 Some(message) => if !Self::write(&mut self.socket, message).await { |
107 sender.send(Error("Connection reset by peer".to_string())).await; |
115 sender.send(Error("Connection reset by peer".to_string())).await; |
108 break; |
116 break; |
109 } |
117 } |
110 None => { |
118 None => { |
111 break; |
119 break; |
112 } |
120 } |
113 } |
121 } |
114 } |
122 } |
115 client_message = self.decoder.read_from(&mut self.socket) => { |
123 client_message = Self::read(&mut self.socket, &mut self.decoder) => { |
116 match client_message { |
124 match client_message { |
117 Some(message) => { |
125 Ok(message) => { |
118 if !sender.send(Message(message)).await { |
126 if !sender.send(Message(message)).await { |
119 break; |
127 break; |
120 } |
128 } |
121 } |
129 } |
122 None => { |
130 Err(e) => { |
123 sender.send(Error("Connection reset by peer".to_string())).await; |
131 sender.send(Error(format!("{}", e))).await; |
|
132 if matches!(e, ProtocolError::Timeout) { |
|
133 Self::write(&mut self.socket, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await; |
|
134 } |
124 break; |
135 break; |
125 } |
136 } |
126 } |
137 } |
127 } |
138 } |
128 } |
139 } |
151 use ClientUpdateData::*; |
162 use ClientUpdateData::*; |
152 match client_message { |
163 match client_message { |
153 Some(ClientUpdate{ client_id, data: Message(message) } ) => { |
164 Some(ClientUpdate{ client_id, data: Message(message) } ) => { |
154 self.handle_message(client_id, message).await; |
165 self.handle_message(client_id, message).await; |
155 } |
166 } |
156 Some(ClientUpdate{ client_id, .. } ) => { |
167 Some(ClientUpdate{ client_id, data: Error(e) } ) => { |
157 let mut response = handlers::Response::new(client_id); |
168 let mut response = handlers::Response::new(client_id); |
|
169 info!("Client {} error: {:?}", client_id, e); |
|
170 response.remove_client(client_id); |
158 handlers::handle_client_loss(&mut self.server_state, client_id, &mut response); |
171 handlers::handle_client_loss(&mut self.server_state, client_id, &mut response); |
159 self.handle_response(response).await; |
172 self.handle_response(response).await; |
160 } |
173 } |
161 None => unreachable!() |
174 None => unreachable!() |
162 } |
175 } |