183 NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr) |
185 NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr) |
184 } |
186 } |
185 } |
187 } |
186 } |
188 } |
187 |
189 |
188 fn write_impl<W: Write>(buf_out: &mut netbuf::Buf, destination: &mut W) -> NetworkResult<()> { |
190 fn write_impl<W: Write>( |
|
191 buf_out: &mut netbuf::Buf, |
|
192 destination: &mut W, |
|
193 close_on_empty: bool, |
|
194 ) -> NetworkResult<()> { |
189 let result = loop { |
195 let result = loop { |
190 match buf_out.write_to(destination) { |
196 match buf_out.write_to(destination) { |
191 Ok(bytes) if buf_out.is_empty() || bytes == 0 => { |
197 Ok(bytes) if buf_out.is_empty() || bytes == 0 => { |
192 break Ok(((), NetworkClientState::Idle)); |
198 let status = if buf_out.is_empty() && close_on_empty { |
|
199 NetworkClientState::Closed |
|
200 } else { |
|
201 NetworkClientState::Idle |
|
202 }; |
|
203 break Ok(((), status)); |
193 } |
204 } |
194 Ok(_) => (), |
205 Ok(_) => (), |
195 Err(ref error) |
206 Err(ref error) |
196 if error.kind() == ErrorKind::Interrupted |
207 if error.kind() == ErrorKind::Interrupted |
197 || error.kind() == ErrorKind::WouldBlock => |
208 || error.kind() == ErrorKind::WouldBlock => |
205 } |
216 } |
206 |
217 |
207 pub fn write(&mut self) -> NetworkResult<()> { |
218 pub fn write(&mut self) -> NetworkResult<()> { |
208 let result = match self.socket { |
219 let result = match self.socket { |
209 ClientSocket::Plain(ref mut stream) => { |
220 ClientSocket::Plain(ref mut stream) => { |
210 NetworkClient::write_impl(&mut self.buf_out, stream) |
221 NetworkClient::write_impl(&mut self.buf_out, stream, self.pending_close) |
211 } |
222 } |
212 #[cfg(feature = "tls-connections")] |
223 #[cfg(feature = "tls-connections")] |
213 ClientSocket::SslHandshake(ref mut handshake_opt) => { |
224 ClientSocket::SslHandshake(ref mut handshake_opt) => { |
214 let handshake = std::mem::replace(handshake_opt, None).unwrap(); |
225 let handshake = std::mem::replace(handshake_opt, None).unwrap(); |
215 Ok(((), self.handshake_impl(handshake)?)) |
226 Ok(((), self.handshake_impl(handshake)?)) |
216 } |
227 } |
217 #[cfg(feature = "tls-connections")] |
228 #[cfg(feature = "tls-connections")] |
218 ClientSocket::SslStream(ref mut stream) => { |
229 ClientSocket::SslStream(ref mut stream) => { |
219 NetworkClient::write_impl(&mut self.buf_out, stream) |
230 NetworkClient::write_impl(&mut self.buf_out, stream, self.pending_close) |
220 } |
231 } |
221 }; |
232 }; |
222 |
233 |
223 self.socket.inner().flush()?; |
234 self.socket.inner().flush()?; |
224 result |
235 result |
232 self.send_raw_msg(&msg.as_bytes()); |
243 self.send_raw_msg(&msg.as_bytes()); |
233 } |
244 } |
234 |
245 |
235 pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout { |
246 pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout { |
236 replace(&mut self.timeout, timeout) |
247 replace(&mut self.timeout, timeout) |
|
248 } |
|
249 |
|
250 pub fn has_pending_sends(&self) -> bool { |
|
251 !self.buf_out.is_empty() |
237 } |
252 } |
238 } |
253 } |
239 |
254 |
240 #[cfg(feature = "tls-connections")] |
255 #[cfg(feature = "tls-connections")] |
241 struct ServerSsl { |
256 struct ServerSsl { |
347 |
362 |
348 Ok(()) |
363 Ok(()) |
349 } |
364 } |
350 |
365 |
351 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
366 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
352 if let Some(ref client) = self.clients.get(id) { |
367 if let Some(ref mut client) = self.clients.get_mut(id) { |
353 poll.deregister(client.socket.inner()) |
368 poll.deregister(client.socket.inner()) |
354 .expect("could not deregister socket"); |
369 .expect("could not deregister socket"); |
355 info!("client {} ({}) removed", client.id, client.peer_addr); |
370 if client.has_pending_sends() { |
356 self.clients.remove(id); |
371 info!( |
|
372 "client {} ({}) pending removal", |
|
373 client.id, client.peer_addr |
|
374 ); |
|
375 client.pending_close = true; |
|
376 poll.register( |
|
377 client.socket.inner(), |
|
378 Token(id), |
|
379 Ready::writable(), |
|
380 PollOpt::edge(), |
|
381 ) |
|
382 .unwrap_or_else(|_| { |
|
383 self.clients.remove(id); |
|
384 }); |
|
385 } else { |
|
386 info!("client {} ({}) removed", client.id, client.peer_addr); |
|
387 self.clients.remove(id); |
|
388 } |
357 #[cfg(feature = "official-server")] |
389 #[cfg(feature = "official-server")] |
358 self.io.cancel(id); |
390 self.io.cancel(id); |
359 } |
391 } |
360 } |
392 } |
361 |
393 |
581 |
613 |
582 match result { |
614 match result { |
583 Ok(((), state)) if state == NetworkClientState::NeedsWrite => { |
615 Ok(((), state)) if state == NetworkClientState::NeedsWrite => { |
584 self.pending.insert((client_id, state)); |
616 self.pending.insert((client_id, state)); |
585 } |
617 } |
586 Ok(_) => {} |
618 Ok(((), state)) if state == NetworkClientState::Closed => { |
|
619 self.deregister_client(poll, client_id); |
|
620 } |
|
621 Ok(_) => (), |
587 Err(e) => { |
622 Err(e) => { |
588 self.operation_failed(poll, client_id, &e, "Error while writing to client socket")? |
623 self.operation_failed(poll, client_id, &e, "Error while writing to client socket")? |
589 } |
624 } |
590 } |
625 } |
591 |
626 |