# HG changeset patch # User unC0Rr # Date 1564060680 -7200 # Node ID 07e909ba420344c0236eea4e78cc3a85ffba1097 # Parent 7515ae6010bb10d6c3f8323c34babadba692351f Implement ipc queue which takes care of message ordering and timestamps diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/Cargo.toml --- a/rust/lib-hedgewars-engine/Cargo.toml Thu Jul 25 14:23:25 2019 +0200 +++ b/rust/lib-hedgewars-engine/Cargo.toml Thu Jul 25 15:18:00 2019 +0200 @@ -9,6 +9,7 @@ netbuf = "0.4" itertools = "0.8" png = "0.13" +queues = "1.1" fpnum = { path = "../fpnum" } land2d = { path = "../land2d" } diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/src/instance.rs --- a/rust/lib-hedgewars-engine/src/instance.rs Thu Jul 25 14:23:25 2019 +0200 +++ b/rust/lib-hedgewars-engine/src/instance.rs Thu Jul 25 15:18:00 2019 +0200 @@ -6,11 +6,12 @@ use integral_geometry::{Point, Rect, Size}; use landgen::outline_template::OutlineTemplate; -use super::{ipc::IPC, world::World}; +use super::{ipc::*, world::World}; pub struct EngineInstance { pub world: World, - pub ipc: IPC, + pub ipc_channel: Channel, + ipc_queue: MessagesQueue, } impl EngineInstance { @@ -34,7 +35,8 @@ Self { world, - ipc: IPC::new(), + ipc_channel: Channel::new(), + ipc_queue: MessagesQueue::new(QueueChatStrategy::LocalGame), } } @@ -57,7 +59,11 @@ } pub fn process_ipc_queue(&mut self) { - let messages: Vec = self.ipc.iter().collect(); + for message in self.ipc_channel.iter() { + self.ipc_queue.push(message); + } + + let messages: Vec = self.ipc_queue.iter(0).collect(); for message in messages { println!("Processing message: {:?}", message); diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/src/ipc.rs --- a/rust/lib-hedgewars-engine/src/ipc.rs Thu Jul 25 14:23:25 2019 +0200 +++ b/rust/lib-hedgewars-engine/src/ipc.rs Thu Jul 25 15:18:00 2019 +0200 @@ -1,67 +1,5 @@ -use hedgewars_engine_messages::{messages::*, parser::extract_message}; -use netbuf::*; -use std::io::*; - -pub struct IPC { - in_buffer: Buf, - out_buffer: Buf, -} - -impl IPC { - pub fn new() -> Self { - Self { - in_buffer: Buf::new(), - out_buffer: Buf::new(), - } - } - - pub fn send_message(&mut self, message: &EngineMessage) { - self.out_buffer.write(&message.to_bytes()).unwrap(); - } - - pub fn iter(&mut self) -> IPCMessagesIterator { - IPCMessagesIterator::new(self) - } -} - -impl Write for IPC { - fn write(&mut self, buf: &[u8]) -> Result { - self.in_buffer.write(buf) - } +mod channel; +mod queue; - fn flush(&mut self) -> Result<()> { - self.in_buffer.flush() - } -} - -impl Read for IPC { - fn read(&mut self, buf: &mut [u8]) -> Result { - let read_bytes = self.out_buffer.as_ref().read(buf)?; - - self.out_buffer.consume(read_bytes); - - Ok(read_bytes) - } -} - -pub struct IPCMessagesIterator<'a> { - ipc: &'a mut IPC, -} - -impl<'a> IPCMessagesIterator<'a> { - pub fn new(ipc: &'a mut IPC) -> Self { - Self { ipc } - } -} - -impl<'a> Iterator for IPCMessagesIterator<'a> { - type Item = EngineMessage; - - fn next(&mut self) -> Option { - let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?; - - self.ipc.in_buffer.consume(consumed); - - Some(message) - } -} +pub use self::channel::*; +pub use self::queue::*; diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/src/ipc/channel.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/lib-hedgewars-engine/src/ipc/channel.rs Thu Jul 25 15:18:00 2019 +0200 @@ -0,0 +1,67 @@ +use hedgewars_engine_messages::{messages::*, parser::extract_message}; +use netbuf::*; +use std::io::*; + +pub struct Channel { + in_buffer: Buf, + out_buffer: Buf, +} + +impl Channel { + pub fn new() -> Self { + Self { + in_buffer: Buf::new(), + out_buffer: Buf::new(), + } + } + + pub fn send_message(&mut self, message: &EngineMessage) { + self.out_buffer.write(&message.to_bytes()).unwrap(); + } + + pub fn iter(&mut self) -> IPCMessagesIterator { + IPCMessagesIterator::new(self) + } +} + +impl Write for Channel { + fn write(&mut self, buf: &[u8]) -> Result { + self.in_buffer.write(buf) + } + + fn flush(&mut self) -> Result<()> { + self.in_buffer.flush() + } +} + +impl Read for Channel { + fn read(&mut self, buf: &mut [u8]) -> Result { + let read_bytes = self.out_buffer.as_ref().read(buf)?; + + self.out_buffer.consume(read_bytes); + + Ok(read_bytes) + } +} + +pub struct IPCMessagesIterator<'a> { + ipc: &'a mut Channel, +} + +impl<'a> IPCMessagesIterator<'a> { + pub fn new(ipc: &'a mut Channel) -> Self { + Self { ipc } + } +} + +impl<'a> Iterator for IPCMessagesIterator<'a> { + type Item = EngineMessage; + + fn next(&mut self) -> Option { + let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?; + + self.ipc.in_buffer.consume(consumed); + + Some(message) + } +} diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/src/ipc/queue.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/lib-hedgewars-engine/src/ipc/queue.rs Thu Jul 25 15:18:00 2019 +0200 @@ -0,0 +1,115 @@ +use hedgewars_engine_messages::{ + messages::EngineMessage::*, messages::SyncedEngineMessage::*, + messages::UnsyncedEngineMessage::*, messages::*, +}; +use queues::*; + +#[derive(PartialEq)] +pub enum QueueChatStrategy { + NetworkGame, + LocalGame, +} + +pub struct MessagesQueue { + strategy: QueueChatStrategy, + hi_ticks: u32, + unordered: Queue, + ordered: Queue, +} + +impl MessagesQueue { + pub fn new(strategy: QueueChatStrategy) -> Self { + MessagesQueue { + strategy, + hi_ticks: 0, + unordered: queue![], + ordered: queue![], + } + } + + fn is_unordered(&self, message: &EngineMessage) -> bool { + match message { + Unordered(_) => true, + Unsynced(HogSay(_)) | Unsynced(ChatMessage(_)) | Unsynced(TeamMessage(_)) => { + self.strategy == QueueChatStrategy::NetworkGame + } + _ => false, + } + } + + pub fn push(&mut self, engine_message: EngineMessage) { + if self.is_unordered(&engine_message) { + self.unordered.add(engine_message).unwrap(); + } else if let Synced(TimeWrap, timestamp) = engine_message { + self.ordered + .add(Synced(TimeWrap, timestamp + self.hi_ticks)) + .unwrap(); + self.hi_ticks += 65536; + } else if let Synced(message, timestamp) = engine_message { + self.ordered + .add(Synced(message, timestamp + self.hi_ticks)) + .unwrap(); + } else { + self.ordered.add(engine_message).unwrap(); + } + } + + pub fn pop(&mut self, timestamp: u32) -> Option { + if let Ok(message) = self.unordered.remove() { + Some(message) + } else if let Ok(Synced(_, message_timestamp)) = self.ordered.peek() { + if message_timestamp == timestamp { + self.ordered.remove().ok() + } else { + None + } + } else { + self.ordered.remove().ok() + } + } + + pub fn iter(&mut self, timestamp: u32) -> MessagesQueueIterator { + MessagesQueueIterator { + timestamp, + queue: self, + } + } +} + +pub struct MessagesQueueIterator<'a> { + timestamp: u32, + queue: &'a mut MessagesQueue, +} + +impl<'a> Iterator for MessagesQueueIterator<'a> { + type Item = EngineMessage; + + fn next(&mut self) -> Option { + self.queue.pop(self.timestamp) + } +} + +#[test] +fn queue_order() { + use hedgewars_engine_messages::messages::UnorderedEngineMessage::*; + + let mut queue = MessagesQueue::new(QueueChatStrategy::LocalGame); + + queue.push(Synced(Skip, 1)); + queue.push(Unsynced(ChatMessage("hi".to_string()))); + queue.push(Synced(TimeWrap, 65535)); + queue.push(Unordered(Ping)); + queue.push(Synced(Skip, 2)); + + let zero_tick: Vec = queue.iter(0).collect(); + assert_eq!(zero_tick, vec![Unordered(Ping)]); + assert_eq!(queue.pop(1), Some(Synced(Skip, 1))); + assert_eq!(queue.pop(1), Some(Unsynced(ChatMessage("hi".to_string())))); + assert_eq!(queue.pop(1), None); + assert_eq!(queue.pop(2), None); + assert_eq!(queue.pop(65535), Some(Synced(TimeWrap, 65535))); + assert_eq!(queue.pop(65535), None); + assert_eq!(queue.pop(65538), Some(Synced(Skip, 65538))); + assert_eq!(queue.pop(65538), None); + assert_eq!(queue.pop(65539), None); +} diff -r 7515ae6010bb -r 07e909ba4203 rust/lib-hedgewars-engine/src/lib.rs --- a/rust/lib-hedgewars-engine/src/lib.rs Thu Jul 25 14:23:25 2019 +0200 +++ b/rust/lib-hedgewars-engine/src/lib.rs Thu Jul 25 15:18:00 2019 +0200 @@ -118,7 +118,7 @@ pub extern "C" fn send_ipc(engine_state: &mut EngineInstance, buf: *const u8, size: usize) { unsafe { (*engine_state) - .ipc + .ipc_channel .write(std::slice::from_raw_parts(buf, size)) .unwrap(); } @@ -128,7 +128,7 @@ pub extern "C" fn read_ipc(engine_state: &mut EngineInstance, buf: *mut u8, size: usize) -> usize { unsafe { (*engine_state) - .ipc + .ipc_channel .read(std::slice::from_raw_parts_mut(buf, size)) .unwrap_or(0) }