--- a/rust/hedgewars-server/src/server/network.rs Tue Apr 09 00:45:14 2019 +0200
+++ b/rust/hedgewars-server/src/server/network.rs Tue Apr 09 21:08:35 2019 +0300
@@ -16,11 +16,16 @@
use netbuf;
use slab::Slab;
-use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO};
+use super::{core::FileServerIO, core::HWServer, coretypes::ClientId, handlers};
use crate::{
protocol::{messages::*, ProtocolDecoder},
utils,
};
+
+#[cfg(feature = "official-server")]
+use super::io::{IOThread, RequestId};
+
+use crate::server::handlers::{IoResult, IoTask};
#[cfg(feature = "tls-connections")]
use openssl::{
error::ErrorStack,
@@ -234,6 +239,56 @@
context: SslContext,
}
+#[cfg(feature = "official-server")]
+pub struct IoLayer {
+ next_request_id: RequestId,
+ request_queue: Vec<(RequestId, ClientId)>,
+ io_thread: IOThread,
+}
+
+#[cfg(feature = "official-server")]
+impl IoLayer {
+ fn new() -> Self {
+ Self {
+ next_request_id: 0,
+ request_queue: vec![],
+ io_thread: IOThread::new(),
+ }
+ }
+
+ fn send(&mut self, client_id: ClientId, task: IoTask) {
+ let request_id = self.next_request_id;
+ self.next_request_id += 1;
+ self.request_queue.push((request_id, client_id));
+ self.io_thread.send(request_id, task);
+ }
+
+ fn try_recv(&mut self) -> Option<(ClientId, IoResult)> {
+ let (request_id, result) = self.io_thread.try_recv()?;
+ if let Some(index) = self
+ .request_queue
+ .iter()
+ .position(|(id, _)| *id == request_id)
+ {
+ let (_, client_id) = self.request_queue.swap_remove(index);
+ Some((client_id, result))
+ } else {
+ None
+ }
+ }
+
+ fn cancel(&mut self, client_id: ClientId) {
+ let mut index = 0;
+ while index < self.request_queue.len() {
+ if self.request_queue[index].1 == client_id {
+ self.request_queue.swap_remove(index);
+ } else {
+ index += 1;
+ }
+ }
+ }
+}
+
pub struct NetworkLayer {
listener: TcpListener,
server: HWServer,
@@ -242,6 +297,8 @@
pending_cache: Vec<(ClientId, NetworkClientState)>,
#[cfg(feature = "tls-connections")]
ssl: ServerSsl,
+ #[cfg(feature = "official-server")]
+ io: IoLayer,
}
impl NetworkLayer {
@@ -259,6 +316,8 @@
pending_cache,
#[cfg(feature = "tls-connections")]
ssl: NetworkLayer::create_ssl_context(),
+ #[cfg(feature = "official-server")]
+ io: IoLayer::new(),
}
}
@@ -283,10 +342,15 @@
pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
poll.register(
&self.listener,
- utils::SERVER,
+ utils::SERVER_TOKEN,
Ready::readable(),
PollOpt::edge(),
- )
+ )?;
+
+ #[cfg(feature = "official-server")]
+ self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?;
+
+ Ok(())
}
fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
@@ -299,6 +363,8 @@
}
if client_exists {
self.clients.remove(id);
+ #[cfg(feature = "official-server")]
+ self.io.cancel(id);
}
}
@@ -326,7 +392,7 @@
client_id
}
- fn flush_server_messages(&mut self, mut response: handlers::Response, poll: &Poll) {
+ fn handle_response(&mut self, mut response: handlers::Response, poll: &Poll) {
debug!("{} pending server messages", response.len());
let output = response.extract_messages(&mut self.server);
for (clients, message) in output {
@@ -344,6 +410,22 @@
for client_id in response.extract_removed_clients() {
self.deregister_client(poll, client_id);
}
+
+ #[cfg(feature = "official-server")]
+ {
+ let client_id = response.client_id();
+ for task in response.extract_io_tasks() {
+ self.io.send(client_id, task);
+ }
+ }
+ }
+
+ #[cfg(feature = "official-server")]
+ pub fn handle_io_result(&mut self) {
+ if let Some((client_id, result)) = self.io.try_recv() {
+ let mut response = handlers::Response::new(client_id);
+ handlers::handle_io_result(&mut self.server, client_id, &mut response, result);
+ }
}
fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> {
@@ -381,7 +463,7 @@
handlers::handle_client_accept(&mut self.server, client_id, &mut response);
if !response.is_empty() {
- self.flush_server_messages(response, poll);
+ self.handle_response(response, poll);
}
Ok(())
@@ -438,7 +520,7 @@
}
if !response.is_empty() {
- self.flush_server_messages(response, poll);
+ self.handle_response(response, poll);
}
Ok(())
@@ -469,7 +551,7 @@
self.deregister_client(poll, client_id);
let mut response = handlers::Response::new(client_id);
handlers::handle_client_loss(&mut self.server, client_id, &mut response);
- self.flush_server_messages(response, poll);
+ self.handle_response(response, poll);
Ok(())
}