author | alfadur |
Tue, 01 Feb 2022 20:58:35 +0300 | |
changeset 15832 | a4d505a32879 |
parent 15831 | 7d0f747afcb8 |
child 15936 | c5c53ebb2d91 |
permissions | -rw-r--r-- |
15831 | 1 |
use bytes::{Buf, Bytes}; |
2 |
use log::*; |
|
3 |
use slab::Slab; |
|
13414 | 4 |
use std::{ |
15831 | 5 |
iter::Iterator, |
15832 | 6 |
net::{IpAddr, SocketAddr}, |
15800 | 7 |
time::Duration, |
13414 | 8 |
}; |
15831 | 9 |
use tokio::{ |
10 |
io::AsyncReadExt, |
|
14457 | 11 |
net::{TcpListener, TcpStream}, |
15831 | 12 |
sync::mpsc::{channel, Receiver, Sender}, |
13414 | 13 |
}; |
13119 | 14 |
|
13666 | 15 |
use crate::{ |
15800 | 16 |
core::{ |
17 |
events::{TimedEvents, Timeout}, |
|
18 |
types::ClientId, |
|
19 |
}, |
|
15074 | 20 |
handlers, |
15520 | 21 |
handlers::{IoResult, IoTask, ServerState}, |
15832 | 22 |
protocol::{self, ProtocolDecoder, ProtocolError}, |
13666 | 23 |
utils, |
13414 | 24 |
}; |
15831 | 25 |
use hedgewars_network_protocol::{ |
26 |
messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
|
27 |
}; |
|
28 |
use tokio::io::AsyncWriteExt; |
|
14779
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
29 |
|
15832 | 30 |
/// Timeout handed to each client's `ProtocolDecoder` on construction.
///
/// NOTE(review): presumably the idle period after which the decoder's read yields
/// `ProtocolError::Timeout` — confirm against `ProtocolDecoder`.
const PING_TIMEOUT: Duration = Duration::from_secs(15);
31 |
||
15831 | 32 |
enum ClientUpdateData { |
33 |
Message(HwProtocolMessage), |
|
34 |
Error(String), |
|
35 |
} |
|
36 |
||
37 |
struct ClientUpdate { |
|
38 |
client_id: ClientId, |
|
39 |
data: ClientUpdateData, |
|
40 |
} |
|
13414 | 41 |
|
15831 | 42 |
struct ClientUpdateSender { |
43 |
client_id: ClientId, |
|
44 |
sender: Sender<ClientUpdate>, |
|
45 |
} |
|
13119 | 46 |
|
15831 | 47 |
impl ClientUpdateSender { |
48 |
async fn send(&mut self, data: ClientUpdateData) -> bool { |
|
49 |
self.sender |
|
50 |
.send(ClientUpdate { |
|
51 |
client_id: self.client_id, |
|
52 |
data, |
|
53 |
}) |
|
54 |
.await |
|
55 |
.is_ok() |
|
56 |
} |
|
57 |
} |
|
58 |
||
59 |
struct NetworkClient { |
|
60 |
id: ClientId, |
|
61 |
socket: TcpStream, |
|
62 |
receiver: Receiver<Bytes>, |
|
63 |
peer_addr: SocketAddr, |
|
64 |
decoder: ProtocolDecoder, |
|
13414 | 65 |
} |
66 |
||
15831 | 67 |
impl NetworkClient { |
68 |
fn new( |
|
69 |
id: ClientId, |
|
70 |
socket: TcpStream, |
|
71 |
peer_addr: SocketAddr, |
|
72 |
receiver: Receiver<Bytes>, |
|
73 |
) -> Self { |
|
74 |
Self { |
|
75 |
id, |
|
76 |
socket, |
|
77 |
peer_addr, |
|
78 |
receiver, |
|
15832 | 79 |
decoder: ProtocolDecoder::new(PING_TIMEOUT), |
15831 | 80 |
} |
81 |
} |
|
13119 | 82 |
|
15832 | 83 |
async fn read( |
84 |
socket: &mut TcpStream, |
|
85 |
decoder: &mut ProtocolDecoder, |
|
86 |
) -> protocol::Result<HwProtocolMessage> { |
|
87 |
let result = decoder.read_from(socket).await; |
|
88 |
if matches!(result, Err(ProtocolError::Timeout)) { |
|
89 |
if Self::write(socket, Bytes::from(HwServerMessage::Ping.to_raw_protocol())).await { |
|
90 |
decoder.read_from(socket).await |
|
91 |
} else { |
|
92 |
Err(ProtocolError::Eof) |
|
93 |
} |
|
94 |
} else { |
|
95 |
result |
|
96 |
} |
|
15831 | 97 |
} |
98 |
||
15832 | 99 |
async fn write(socket: &mut TcpStream, mut data: Bytes) -> bool { |
100 |
!data.has_remaining() || matches!(socket.write_buf(&mut data).await, Ok(n) if n > 0) |
|
15831 | 101 |
} |
13799 | 102 |
|
15831 | 103 |
async fn run(mut self, sender: Sender<ClientUpdate>) { |
104 |
use ClientUpdateData::*; |
|
105 |
let mut sender = ClientUpdateSender { |
|
106 |
client_id: self.id, |
|
107 |
sender, |
|
108 |
}; |
|
109 |
||
110 |
loop { |
|
111 |
tokio::select! { |
|
112 |
server_message = self.receiver.recv() => { |
|
113 |
match server_message { |
|
15832 | 114 |
Some(message) => if !Self::write(&mut self.socket, message).await { |
15831 | 115 |
sender.send(Error("Connection reset by peer".to_string())).await; |
116 |
break; |
|
117 |
} |
|
118 |
None => { |
|
119 |
break; |
|
120 |
} |
|
121 |
} |
|
122 |
} |
|
15832 | 123 |
client_message = Self::read(&mut self.socket, &mut self.decoder) => { |
15831 | 124 |
match client_message { |
15832 | 125 |
Ok(message) => { |
15831 | 126 |
if !sender.send(Message(message)).await { |
127 |
break; |
|
128 |
} |
|
129 |
} |
|
15832 | 130 |
Err(e) => { |
131 |
sender.send(Error(format!("{}", e))).await; |
|
132 |
if matches!(e, ProtocolError::Timeout) { |
|
133 |
Self::write(&mut self.socket, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await; |
|
134 |
} |
|
15831 | 135 |
break; |
136 |
} |
|
137 |
} |
|
138 |
} |
|
139 |
} |
|
13799 | 140 |
} |
141 |
} |
|
142 |
} |
|
143 |
||
15831 | 144 |
pub struct NetworkLayer { |
145 |
listener: TcpListener, |
|
146 |
server_state: ServerState, |
|
147 |
clients: Slab<Sender<Bytes>>, |
|
13119 | 148 |
} |
149 |
||
15831 | 150 |
impl NetworkLayer { |
151 |
pub async fn run(&mut self) { |
|
152 |
let (update_tx, mut update_rx) = channel(128); |
|
13119 | 153 |
|
15831 | 154 |
loop { |
155 |
tokio::select! { |
|
156 |
Ok((stream, addr)) = self.listener.accept() => { |
|
157 |
if let Some(client) = self.create_client(stream, addr).await { |
|
158 |
tokio::spawn(client.run(update_tx.clone())); |
|
159 |
} |
|
160 |
} |
|
161 |
client_message = update_rx.recv(), if !self.clients.is_empty() => { |
|
162 |
use ClientUpdateData::*; |
|
163 |
match client_message { |
|
164 |
Some(ClientUpdate{ client_id, data: Message(message) } ) => { |
|
165 |
self.handle_message(client_id, message).await; |
|
166 |
} |
|
15832 | 167 |
Some(ClientUpdate{ client_id, data: Error(e) } ) => { |
15831 | 168 |
let mut response = handlers::Response::new(client_id); |
15832 | 169 |
info!("Client {} error: {:?}", client_id, e); |
170 |
response.remove_client(client_id); |
|
15831 | 171 |
handlers::handle_client_loss(&mut self.server_state, client_id, &mut response); |
172 |
self.handle_response(response).await; |
|
173 |
} |
|
174 |
None => unreachable!() |
|
175 |
} |
|
176 |
} |
|
13802 | 177 |
} |
178 |
} |
|
179 |
} |
|
180 |
||
15831 | 181 |
async fn create_client( |
182 |
&mut self, |
|
183 |
stream: TcpStream, |
|
184 |
addr: SocketAddr, |
|
185 |
) -> Option<NetworkClient> { |
|
186 |
let entry = self.clients.vacant_entry(); |
|
187 |
let client_id = entry.key(); |
|
188 |
let (tx, rx) = channel(16); |
|
189 |
entry.insert(tx); |
|
190 |
||
191 |
let client = NetworkClient::new(client_id, stream, addr, rx); |
|
13414 | 192 |
|
15831 | 193 |
info!("client {} ({}) added", client.id, client.peer_addr); |
194 |
||
195 |
let mut response = handlers::Response::new(client_id); |
|
196 |
||
197 |
let added = if let IpAddr::V4(addr) = client.peer_addr.ip() { |
|
198 |
handlers::handle_client_accept( |
|
199 |
&mut self.server_state, |
|
200 |
client_id, |
|
201 |
&mut response, |
|
202 |
addr.octets(), |
|
203 |
addr.is_loopback(), |
|
204 |
) |
|
205 |
} else { |
|
206 |
todo!("implement something") |
|
15800 | 207 |
}; |
208 |
||
15831 | 209 |
self.handle_response(response).await; |
13414 | 210 |
|
15831 | 211 |
if added { |
212 |
Some(client) |
|
14779
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
213 |
} else { |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
214 |
None |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
215 |
} |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
216 |
} |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14697
diff
changeset
|
217 |
|
15831 | 218 |
async fn handle_message(&mut self, client_id: ClientId, message: HwProtocolMessage) { |
219 |
debug!("Handling message {:?} for client {}", message, client_id); |
|
220 |
let mut response = handlers::Response::new(client_id); |
|
221 |
handlers::handle(&mut self.server_state, client_id, &mut response, message); |
|
222 |
self.handle_response(response).await; |
|
13119 | 223 |
} |
224 |
||
15831 | 225 |
async fn handle_response(&mut self, mut response: handlers::Response) { |
14830
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
226 |
if response.is_empty() { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
227 |
return; |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
228 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
229 |
|
14672
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
230 |
debug!("{} pending server messages", response.len()); |
15520 | 231 |
let output = response.extract_messages(&mut self.server_state.server); |
14672
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
232 |
for (clients, message) in output { |
13419 | 233 |
debug!("Message {:?} to {:?}", message, clients); |
15831 | 234 |
Self::send_message(&mut self.clients, message, clients.iter().cloned()).await; |
13414 | 235 |
} |
14696 | 236 |
|
237 |
for client_id in response.extract_removed_clients() { |
|
15831 | 238 |
if self.clients.contains(client_id) { |
239 |
self.clients.remove(client_id); |
|
14803 | 240 |
} |
15831 | 241 |
info!("Client {} removed", client_id); |
15517 | 242 |
} |
14835 | 243 |
} |
244 |
||
15831 | 245 |
async fn send_message<I>( |
246 |
clients: &mut Slab<Sender<Bytes>>, |
|
247 |
message: HwServerMessage, |
|
248 |
to_clients: I, |
|
249 |
) where |
|
250 |
I: Iterator<Item = ClientId>, |
|
251 |
{ |
|
252 |
let msg_string = message.to_raw_protocol(); |
|
253 |
let bytes = Bytes::copy_from_slice(msg_string.as_bytes()); |
|
254 |
for client_id in to_clients { |
|
255 |
if let Some(client) = clients.get_mut(client_id) { |
|
256 |
if !client.send(bytes.clone()).await.is_ok() { |
|
257 |
clients.remove(client_id); |
|
13414 | 258 |
} |
14457 | 259 |
} |
13119 | 260 |
} |
13414 | 261 |
} |
13119 | 262 |
} |
14830
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
263 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
264 |
pub struct NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
265 |
listener: Option<TcpListener>, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
266 |
clients_capacity: usize, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
267 |
rooms_capacity: usize, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
268 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
269 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
270 |
impl Default for NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
271 |
fn default() -> Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
272 |
Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
273 |
clients_capacity: 1024, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
274 |
rooms_capacity: 512, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
275 |
listener: None, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
276 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
277 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
278 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
279 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
280 |
impl NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
281 |
pub fn with_listener(self, listener: TcpListener) -> Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
282 |
Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
283 |
listener: Some(listener), |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
284 |
..self |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
285 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
286 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
287 |
|
15831 | 288 |
pub fn build(self) -> NetworkLayer { |
15520 | 289 |
let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
290 |
||
14830
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
291 |
let clients = Slab::with_capacity(self.clients_capacity); |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
292 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
293 |
NetworkLayer { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
294 |
listener: self.listener.expect("No listener provided"), |
15520 | 295 |
server_state, |
14830
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
296 |
clients, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
297 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
298 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14807
diff
changeset
|
299 |
} |