1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::io;
use std::io::Cursor;
use std::io::prelude::*;
use std::net::TcpStream;
use std::string::FromUtf8Error;
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};

const RECV_BUF_SIZE: usize = 8192;
const GQTP_HEADER_SIZE: usize = 24;

/// Errors that can be produced while performing a GQTP request.
#[derive(Debug)]
pub enum GQTPError {
    /// The response header carried an unexpected protocol byte, query type,
    /// or flags value.
    InvalidProtocol,
    /// The body size of a GQTP message was not acceptable.
    InvalidBodySize,
    /// The response header carried a status other than `0` or `1`;
    /// the payload is that raw status value.
    StatusError(u16),
    /// An underlying I/O operation failed.
    IO(io::Error),
    /// The response body could not be decoded as UTF-8.
    EncodingError(FromUtf8Error)
}

/// Lifts an `io::Error` into `GQTPError::IO` so I/O failures can be
/// propagated directly from functions returning `GQTPError`.
impl From<io::Error> for GQTPError {
    fn from(io_failure: io::Error) -> Self {
        GQTPError::IO(io_failure)
    }
}

/// Lifts a `FromUtf8Error` into `GQTPError::EncodingError` so UTF-8 decoding
/// failures can be propagated directly from functions returning `GQTPError`.
impl From<FromUtf8Error> for GQTPError {
    fn from(utf8_failure: FromUtf8Error) -> Self {
        GQTPError::EncodingError(utf8_failure)
    }
}

/// Client that sends a command using the
/// [GQTP protocol](http://groonga.org/docs/spec/gqtp.html) over a `TcpStream`.
///
/// The value only borrows the server address; `call` opens a new connection
/// each time it is invoked.
pub struct GQTPRequest<'a> {
    // Address handed to `TcpStream::connect` by `call`.
    addr: &'a str,
}

impl<'a> Default for GQTPRequest<'a> {
    /// Builds a request aimed at the local address `127.0.0.1:10043`.
    fn default() -> Self {
        let local_default = "127.0.0.1:10043";
        GQTPRequest { addr: local_default }
    }
}

impl<'a> GQTPRequest<'a> {
    /// Create a GQTP client.
    pub fn new() -> GQTPRequest<'a> {
        GQTPRequest::default()
    }

    /// Set host address for GQTP server.
    ///
    /// # Examples
    ///
    /// ```
    /// extern crate ruroonga_client as groonga;
    ///
    /// groonga::GQTPRequest::new().with_addr("127.0.0.1:20043");
    /// ```
    pub fn with_addr(mut self, addr: &'a str) -> GQTPRequest<'a> {
        self.addr = addr;
        self
    }

    /// Send request and Receive response.
    pub fn call<C>(&self, command: C) -> Result<String, GQTPError>
        where C: AsRef<str>
    {
        // send
        let mut stream = try!(TcpStream::connect(self.addr));
        let mut send_buf = vec![];
        try!(send_buf.write_u8(0xc7));
        try!(send_buf.write_u8(0));
        try!(send_buf.write_i16::<BigEndian>(0));
        try!(send_buf.write_u8(0));
        try!(send_buf.write_u8(0x02));   // flags
        try!(send_buf.write_u16::<BigEndian>(0));
        try!(send_buf.write_u32::<BigEndian>(command.as_ref().len() as u32));
        try!(send_buf.write_u32::<BigEndian>(0));
        try!(send_buf.write_u64::<BigEndian>(0));
        send_buf.extend_from_slice(command.as_ref().as_bytes());
        let _ = stream.write_all(send_buf.as_slice());

        // receive and check protocol header value
        let mut read_buf = vec![0; RECV_BUF_SIZE];
        let _ = stream.read(&mut read_buf);
        let mut buf = Cursor::new(read_buf);

        let protocol = try!(buf.read_u8());
        let query_type = try!(buf.read_u8());
        if protocol != 0xc7 || query_type > 5 {
            return Err(GQTPError::InvalidProtocol);
        }
        let _ = try!(buf.read_i16::<BigEndian>());
        let _ = try!(buf.read_u8());

        let flags = try!(buf.read_u8());
        if !((flags & 0x01) == 0x01 || (flags & 0x02) == 0x02) {
            return Err(GQTPError::InvalidProtocol);
        }

        let status = try!(buf.read_u16::<BigEndian>());
        if status != 0 && status != 1 {
            return Err(GQTPError::StatusError(status));
        }
        let size = try!(buf.read_i32::<BigEndian>());
        let _ = try!(buf.read_i32::<BigEndian>());    // opaque
        let _ = try!(buf.read_i64::<BigEndian>());    // cas

        // read body
        let mut msg_buf_len = if (size as usize + GQTP_HEADER_SIZE) > RECV_BUF_SIZE {
            RECV_BUF_SIZE - GQTP_HEADER_SIZE
        } else {
            size as usize
        };
        let mut msg = vec![0; msg_buf_len];
        let _ = try!(buf.read(&mut msg));
        if (size as usize + GQTP_HEADER_SIZE) > RECV_BUF_SIZE {
            loop {
                let mut read_buf = vec![0; RECV_BUF_SIZE];
                let rsize = try!(stream.read(&mut read_buf));
                msg.extend_from_slice(read_buf.as_ref());
                msg_buf_len += rsize;
                if msg_buf_len >= size as usize {
                    break;
                }
            }
        }

        Ok(try!(String::from_utf8(msg)))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created request points at the default local address.
    #[test]
    fn smoke_gqtp() {
        let default_addr = GQTPRequest::new().addr;
        assert_eq!(default_addr, "127.0.0.1:10043");
    }

    /// `with_addr` replaces the default address with the given one.
    #[test]
    fn smoke_gqtp_with_addr() {
        let custom_addr = "127.0.0.1:20043";
        let request = GQTPRequest::new().with_addr(custom_addr);
        assert_eq!(request.addr, custom_addr);
    }
}