rust/hedgewars-server/src/server/network.rs
changeset 14779 f43ab2bd76ae
parent 14697 f64e21f164a5
child 14780 65861ba8b4e8
--- a/rust/hedgewars-server/src/server/network.rs	Tue Apr 09 00:45:14 2019 +0200
+++ b/rust/hedgewars-server/src/server/network.rs	Tue Apr 09 21:08:35 2019 +0300
@@ -16,11 +16,16 @@
 use netbuf;
 use slab::Slab;
 
-use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO};
+use super::{core::FileServerIO, core::HWServer, coretypes::ClientId, handlers};
 use crate::{
     protocol::{messages::*, ProtocolDecoder},
     utils,
 };
+
+#[cfg(feature = "official-server")]
+use super::io::{IOThread, RequestId};
+
+use crate::server::handlers::{IoResult, IoTask};
 #[cfg(feature = "tls-connections")]
 use openssl::{
     error::ErrorStack,
@@ -234,6 +239,56 @@
     context: SslContext,
 }
 
+#[cfg(feature = "official-server")]
+pub struct IoLayer {
+    next_request_id: RequestId,
+    request_queue: Vec<(RequestId, ClientId)>,
+    io_thread: IOThread,
+}
+
+#[cfg(feature = "official-server")]
+impl IoLayer {
+    fn new() -> Self {
+        Self {
+            next_request_id: 0,
+            request_queue: vec![],
+            io_thread: IOThread::new(),
+        }
+    }
+
+    fn send(&mut self, client_id: ClientId, task: IoTask) {
+        let request_id = self.next_request_id;
+        self.next_request_id += 1;
+        self.request_queue.push((request_id, client_id));
+        self.io_thread.send(request_id, task);
+    }
+
+    fn try_recv(&mut self) -> Option<(ClientId, IoResult)> {
+        let (request_id, result) = self.io_thread.try_recv()?;
+        if let Some(index) = self
+            .request_queue
+            .iter()
+            .position(|(id, _)| *id == request_id)
+        {
+            let (_, client_id) = self.request_queue.swap_remove(index);
+            Some((client_id, result))
+        } else {
+            None
+        }
+    }
+
+    fn cancel(&mut self, client_id: ClientId) {
+        let mut index = 0;
+        while index < self.request_queue.len() {
+            if self.request_queue[index].1 == client_id {
+                self.request_queue.swap_remove(index);
+            } else {
+                index += 1;
+            }
+        }
+    }
+}
+
 pub struct NetworkLayer {
     listener: TcpListener,
     server: HWServer,
@@ -242,6 +297,8 @@
     pending_cache: Vec<(ClientId, NetworkClientState)>,
     #[cfg(feature = "tls-connections")]
     ssl: ServerSsl,
+    #[cfg(feature = "official-server")]
+    io: IoLayer,
 }
 
 impl NetworkLayer {
@@ -259,6 +316,8 @@
             pending_cache,
             #[cfg(feature = "tls-connections")]
             ssl: NetworkLayer::create_ssl_context(),
+            #[cfg(feature = "official-server")]
+            io: IoLayer::new(),
         }
     }
 
@@ -283,10 +342,15 @@
     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
         poll.register(
             &self.listener,
-            utils::SERVER,
+            utils::SERVER_TOKEN,
             Ready::readable(),
             PollOpt::edge(),
-        )
+        )?;
+
+        #[cfg(feature = "official-server")]
+        self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?;
+
+        Ok(())
     }
 
     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
@@ -299,6 +363,8 @@
         }
         if client_exists {
             self.clients.remove(id);
+            #[cfg(feature = "official-server")]
+            self.io.cancel(id);
         }
     }
 
@@ -326,7 +392,7 @@
         client_id
     }
 
-    fn flush_server_messages(&mut self, mut response: handlers::Response, poll: &Poll) {
+    fn handle_response(&mut self, mut response: handlers::Response, poll: &Poll) {
         debug!("{} pending server messages", response.len());
         let output = response.extract_messages(&mut self.server);
         for (clients, message) in output {
@@ -344,6 +410,22 @@
         for client_id in response.extract_removed_clients() {
             self.deregister_client(poll, client_id);
         }
+
+        #[cfg(feature = "official-server")]
+        {
+            let client_id = response.client_id();
+            for task in response.extract_io_tasks() {
+                self.io.send(client_id, task);
+            }
+        }
+    }
+
+    #[cfg(feature = "official-server")]
+    pub fn handle_io_result(&mut self) {
+        if let Some((client_id, result)) = self.io.try_recv() {
+            let mut response = handlers::Response::new(client_id);
+            handlers::handle_io_result(&mut self.server, client_id, &mut response, result);
+        }
     }
 
     fn create_client_socket(&self, socket: TcpStream) -> io::Result<ClientSocket> {
@@ -381,7 +463,7 @@
         handlers::handle_client_accept(&mut self.server, client_id, &mut response);
 
         if !response.is_empty() {
-            self.flush_server_messages(response, poll);
+            self.handle_response(response, poll);
         }
 
         Ok(())
@@ -438,7 +520,7 @@
         }
 
         if !response.is_empty() {
-            self.flush_server_messages(response, poll);
+            self.handle_response(response, poll);
         }
 
         Ok(())
@@ -469,7 +551,7 @@
         self.deregister_client(poll, client_id);
         let mut response = handlers::Response::new(client_id);
         handlers::handle_client_loss(&mut self.server, client_id, &mut response);
-        self.flush_server_messages(response, poll);
+        self.handle_response(response, poll);
 
         Ok(())
     }