|
1 extern crate slab; |
|
2 |
|
3 use std::io::ErrorKind; |
|
4 use mio::net::*; |
|
5 use super::server::{HWServer, PendingMessage, Destination}; |
|
6 use super::client::ClientId; |
|
7 use slab::Slab; |
|
8 |
|
9 use mio::net::TcpStream; |
|
10 use mio::*; |
|
11 use std::io::Write; |
|
12 use std::io; |
|
13 use netbuf; |
|
14 |
|
15 use utils; |
|
16 use protocol::ProtocolDecoder; |
|
17 use protocol::messages::*; |
|
18 use std::net::SocketAddr; |
|
19 |
|
20 pub struct NetworkClient { |
|
21 id: ClientId, |
|
22 socket: TcpStream, |
|
23 peer_addr: SocketAddr, |
|
24 decoder: ProtocolDecoder, |
|
25 buf_out: netbuf::Buf, |
|
26 closed: bool |
|
27 } |
|
28 |
|
29 impl NetworkClient { |
|
30 pub fn new(id: ClientId, socket: TcpStream, peer_addr: SocketAddr) -> NetworkClient { |
|
31 NetworkClient { |
|
32 id, socket, peer_addr, |
|
33 decoder: ProtocolDecoder::new(), |
|
34 buf_out: netbuf::Buf::new(), |
|
35 closed: false |
|
36 } |
|
37 } |
|
38 |
|
39 pub fn send_raw_msg(&mut self, msg: &[u8]) { |
|
40 self.buf_out.write(msg).unwrap(); |
|
41 self.flush(); |
|
42 } |
|
43 |
|
44 pub fn send_string(&mut self, msg: &String) { |
|
45 self.send_raw_msg(&msg.as_bytes()); |
|
46 } |
|
47 |
|
48 pub fn send_msg(&mut self, msg: HWServerMessage) { |
|
49 self.send_string(&msg.to_raw_protocol()); |
|
50 } |
|
51 |
|
52 fn flush(&mut self) { |
|
53 self.buf_out.write_to(&mut self.socket).unwrap(); |
|
54 self.socket.flush().unwrap(); |
|
55 } |
|
56 |
|
57 pub fn read_messages(&mut self) -> io::Result<Vec<HWProtocolMessage>> { |
|
58 let bytes_read = self.decoder.read_from(&mut self.socket)?; |
|
59 debug!("Read {} bytes", bytes_read); |
|
60 |
|
61 if bytes_read == 0 { |
|
62 self.closed = true; |
|
63 info!("EOF for client {} ({})", self.id, self.peer_addr); |
|
64 } |
|
65 |
|
66 Ok(self.decoder.extract_messages()) |
|
67 } |
|
68 |
|
69 pub fn write_messages(&mut self) -> io::Result<()> { |
|
70 self.buf_out.write_to(&mut self.socket)?; |
|
71 Ok(()) |
|
72 } |
|
73 } |
|
74 |
|
75 pub struct NetworkLayer { |
|
76 listener: TcpListener, |
|
77 server: HWServer, |
|
78 |
|
79 clients: Slab<NetworkClient> |
|
80 } |
|
81 |
|
82 impl NetworkLayer { |
|
83 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
|
84 let server = HWServer::new(clients_limit, rooms_limit); |
|
85 let clients = Slab::with_capacity(clients_limit); |
|
86 NetworkLayer {listener, server, clients} |
|
87 } |
|
88 |
|
89 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
|
90 poll.register(&self.listener, utils::SERVER, Ready::readable(), |
|
91 PollOpt::edge()) |
|
92 } |
|
93 |
|
94 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
|
95 let mut client_exists = false; |
|
96 if let Some(ref client) = self.clients.get_mut(id) { |
|
97 poll.deregister(&client.socket) |
|
98 .ok().expect("could not deregister socket"); |
|
99 info!("client {} ({}) removed", client.id, client.peer_addr); |
|
100 client_exists = true; |
|
101 } |
|
102 if client_exists { |
|
103 self.clients.remove(id); |
|
104 } |
|
105 } |
|
106 |
|
107 fn register_client(&mut self, poll: &Poll, id: ClientId, client_socket: TcpStream, addr: SocketAddr) { |
|
108 poll.register(&client_socket, Token(id), |
|
109 Ready::readable() | Ready::writable(), |
|
110 PollOpt::edge()) |
|
111 .ok().expect("could not register socket with event loop"); |
|
112 |
|
113 let entry = self.clients.vacant_entry(); |
|
114 let client = NetworkClient::new(id, client_socket, addr); |
|
115 info!("client {} ({}) added", client.id, client.peer_addr); |
|
116 entry.insert(client); |
|
117 } |
|
118 |
|
119 pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> { |
|
120 let (client_socket, addr) = self.listener.accept()?; |
|
121 info!("Connected: {}", addr); |
|
122 |
|
123 let client_id = self.server.add_client(); |
|
124 self.register_client(poll, client_id, client_socket, addr); |
|
125 self.flush_server_messages(); |
|
126 |
|
127 Ok(()) |
|
128 } |
|
129 |
|
130 fn flush_server_messages(&mut self) { |
|
131 for PendingMessage(destination, msg) in self.server.output.drain(..) { |
|
132 match destination { |
|
133 Destination::ToSelf(id) => { |
|
134 if let Some(ref mut client) = self.clients.get_mut(id) { |
|
135 client.send_msg(msg) |
|
136 } |
|
137 } |
|
138 Destination::ToOthers(id) => { |
|
139 let msg_string = msg.to_raw_protocol(); |
|
140 for item in self.clients.iter_mut() { |
|
141 if item.0 != id { |
|
142 item.1.send_string(&msg_string) |
|
143 } |
|
144 } |
|
145 } |
|
146 } |
|
147 } |
|
148 } |
|
149 |
|
150 pub fn client_readable(&mut self, poll: &Poll, |
|
151 client_id: ClientId) -> io::Result<()> { |
|
152 let mut client_lost = false; |
|
153 let messages; |
|
154 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
155 messages = match client.read_messages() { |
|
156 Ok(messages) => Some(messages), |
|
157 Err(ref error) if error.kind() == ErrorKind::WouldBlock => None, |
|
158 Err(error) => return Err(error) |
|
159 }; |
|
160 if client.closed { |
|
161 client_lost = true; |
|
162 } |
|
163 } else { |
|
164 warn!("invalid readable client: {}", client_id); |
|
165 messages = None; |
|
166 }; |
|
167 |
|
168 if client_lost { |
|
169 self.client_error(&poll, client_id)?; |
|
170 } else if let Some(msg) = messages { |
|
171 for message in msg { |
|
172 self.server.handle_msg(client_id, message); |
|
173 } |
|
174 self.flush_server_messages(); |
|
175 } |
|
176 |
|
177 if !self.server.removed_clients.is_empty() { |
|
178 let ids = self.server.removed_clients.to_vec(); |
|
179 self.server.removed_clients.clear(); |
|
180 for client_id in ids { |
|
181 self.deregister_client(poll, client_id); |
|
182 } |
|
183 } |
|
184 |
|
185 Ok(()) |
|
186 } |
|
187 |
|
188 pub fn client_writable(&mut self, poll: &Poll, |
|
189 client_id: ClientId) -> io::Result<()> { |
|
190 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
191 match client.write_messages() { |
|
192 Ok(_) => (), |
|
193 Err(ref error) if error.kind() == ErrorKind::WouldBlock => (), |
|
194 Err(error) => return Err(error) |
|
195 } |
|
196 } else { |
|
197 warn!("invalid writable client: {}", client_id); |
|
198 } |
|
199 |
|
200 Ok(()) |
|
201 } |
|
202 |
|
203 pub fn client_error(&mut self, poll: &Poll, |
|
204 client_id: ClientId) -> io::Result<()> { |
|
205 self.deregister_client(poll, client_id); |
|
206 self.server.client_lost(client_id); |
|
207 |
|
208 Ok(()) |
|
209 } |
|
210 } |
|
211 |