1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::io;
use std::io::Cursor;
use std::io::prelude::*;
use std::net::TcpStream;
use std::string::FromUtf8Error;
use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
// Size in bytes of the receive buffer allocated when reading a server response.
const RECV_BUF_SIZE: usize = 8192;
// Size in bytes of the fixed GQTP header: 1 + 1 + 2 + 1 + 1 + 2 + 4 + 4 + 8,
// matching the fields written in front of every command.
const GQTP_HEADER_SIZE: usize = 24;
/// Everything that can go wrong while talking GQTP to a server.
#[derive(Debug)]
pub enum GQTPError {
    /// The response header failed validation (protocol marker, query type
    /// or flags were not acceptable).
    InvalidProtocol,
    /// A message body size was not usable.
    InvalidBodySize,
    /// The server answered with a status code that is treated as a failure;
    /// the raw code is carried along.
    StatusError(u16),
    /// An underlying I/O operation failed.
    IO(io::Error),
    /// The response body was not valid UTF-8.
    EncodingError(FromUtf8Error),
}
/// Lets I/O failures be propagated as `GQTPError::IO`.
impl From<io::Error> for GQTPError {
    /// Wraps the I/O error without altering it.
    fn from(cause: io::Error) -> Self {
        GQTPError::IO(cause)
    }
}
/// Lets UTF-8 decoding failures be propagated as `GQTPError::EncodingError`.
impl From<FromUtf8Error> for GQTPError {
    /// Wraps the decoding error without altering it.
    fn from(cause: FromUtf8Error) -> Self {
        GQTPError::EncodingError(cause)
    }
}
/// A GQTP request bound to a server address.
pub struct GQTPRequest<'a> {
    /// Address of the server, passed to `TcpStream::connect` when a command
    /// is sent.
    addr: &'a str,
}
impl<'a> Default for GQTPRequest<'a> {
    /// Builds a request that targets `127.0.0.1:10043`.
    fn default() -> Self {
        let addr = "127.0.0.1:10043";
        GQTPRequest { addr: addr }
    }
}
impl<'a> GQTPRequest<'a> {
pub fn new() -> GQTPRequest<'a> {
GQTPRequest::default()
}
pub fn with_addr(mut self, addr: &'a str) -> GQTPRequest<'a> {
self.addr = addr;
self
}
pub fn call<C>(&self, command: C) -> Result<String, GQTPError>
where C: AsRef<str>
{
let mut stream = try!(TcpStream::connect(self.addr));
let mut send_buf = vec![];
try!(send_buf.write_u8(0xc7));
try!(send_buf.write_u8(0));
try!(send_buf.write_i16::<BigEndian>(0));
try!(send_buf.write_u8(0));
try!(send_buf.write_u8(0x02));
try!(send_buf.write_u16::<BigEndian>(0));
try!(send_buf.write_u32::<BigEndian>(command.as_ref().len() as u32));
try!(send_buf.write_u32::<BigEndian>(0));
try!(send_buf.write_u64::<BigEndian>(0));
send_buf.extend_from_slice(command.as_ref().as_bytes());
let _ = stream.write_all(send_buf.as_slice());
let mut read_buf = vec![0; RECV_BUF_SIZE];
let _ = stream.read(&mut read_buf);
let mut buf = Cursor::new(read_buf);
let protocol = try!(buf.read_u8());
let query_type = try!(buf.read_u8());
if protocol != 0xc7 || query_type > 5 {
return Err(GQTPError::InvalidProtocol);
}
let _ = try!(buf.read_i16::<BigEndian>());
let _ = try!(buf.read_u8());
let flags = try!(buf.read_u8());
if !((flags & 0x01) == 0x01 || (flags & 0x02) == 0x02) {
return Err(GQTPError::InvalidProtocol);
}
let status = try!(buf.read_u16::<BigEndian>());
if status != 0 && status != 1 {
return Err(GQTPError::StatusError(status));
}
let size = try!(buf.read_i32::<BigEndian>());
let _ = try!(buf.read_i32::<BigEndian>());
let _ = try!(buf.read_i64::<BigEndian>());
let mut msg_buf_len = if (size as usize + GQTP_HEADER_SIZE) > RECV_BUF_SIZE {
RECV_BUF_SIZE - GQTP_HEADER_SIZE
} else {
size as usize
};
let mut msg = vec![0; msg_buf_len];
let _ = try!(buf.read(&mut msg));
if (size as usize + GQTP_HEADER_SIZE) > RECV_BUF_SIZE {
loop {
let mut read_buf = vec![0; RECV_BUF_SIZE];
let rsize = try!(stream.read(&mut read_buf));
msg.extend_from_slice(read_buf.as_ref());
msg_buf_len += rsize;
if msg_buf_len >= size as usize {
break;
}
}
}
Ok(try!(String::from_utf8(msg)))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly created request points at the default address.
    #[test]
    fn smoke_gqtp() {
        let request = GQTPRequest::new();
        let expected = "127.0.0.1:10043";
        assert_eq!(expected, request.addr);
    }

    // `with_addr` overrides the default address.
    #[test]
    fn smoke_gqtp_with_addr() {
        let custom = "127.0.0.1:20043";
        let request = GQTPRequest::new().with_addr(custom);
        assert_eq!("127.0.0.1:20043", request.addr);
    }
}