rust/hedgewars-checker/src/main.rs
changeset 15913 fe519de9c270
parent 15835 ad79e5c0885c
child 15916 0cd6996cd4c8
equal deleted inserted replaced
15912:5e8d2a8eb473 15913:fe519de9c270
     5 };
     5 };
     6 use ini::Ini;
     6 use ini::Ini;
     7 use log::{debug, info, warn};
     7 use log::{debug, info, warn};
     8 use netbuf::Buf;
     8 use netbuf::Buf;
     9 use std::{io::Write, str::FromStr};
     9 use std::{io::Write, str::FromStr};
    10 use tokio::{io, io::AsyncWriteExt, net::TcpStream, process::Command};
    10 use tokio::{io, io::AsyncWriteExt, net::TcpStream, process::Command, sync::mpsc};
    11 
    11 
    12 async fn check(executable: &str, data_prefix: &str, buffer: &[String]) -> Result<Vec<String>> {
    12 async fn check(executable: &str, data_prefix: &str, buffer: &[String]) -> Result<Vec<String>> {
    13     let mut replay = tempfile::NamedTempFile::new()?;
    13     let mut replay = tempfile::NamedTempFile::new()?;
    14 
    14 
    15     for line in buffer.into_iter() {
    15     for line in buffer.into_iter() {
    79             }
    79             }
    80             _ => break,
    80             _ => break,
    81         }
    81         }
    82     }
    82     }
    83 
    83 
    84     println!("Engine lines: {:?}", &result);
    84     // println!("Engine lines: {:?}", &result);
    85 
    85 
    86     if result.len() > 0 {
    86     if result.len() > 0 {
    87         Ok(result)
    87         Ok(result)
    88     } else {
    88     } else {
    89         bail!("no data from engine")
    89         bail!("no data from engine")
    90     }
    90     }
    91 }
    91 }
    92 
    92 
       
    /// Worker loop that checks replays one at a time and reports each outcome.
    ///
    /// Waits on `replay_receiver` for the next replay, runs
    /// `check(executable, data_prefix, &replay)` on it, and pushes the value
    /// `check` returned into `results_sender` before waiting for the next replay.
    ///
    /// * `executable`, `data_prefix` - passed through to `check` unchanged.
    /// * `results_sender` - receives the `Result` of every `check` call, both
    ///   `Ok` and `Err`.
    /// * `replay_receiver` - source of replays; the loop ends when `recv()`
    ///   yields `None`.
    ///
    /// Returns `Ok(())` once `replay_receiver` yields `None`. Returns early with
    /// an error if sending a result fails (presumably only when the receiving
    /// half of the results channel is gone - confirm against tokio's mpsc docs).
    93 async fn check_loop(
       
    94     executable: &str,
       
    95     data_prefix: &str,
       
    96     results_sender: mpsc::Sender<Result<Vec<String>>>,
       
    97     mut replay_receiver: mpsc::Receiver<Vec<String>>,
       
    98 ) -> Result<()> {
       
           // Each replay is fully checked and its result sent before the next
           // `recv()` is awaited, so replays are handled strictly in sequence.
    99     while let Some(replay) = replay_receiver.recv().await {
       
               // A failed check is not an error of this loop: the `Err` from
               // `check` is forwarded as a value. Only a failed `send` hits `?`.
   100         results_sender
       
   101             .send(check(executable, data_prefix, &replay).await)
       
   102             .await?;
       
   103     }
       
   104 
       
   105     Ok(())
       
   106 }
       
   107 
    93 async fn connect_and_run(
   108 async fn connect_and_run(
    94     username: &str,
   109     username: &str,
    95     password: &str,
   110     password: &str,
    96     protocol_number: u16,
   111     protocol_number: u16,
    97     executable: &str,
   112     replay_sender: mpsc::Sender<Vec<String>>,
    98     data_prefix: &str,
   113     mut results_receiver: mpsc::Receiver<Result<Vec<String>>>,
    99 ) -> Result<()> {
   114 ) -> Result<()> {
   100     info!("Connecting...");
   115     info!("Connecting...");
   101 
   116 
   102     let mut stream = TcpStream::connect("hedgewars.org:46631").await?;
   117     let mut stream = TcpStream::connect("hedgewars.org:46631").await?;
   103 
   118 
   104     let mut buf = Buf::new();
   119     let mut buf = Buf::new();
   105 
   120 
   106     let mut replay_lines: Option<Vec<String>> = None;
       
   107 
       
   108     loop {
   121     loop {
   109         let r = if let Some(ref lines) = replay_lines {
   122         let r = tokio::select! {
   110             let r = tokio::select! {
   123             _ = stream.readable() => None,
   111                 _ = stream.readable() => None,
   124             r = results_receiver.recv() => r
   112                 r = check(executable, data_prefix, &lines) => Some(r)
       
   113             };
       
   114 
       
   115             r
       
   116         } else {
       
   117             stream.readable().await?;
       
   118             None
       
   119         };
   125         };
   120 
   126 
   121         println!("Loop: {:?}", &r);
   127         //println!("Loop: {:?}", &r);
   122 
   128 
   123         if let Some(execution_result) = r {
   129         if let Some(execution_result) = r {
   124             replay_lines = None;
       
   125 
       
   126             match execution_result {
   130             match execution_result {
   127                 Ok(result) => {
   131                 Ok(result) => {
   128                     info!("Checked");
   132                     info!("Checked");
   129                     debug!("Check result: [{:?}]", result);
   133                     debug!("Check result: [{:?}]", result);
   130 
   134 
   171 
   175 
   172         while let Ok((tail, msg)) = parser::server_message(buf.as_ref()) {
   176         while let Ok((tail, msg)) = parser::server_message(buf.as_ref()) {
   173             let tail_len = tail.len();
   177             let tail_len = tail.len();
   174             buf.consume(buf.len() - tail_len);
   178             buf.consume(buf.len() - tail_len);
   175 
   179 
   176             println!("Message from server: {:?}", &msg);
   180             // println!("Message from server: {:?}", &msg);
   177 
   181 
   178             match msg {
   182             match msg {
   179                 Connected(_, _) => {
   183                 Connected(_, _) => {
   180                     info!("Connected");
   184                     info!("Connected");
   181                     stream
   185                     stream
   200                         .write(ClientMessage::CheckerReady.to_raw_protocol().as_bytes())
   204                         .write(ClientMessage::CheckerReady.to_raw_protocol().as_bytes())
   201                         .await?;
   205                         .await?;
   202                 }
   206                 }
   203                 Replay(lines) => {
   207                 Replay(lines) => {
   204                     info!("Got a replay");
   208                     info!("Got a replay");
   205                     replay_lines = Some(lines);
   209                     replay_sender.send(lines).await?;
   206                 }
   210                 }
   207                 Bye(message) => {
   211                 Bye(message) => {
   208                     warn!("Received BYE: {}", message);
   212                     warn!("Received BYE: {}", message);
   209                     return Ok(());
   213                     return Ok(());
   210                 }
   214                 }
   277 
   281 
   278     let protocol_number = get_protocol_number(&exe.as_str()).await.unwrap_or_default();
   282     let protocol_number = get_protocol_number(&exe.as_str()).await.unwrap_or_default();
   279 
   283 
   280     info!("Using protocol number {}", protocol_number);
   284     info!("Using protocol number {}", protocol_number);
   281 
   285 
   282     connect_and_run(&username, &password, protocol_number, &exe, &prefix).await
   286     let (replay_sender, replay_receiver) = mpsc::channel(1);
   283 }
   287     let (results_sender, results_receiver) = mpsc::channel(1);
       
   288 
       
   289     let (network_result, checker_result) = tokio::join!(
       
   290         connect_and_run(
       
   291             &username,
       
   292             &password,
       
   293             protocol_number,
       
   294             replay_sender,
       
   295             results_receiver
       
   296         ),
       
   297         check_loop(&exe, &prefix, results_sender, replay_receiver)
       
   298     );
       
   299 
       
   300     network_result?;
       
   301     checker_result
       
   302 }