1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
//! Send record as msgpack.
//!
//! ## Usage
//!
//! This trait is used as follows:
//!
//! ```no_run
//! extern crate fruently;
//! use fruently::fluent::Fluent;
//! use std::collections::HashMap;
//! use fruently::forwardable::MsgpackForwardable;
//!
//! fn main() {
//!     let mut obj: HashMap<String, String> = HashMap::new();
//!     obj.insert("name".to_string(), "fruently".to_string());
//!     let fruently = Fluent::new("127.0.0.1:24224", "test");
//!     let _ = fruently.post(&obj);
//! }
//! ```

use std::fmt::Debug;
use std::net::ToSocketAddrs;
use time;
use retry::retry_exponentially;
#[cfg(not(feature = "time-as-integer"))]
use event_record::EventRecord;
#[cfg(feature = "time-as-integer")]
use record::Record;
use error::FluentError;
use forwardable::MsgpackForwardable;
use fluent::Fluent;
use store_buffer;
use serde::ser::Serialize;

impl<'a, A: ToSocketAddrs> MsgpackForwardable for Fluent<'a, A> {
    /// Post record into Fluentd. Without time version.
    fn post<T>(self, record: T) -> Result<(), FluentError>
    where
        T: Serialize + Debug,
    {
        let time = time::now();
        self.post_with_time(record, time)
    }

    /// Post record into Fluentd. With time version.
    fn post_with_time<T>(self, record: T, time: time::Tm) -> Result<(), FluentError>
    where
        T: Serialize + Debug,
    {
        #[inline]
        #[cfg(feature = "time-as-integer")]
        fn make_record<T>(tag: String, time: time::Tm, record: T) -> Record<T>
        where
            T: Serialize + Debug,
        {
            Record::new(tag, time, record)
        }
        #[inline]
        #[cfg(not(feature = "time-as-integer"))]
        fn make_record<T>(tag: String, time: time::Tm, record: T) -> EventRecord<T>
        where
            T: Serialize + Debug,
        {
            EventRecord::new(tag, time, record)
        }
        let record = make_record(self.get_tag().into_owned(), time, record);
        let addr = self.get_addr();
        let (max, multiplier) = self.get_conf().into_owned().clone().build();
        match retry_exponentially(max, multiplier, || Fluent::closure_send_as_msgpack(addr, &record), |response| {
            response.is_ok()
        }) {
            Ok(_) => Ok(()),
            Err(err) => store_buffer::maybe_write_events(&self.get_conf(), record, From::from(err)),
        }
    }
}

#[cfg(test)]
#[cfg(feature = "fluentd")]
mod tests {
    use time;
    use fluent::Fluent;

    #[test]
    fn test_post() {
        use std::collections::HashMap;
        use forwardable::MsgpackForwardable;

        // 0.0.0.0 does not work in Windows.
        let fruently = Fluent::new("127.0.0.1:24224", "test");
        let mut obj: HashMap<String, String> = HashMap::new();
        obj.insert("hey".to_string(), "Rust with msgpack!".to_string());
        let result = fruently.post(obj).is_ok();
        assert_eq!(true, result);
    }

    #[test]
    fn test_post_with_time() {
        use std::collections::HashMap;
        use forwardable::MsgpackForwardable;

        // 0.0.0.0 does not work in Windows.
        let fruently = Fluent::new("127.0.0.1:24224", "test");
        let mut obj: HashMap<String, String> = HashMap::new();
        obj.insert("hey".to_string(), "Rust with msgpack!".to_string());
        let time = time::now();
        let result = fruently.post_with_time(obj, time).is_ok();
        assert_eq!(true, result);
    }
}