author | alfadur <mail@none> |
Mon, 04 Feb 2019 19:22:21 +0300 | |
changeset 14672 | 6e6632068a33 |
parent 14671 | 455865ccd36c |
child 14673 | 08a8605bafaf |
permissions | -rw-r--r-- |
13119 | 1 |
extern crate slab; |
2 |
||
13414 | 3 |
use std::{ |
13415 | 4 |
collections::HashSet, |
14457 | 5 |
io, |
6 |
io::{Error, ErrorKind, Read, Write}, |
|
7 |
mem::{replace, swap}, |
|
8 |
net::{IpAddr, Ipv4Addr, SocketAddr}, |
|
13414 | 9 |
}; |
10 |
||
14457 | 11 |
use log::*; |
13414 | 12 |
use mio::{ |
14457 | 13 |
net::{TcpListener, TcpStream}, |
14 |
Poll, PollOpt, Ready, Token, |
|
13414 | 15 |
}; |
16 |
use netbuf; |
|
13119 | 17 |
use slab::Slab; |
18 |
||
14671
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
19 |
use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO}; |
13666 | 20 |
use crate::{ |
14457 | 21 |
protocol::{messages::*, ProtocolDecoder}, |
13666 | 22 |
utils, |
13414 | 23 |
}; |
13799 | 24 |
#[cfg(feature = "tls-connections")] |
25 |
use openssl::{ |
|
14457 | 26 |
error::ErrorStack, |
13799 | 27 |
ssl::{ |
14457 | 28 |
HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
29 |
SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode, |
|
13799 | 30 |
}, |
31 |
}; |
|
13414 | 32 |
|
33 |
/// Soft cap on the bytes taken from one client during a single `read_impl`
/// call: once the running total reaches this value reading stops and the
/// client is reported as `NeedsRead` so the rest is picked up later.
const MAX_BYTES_PER_READ: usize = 2048;
|
13119 | 34 |
|
13415 | 35 |
/// Follow-up work a client connection requires after an I/O attempt.
///
/// The value is paired with a client id and stored in the network layer's
/// pending set, which is why it is `Hash + Eq + Copy`. `Debug` is derived so
/// states can be logged and compared in assertions.
#[derive(Debug, Hash, Eq, PartialEq, Copy, Clone)]
pub enum NetworkClientState {
    /// Nothing further to do until the socket reports readiness again.
    Idle,
    /// Outgoing data is still buffered and another write must be attempted.
    NeedsWrite,
    /// More input may be available and another read must be attempted.
    NeedsRead,
    /// The peer closed the connection (end of stream was reached).
    Closed,
}
|
42 |
||
43 |
/// Outcome of a client I/O operation: on success, the produced value together
/// with the follow-up state the connection is left in.
type NetworkResult<T> = io::Result<(T, NetworkClientState)>;
|
13119 | 44 |
|
13799 | 45 |
#[cfg(not(feature = "tls-connections"))] |
46 |
pub enum ClientSocket { |
|
14457 | 47 |
Plain(TcpStream), |
13799 | 48 |
} |
49 |
||
50 |
#[cfg(feature = "tls-connections")] |
|
51 |
pub enum ClientSocket { |
|
52 |
SslHandshake(Option<MidHandshakeSslStream<TcpStream>>), |
|
14457 | 53 |
SslStream(SslStream<TcpStream>), |
13799 | 54 |
} |
55 |
||
56 |
impl ClientSocket { |
|
57 |
fn inner(&self) -> &TcpStream { |
|
58 |
#[cfg(not(feature = "tls-connections"))] |
|
59 |
match self { |
|
60 |
ClientSocket::Plain(stream) => stream, |
|
61 |
} |
|
62 |
||
63 |
#[cfg(feature = "tls-connections")] |
|
64 |
match self { |
|
65 |
ClientSocket::SslHandshake(Some(builder)) => builder.get_ref(), |
|
66 |
ClientSocket::SslHandshake(None) => unreachable!(), |
|
14457 | 67 |
ClientSocket::SslStream(ssl_stream) => ssl_stream.get_ref(), |
13799 | 68 |
} |
69 |
} |
|
70 |
} |
|
71 |
||
13119 | 72 |
pub struct NetworkClient { |
73 |
id: ClientId, |
|
13799 | 74 |
socket: ClientSocket, |
13119 | 75 |
peer_addr: SocketAddr, |
76 |
decoder: ProtocolDecoder, |
|
14457 | 77 |
buf_out: netbuf::Buf, |
13119 | 78 |
} |
79 |
||
80 |
impl NetworkClient { |
|
13799 | 81 |
pub fn new(id: ClientId, socket: ClientSocket, peer_addr: SocketAddr) -> NetworkClient { |
13119 | 82 |
NetworkClient { |
14457 | 83 |
id, |
84 |
socket, |
|
85 |
peer_addr, |
|
13119 | 86 |
decoder: ProtocolDecoder::new(), |
14457 | 87 |
buf_out: netbuf::Buf::new(), |
13119 | 88 |
} |
89 |
} |
|
90 |
||
13802 | 91 |
#[cfg(feature = "tls-connections")] |
14457 | 92 |
fn handshake_impl( |
93 |
&mut self, |
|
94 |
handshake: MidHandshakeSslStream<TcpStream>, |
|
95 |
) -> io::Result<NetworkClientState> { |
|
13802 | 96 |
match handshake.handshake() { |
97 |
Ok(stream) => { |
|
98 |
self.socket = ClientSocket::SslStream(stream); |
|
14457 | 99 |
debug!( |
100 |
"TLS handshake with {} ({}) completed", |
|
101 |
self.id, self.peer_addr |
|
102 |
); |
|
13802 | 103 |
Ok(NetworkClientState::Idle) |
104 |
} |
|
105 |
Err(HandshakeError::WouldBlock(new_handshake)) => { |
|
106 |
self.socket = ClientSocket::SslHandshake(Some(new_handshake)); |
|
107 |
Ok(NetworkClientState::Idle) |
|
108 |
} |
|
13803 | 109 |
Err(HandshakeError::Failure(new_handshake)) => { |
110 |
self.socket = ClientSocket::SslHandshake(Some(new_handshake)); |
|
13802 | 111 |
debug!("TLS handshake with {} ({}) failed", self.id, self.peer_addr); |
112 |
Err(Error::new(ErrorKind::Other, "Connection failure")) |
|
113 |
} |
|
14457 | 114 |
Err(HandshakeError::SetupFailure(_)) => unreachable!(), |
13802 | 115 |
} |
116 |
} |
|
117 |
||
14457 | 118 |
fn read_impl<R: Read>( |
119 |
decoder: &mut ProtocolDecoder, |
|
120 |
source: &mut R, |
|
121 |
id: ClientId, |
|
122 |
addr: &SocketAddr, |
|
123 |
) -> NetworkResult<Vec<HWProtocolMessage>> { |
|
13414 | 124 |
let mut bytes_read = 0; |
125 |
let result = loop { |
|
13799 | 126 |
match decoder.read_from(source) { |
13414 | 127 |
Ok(bytes) => { |
13799 | 128 |
debug!("Client {}: read {} bytes", id, bytes); |
13414 | 129 |
bytes_read += bytes; |
130 |
if bytes == 0 { |
|
131 |
let result = if bytes_read == 0 { |
|
13799 | 132 |
info!("EOF for client {} ({})", id, addr); |
13414 | 133 |
(Vec::new(), NetworkClientState::Closed) |
134 |
} else { |
|
13799 | 135 |
(decoder.extract_messages(), NetworkClientState::NeedsRead) |
13414 | 136 |
}; |
137 |
break Ok(result); |
|
14457 | 138 |
} else if bytes_read >= MAX_BYTES_PER_READ { |
139 |
break Ok((decoder.extract_messages(), NetworkClientState::NeedsRead)); |
|
13414 | 140 |
} |
141 |
} |
|
142 |
Err(ref error) if error.kind() == ErrorKind::WouldBlock => { |
|
14457 | 143 |
let messages = if bytes_read == 0 { |
13414 | 144 |
Vec::new() |
145 |
} else { |
|
13799 | 146 |
decoder.extract_messages() |
13414 | 147 |
}; |
148 |
break Ok((messages, NetworkClientState::Idle)); |
|
149 |
} |
|
14457 | 150 |
Err(error) => break Err(error), |
13414 | 151 |
} |
152 |
}; |
|
13799 | 153 |
decoder.sweep(); |
13414 | 154 |
result |
155 |
} |
|
156 |
||
13799 | 157 |
pub fn read(&mut self) -> NetworkResult<Vec<HWProtocolMessage>> { |
158 |
#[cfg(not(feature = "tls-connections"))] |
|
159 |
match self.socket { |
|
14457 | 160 |
ClientSocket::Plain(ref mut stream) => { |
161 |
NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr) |
|
162 |
} |
|
13799 | 163 |
} |
164 |
||
165 |
#[cfg(feature = "tls-connections")] |
|
166 |
match self.socket { |
|
167 |
ClientSocket::SslHandshake(ref mut handshake_opt) => { |
|
13802 | 168 |
let handshake = std::mem::replace(handshake_opt, None).unwrap(); |
169 |
Ok((Vec::new(), self.handshake_impl(handshake)?)) |
|
14457 | 170 |
} |
171 |
ClientSocket::SslStream(ref mut stream) => { |
|
13799 | 172 |
NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr) |
14457 | 173 |
} |
13799 | 174 |
} |
175 |
} |
|
176 |
||
177 |
fn write_impl<W: Write>(buf_out: &mut netbuf::Buf, destination: &mut W) -> NetworkResult<()> { |
|
13414 | 178 |
let result = loop { |
13799 | 179 |
match buf_out.write_to(destination) { |
14457 | 180 |
Ok(bytes) if buf_out.is_empty() || bytes == 0 => { |
14671
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
181 |
break Ok(((), NetworkClientState::Idle)); |
14457 | 182 |
} |
13415 | 183 |
Ok(_) => (), |
14457 | 184 |
Err(ref error) |
185 |
if error.kind() == ErrorKind::Interrupted |
|
186 |
|| error.kind() == ErrorKind::WouldBlock => |
|
187 |
{ |
|
13414 | 188 |
break Ok(((), NetworkClientState::NeedsWrite)); |
14457 | 189 |
} |
190 |
Err(error) => break Err(error), |
|
13414 | 191 |
} |
192 |
}; |
|
13799 | 193 |
result |
194 |
} |
|
195 |
||
196 |
pub fn write(&mut self) -> NetworkResult<()> { |
|
197 |
let result = { |
|
198 |
#[cfg(not(feature = "tls-connections"))] |
|
199 |
match self.socket { |
|
14457 | 200 |
ClientSocket::Plain(ref mut stream) => { |
13799 | 201 |
NetworkClient::write_impl(&mut self.buf_out, stream) |
14457 | 202 |
} |
13799 | 203 |
} |
204 |
||
14457 | 205 |
#[cfg(feature = "tls-connections")] |
206 |
{ |
|
13799 | 207 |
match self.socket { |
13802 | 208 |
ClientSocket::SslHandshake(ref mut handshake_opt) => { |
209 |
let handshake = std::mem::replace(handshake_opt, None).unwrap(); |
|
210 |
Ok(((), self.handshake_impl(handshake)?)) |
|
211 |
} |
|
14457 | 212 |
ClientSocket::SslStream(ref mut stream) => { |
13799 | 213 |
NetworkClient::write_impl(&mut self.buf_out, stream) |
14457 | 214 |
} |
13799 | 215 |
} |
216 |
} |
|
217 |
}; |
|
218 |
||
219 |
self.socket.inner().flush()?; |
|
13414 | 220 |
result |
221 |
} |
|
222 |
||
13119 | 223 |
pub fn send_raw_msg(&mut self, msg: &[u8]) { |
13524 | 224 |
self.buf_out.write_all(msg).unwrap(); |
13119 | 225 |
} |
226 |
||
13524 | 227 |
pub fn send_string(&mut self, msg: &str) { |
13119 | 228 |
self.send_raw_msg(&msg.as_bytes()); |
229 |
} |
|
230 |
||
13524 | 231 |
pub fn send_msg(&mut self, msg: &HWServerMessage) { |
13119 | 232 |
self.send_string(&msg.to_raw_protocol()); |
233 |
} |
|
234 |
} |
|
235 |
||
13799 | 236 |
/// TLS configuration held by the network layer; only present when the
/// `tls-connections` feature is enabled.
#[cfg(feature = "tls-connections")]
struct ServerSsl {
    /// OpenSSL context from which a per-connection `Ssl` object is created.
    context: SslContext,
}
240 |
||
13119 | 241 |
pub struct NetworkLayer { |
242 |
listener: TcpListener, |
|
243 |
server: HWServer, |
|
13414 | 244 |
clients: Slab<NetworkClient>, |
13415 | 245 |
pending: HashSet<(ClientId, NetworkClientState)>, |
13799 | 246 |
pending_cache: Vec<(ClientId, NetworkClientState)>, |
247 |
#[cfg(feature = "tls-connections")] |
|
14457 | 248 |
ssl: ServerSsl, |
13119 | 249 |
} |
250 |
||
251 |
impl NetworkLayer { |
|
252 |
pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
|
14392 | 253 |
let server = HWServer::new(clients_limit, rooms_limit, Box::new(FileServerIO::new())); |
13119 | 254 |
let clients = Slab::with_capacity(clients_limit); |
13415 | 255 |
let pending = HashSet::with_capacity(2 * clients_limit); |
256 |
let pending_cache = Vec::with_capacity(2 * clients_limit); |
|
13799 | 257 |
|
258 |
NetworkLayer { |
|
14457 | 259 |
listener, |
260 |
server, |
|
261 |
clients, |
|
262 |
pending, |
|
263 |
pending_cache, |
|
13799 | 264 |
#[cfg(feature = "tls-connections")] |
14457 | 265 |
ssl: NetworkLayer::create_ssl_context(), |
13799 | 266 |
} |
267 |
} |
|
268 |
||
269 |
#[cfg(feature = "tls-connections")] |
|
270 |
fn create_ssl_context() -> ServerSsl { |
|
271 |
let mut builder = SslContextBuilder::new(SslMethod::tls()).unwrap(); |
|
272 |
builder.set_verify(SslVerifyMode::NONE); |
|
273 |
builder.set_read_ahead(true); |
|
14457 | 274 |
builder |
275 |
.set_certificate_file("ssl/cert.pem", SslFiletype::PEM) |
|
276 |
.unwrap(); |
|
277 |
builder |
|
278 |
.set_private_key_file("ssl/key.pem", SslFiletype::PEM) |
|
279 |
.unwrap(); |
|
13799 | 280 |
builder.set_options(SslOptions::NO_COMPRESSION); |
281 |
builder.set_cipher_list("DEFAULT:!LOW:!RC4:!EXP").unwrap(); |
|
14457 | 282 |
ServerSsl { |
283 |
context: builder.build(), |
|
284 |
} |
|
13119 | 285 |
} |
286 |
||
287 |
pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
|
14457 | 288 |
poll.register( |
289 |
&self.listener, |
|
290 |
utils::SERVER, |
|
291 |
Ready::readable(), |
|
292 |
PollOpt::edge(), |
|
293 |
) |
|
13119 | 294 |
} |
295 |
||
296 |
fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
|
297 |
let mut client_exists = false; |
|
13414 | 298 |
if let Some(ref client) = self.clients.get(id) { |
13799 | 299 |
poll.deregister(client.socket.inner()) |
13524 | 300 |
.expect("could not deregister socket"); |
13119 | 301 |
info!("client {} ({}) removed", client.id, client.peer_addr); |
302 |
client_exists = true; |
|
303 |
} |
|
304 |
if client_exists { |
|
305 |
self.clients.remove(id); |
|
306 |
} |
|
307 |
} |
|
308 |
||
14457 | 309 |
fn register_client( |
310 |
&mut self, |
|
311 |
poll: &Poll, |
|
312 |
id: ClientId, |
|
313 |
client_socket: ClientSocket, |
|
314 |
addr: SocketAddr, |
|
315 |
) { |
|
316 |
poll.register( |
|
317 |
client_socket.inner(), |
|
318 |
Token(id), |
|
319 |
Ready::readable() | Ready::writable(), |
|
320 |
PollOpt::edge(), |
|
321 |
) |
|
322 |
.expect("could not register socket with event loop"); |
|
13119 | 323 |
|
324 |
let entry = self.clients.vacant_entry(); |
|
325 |
let client = NetworkClient::new(id, client_socket, addr); |
|
326 |
info!("client {} ({}) added", client.id, client.peer_addr); |
|
327 |
entry.insert(client); |
|
328 |
} |
|
329 |
||
14672
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
330 |
fn flush_server_messages(&mut self, mut response: handlers::Response) { |
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
331 |
debug!("{} pending server messages", response.len()); |
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
332 |
let output = response.extract_messages(&mut self.server); |
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
333 |
for (clients, message) in output { |
13419 | 334 |
debug!("Message {:?} to {:?}", message, clients); |
335 |
let msg_string = message.to_raw_protocol(); |
|
336 |
for client_id in clients { |
|
337 |
if let Some(client) = self.clients.get_mut(client_id) { |
|
338 |
client.send_string(&msg_string); |
|
14457 | 339 |
self.pending |
340 |
.insert((client_id, NetworkClientState::NeedsWrite)); |
|
13414 | 341 |
} |
342 |
} |
|
343 |
} |
|
344 |
} |
|
345 |
||
13799 | 346 |
fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> { |
14457 | 347 |
#[cfg(not(feature = "tls-connections"))] |
348 |
{ |
|
13799 | 349 |
Ok(ClientSocket::Plain(socket)) |
350 |
} |
|
351 |
||
14457 | 352 |
#[cfg(feature = "tls-connections")] |
353 |
{ |
|
13799 | 354 |
let ssl = Ssl::new(&self.ssl.context).unwrap(); |
355 |
let mut builder = SslStreamBuilder::new(ssl, socket); |
|
356 |
builder.set_accept_state(); |
|
357 |
match builder.handshake() { |
|
14457 | 358 |
Ok(stream) => Ok(ClientSocket::SslStream(stream)), |
359 |
Err(HandshakeError::WouldBlock(stream)) => { |
|
360 |
Ok(ClientSocket::SslHandshake(Some(stream))) |
|
361 |
} |
|
13799 | 362 |
Err(e) => { |
363 |
debug!("OpenSSL handshake failed: {}", e); |
|
364 |
Err(Error::new(ErrorKind::Other, "Connection failure")) |
|
365 |
} |
|
366 |
} |
|
367 |
} |
|
368 |
} |
|
369 |
||
13119 | 370 |
pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> { |
371 |
let (client_socket, addr) = self.listener.accept()?; |
|
372 |
info!("Connected: {}", addr); |
|
373 |
||
374 |
let client_id = self.server.add_client(); |
|
14457 | 375 |
self.register_client( |
376 |
poll, |
|
377 |
client_id, |
|
378 |
self.create_client_socket(client_socket)?, |
|
379 |
addr, |
|
380 |
); |
|
14672
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
381 |
//TODO: create response for initial messages |
13119 | 382 |
|
383 |
Ok(()) |
|
384 |
} |
|
385 |
||
14457 | 386 |
fn operation_failed( |
387 |
&mut self, |
|
388 |
poll: &Poll, |
|
389 |
client_id: ClientId, |
|
390 |
error: &Error, |
|
391 |
msg: &str, |
|
392 |
) -> io::Result<()> { |
|
13414 | 393 |
let addr = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
394 |
client.peer_addr |
|
395 |
} else { |
|
396 |
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) |
|
397 |
}; |
|
398 |
debug!("{}({}): {}", msg, addr, error); |
|
399 |
self.client_error(poll, client_id) |
|
13119 | 400 |
} |
401 |
||
14457 | 402 |
pub fn client_readable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
403 |
let messages = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
404 |
client.read() |
|
405 |
} else { |
|
406 |
warn!("invalid readable client: {}", client_id); |
|
407 |
Ok((Vec::new(), NetworkClientState::Idle)) |
|
408 |
}; |
|
13414 | 409 |
|
14671
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
410 |
let mut response = handlers::Response::new(client_id); |
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
411 |
|
13414 | 412 |
match messages { |
413 |
Ok((messages, state)) => { |
|
414 |
for message in messages { |
|
14671
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
415 |
debug!("Handling message {:?} for client {}", message, client_id); |
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
416 |
if self.server.clients.contains(client_id) { |
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
417 |
handlers::handle(&mut self.server, client_id, &mut response, message); |
455865ccd36c
Server action refactoring part 2 of N
alfadur <mail@none>
parents:
14457
diff
changeset
|
418 |
} |
13414 | 419 |
} |
420 |
match state { |
|
13415 | 421 |
NetworkClientState::NeedsRead => { |
422 |
self.pending.insert((client_id, state)); |
|
14457 | 423 |
} |
424 |
NetworkClientState::Closed => self.client_error(&poll, client_id)?, |
|
13414 | 425 |
_ => {} |
426 |
}; |
|
13119 | 427 |
} |
13414 | 428 |
Err(e) => self.operation_failed( |
14457 | 429 |
poll, |
430 |
client_id, |
|
431 |
&e, |
|
432 |
"Error while reading from client socket", |
|
433 |
)?, |
|
13119 | 434 |
} |
435 |
||
14672
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
436 |
if !response.is_empty() { |
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
437 |
self.flush_server_messages(response); |
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14671
diff
changeset
|
438 |
} |
13414 | 439 |
|
13119 | 440 |
if !self.server.removed_clients.is_empty() { |
13414 | 441 |
let ids: Vec<_> = self.server.removed_clients.drain(..).collect(); |
13119 | 442 |
for client_id in ids { |
443 |
self.deregister_client(poll, client_id); |
|
444 |
} |
|
445 |
} |
|
446 |
||
447 |
Ok(()) |
|
448 |
} |
|
449 |
||
14457 | 450 |
pub fn client_writable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
451 |
let result = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
452 |
client.write() |
|
453 |
} else { |
|
454 |
warn!("invalid writable client: {}", client_id); |
|
455 |
Ok(((), NetworkClientState::Idle)) |
|
456 |
}; |
|
13414 | 457 |
|
458 |
match result { |
|
13415 | 459 |
Ok(((), state)) if state == NetworkClientState::NeedsWrite => { |
460 |
self.pending.insert((client_id, state)); |
|
14457 | 461 |
} |
13415 | 462 |
Ok(_) => {} |
14457 | 463 |
Err(e) => { |
464 |
self.operation_failed(poll, client_id, &e, "Error while writing to client socket")? |
|
465 |
} |
|
13119 | 466 |
} |
467 |
||
468 |
Ok(()) |
|
469 |
} |
|
470 |
||
14457 | 471 |
pub fn client_error(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
13119 | 472 |
self.deregister_client(poll, client_id); |
473 |
self.server.client_lost(client_id); |
|
474 |
||
475 |
Ok(()) |
|
476 |
} |
|
13414 | 477 |
|
478 |
pub fn has_pending_operations(&self) -> bool { |
|
479 |
!self.pending.is_empty() |
|
480 |
} |
|
481 |
||
482 |
pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> { |
|
13415 | 483 |
if self.has_pending_operations() { |
13478 | 484 |
let mut cache = replace(&mut self.pending_cache, Vec::new()); |
13415 | 485 |
cache.extend(self.pending.drain()); |
486 |
for (id, state) in cache.drain(..) { |
|
487 |
match state { |
|
14457 | 488 |
NetworkClientState::NeedsRead => self.client_readable(poll, id)?, |
489 |
NetworkClientState::NeedsWrite => self.client_writable(poll, id)?, |
|
13415 | 490 |
_ => {} |
491 |
} |
|
13414 | 492 |
} |
13415 | 493 |
swap(&mut cache, &mut self.pending_cache); |
13414 | 494 |
} |
495 |
Ok(()) |
|
496 |
} |
|
13119 | 497 |
} |