1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//! Implement record manupulation mechanisms.

use time;
use time::Tm;
use serde_json;
use serde::ser::{Serialize, Serializer};
use serde::ser::SerializeTuple;
use dumpable::Dumpable;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record<T: Serialize> {
    tag: String,
    time: Tm,
    record: T,
}

impl<T: Serialize> Record<T> {
    pub fn new(tag: String, time: Tm, record: T) -> Record<T> {
        Record {
            tag: tag,
            time: time,
            record: record,
        }
    }
}

impl<T: Serialize> Dumpable for Record<T> {
    fn dump(self) -> String {
        format!(
            "{}\t{}\t{}\n",
            time::strftime("%FT%T%z", &self.time).unwrap(),
            self.tag,
            serde_json::to_string(&self.record).unwrap()
        )
    }
}

/// Construct custom encoding json/msgpack style.
///
/// Because `Record` struct should map following style json/msgpack:
///
/// `[tag, unixtime/eventtime, record]`
///
/// ref: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v0#message-mode
impl<T: Serialize> Serialize for Record<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let mut seq = s.serialize_tuple(4)?;
        seq.serialize_element(&self.tag)?;
        seq.serialize_element(&self.time.to_timespec().sec)?;
        seq.serialize_element(&self.record)?;
        seq.serialize_element(&None::<T>)?;
        seq.end()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use time;
    use std::collections::HashMap;
    use serde_json;

    #[test]
    fn test_json_format() {
        let tag = "fruently".to_string();
        let time = time::now();
        let mut obj: HashMap<String, String> = HashMap::new();
        obj.insert("name".to_string(), "fruently".to_string());
        let record = Record::new(tag.clone(), time, obj.clone());
        let forwardable_json = serde_json::to_string(&record).ok().unwrap();
        let json_tag = serde_json::to_string(&tag.clone()).ok().unwrap();
        let json_obj = serde_json::to_string(&obj.clone()).ok().unwrap();
        let expected = format!(
            "[{},{},{},{}]",
            json_tag,
            time.to_timespec().sec,
            json_obj,
            serde_json::Value::Null
        );
        assert_eq!(expected, forwardable_json);
    }
}