8 pub mod test; |
10 pub mod test; |
9 |
11 |
10 pub struct ProtocolDecoder { |
12 pub struct ProtocolDecoder { |
11 buf: netbuf::Buf, |
13 buf: netbuf::Buf, |
12 consumed: usize, |
14 consumed: usize, |
|
15 is_recovering: bool, |
13 } |
16 } |
14 |
17 |
15 impl ProtocolDecoder { |
18 impl ProtocolDecoder { |
16 pub fn new() -> ProtocolDecoder { |
19 pub fn new() -> ProtocolDecoder { |
17 ProtocolDecoder { |
20 ProtocolDecoder { |
18 buf: netbuf::Buf::new(), |
21 buf: netbuf::Buf::new(), |
19 consumed: 0, |
22 consumed: 0, |
|
23 is_recovering: false, |
20 } |
24 } |
21 } |
25 } |
22 |
26 |
|
27 fn recover(&mut self) -> bool { |
|
28 self.is_recovering = match parser::malformed_message(&self.buf[..]) { |
|
29 Ok((tail, ())) => { |
|
30 self.buf.consume(self.buf.len() - tail.len()); |
|
31 false |
|
32 } |
|
33 _ => { |
|
34 self.buf.consume(self.buf.len()); |
|
35 true |
|
36 } |
|
37 }; |
|
38 !self.is_recovering |
|
39 } |
|
40 |
23 pub fn read_from<R: Read>(&mut self, stream: &mut R) -> Result<usize> { |
41 pub fn read_from<R: Read>(&mut self, stream: &mut R) -> Result<usize> { |
24 self.buf.read_from(stream) |
42 let count = self.buf.read_from(stream)?; |
|
43 if count > 0 && self.is_recovering { |
|
44 self.recover(); |
|
45 } |
|
46 Ok(count) |
25 } |
47 } |
26 |
48 |
27 pub fn extract_messages(&mut self) -> Vec<messages::HWProtocolMessage> { |
49 pub fn extract_messages(&mut self) -> Vec<messages::HWProtocolMessage> { |
28 let parse_result = parser::extract_messages(&self.buf[..]); |
50 let mut messages = vec![]; |
29 match parse_result { |
51 let mut consumed = 0; |
30 Ok((tail, msgs)) => { |
52 if !self.is_recovering { |
31 self.consumed = self.buf.len() - self.consumed - tail.len(); |
53 loop { |
32 msgs |
54 match parser::message(&self.buf[consumed..]) { |
|
55 Ok((tail, message)) => { |
|
56 messages.push(message); |
|
57 consumed += self.buf.len() - tail.len(); |
|
58 } |
|
59 Err(nom::Err::Incomplete(_)) => break, |
|
60 Err(nom::Err::Failure(e)) | Err(nom::Err::Error(e)) => { |
|
61 debug!("Invalid message: {:?}", e); |
|
62 self.buf.consume(consumed); |
|
63 consumed = 0; |
|
64 if !self.recover() || self.buf.is_empty() { |
|
65 break; |
|
66 } |
|
67 } |
|
68 } |
33 } |
69 } |
34 _ => unreachable!(), |
|
35 } |
70 } |
36 } |
71 self.buf.consume(consumed); |
37 |
72 messages |
38 pub fn sweep(&mut self) { |
|
39 self.buf.consume(self.consumed); |
|
40 self.consumed = 0; |
|
41 } |
73 } |
42 } |
74 } |