1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! Store the event buffer in a file when sending events fails.

use std::fs::File;
use std::path::PathBuf;
use std::io;
use std::io::Write;
use std::fmt::Debug;
use retry_conf::RetryConf;
use error::FluentError;
use std::fs::OpenOptions;
use serde::ser::Serialize;
use dumpable::Dumpable;

/// Open `path` for appending, creating the file first when it is missing.
///
/// The handle is opened with the `write`, `create` and `append` options set.
///
/// # Errors
///
/// Any `io::Error` reported by the underlying `open` call is returned as is.
fn ensure_file_with_wca(path: PathBuf) -> Result<File, io::Error> {
    let mut options = OpenOptions::new();
    options.write(true).create(true).append(true);
    options.open(path)
}

/// Append `events` to the configured store file after a failed send.
///
/// The bytes written are whatever `events.dump()` formats to (the original
/// documentation describes this as TSV). After the write the file is flushed
/// with `sync_data`.
///
/// # Errors
///
/// This function never returns `Ok`:
///
/// * `FluentError::FileStored` when the events were written to the store file;
/// * the converted `io::Error` when opening, writing or syncing the file fails;
/// * the caller-supplied `err` when `conf` does not ask for storing, or when it
///   reports no store path.
pub fn maybe_write_events<T>(conf: &RetryConf, events: T, err: FluentError) -> Result<(), FluentError>
where
    T: Serialize + Dumpable + Debug,
{
    // The accessors are invoked on clones, as before; presumably they take
    // `self` by value -- confirm against `RetryConf`.
    let store_needed = conf.clone().need_to_store();
    let store_path = conf.clone().store_path();
    if !store_needed {
        return Err(err);
    }
    let path = match store_path {
        Some(path) => path,
        // Storing was requested but no path is available: report the original
        // failure rather than panicking on an `unwrap`.
        None => return Err(err),
    };

    let mut file = ensure_file_with_wca(path.clone())?;
    // Format into memory first so the file receives the payload through a
    // single `write_all` call.
    let mut buf = Vec::new();
    write!(&mut buf, "{}", events.dump())?;
    file.write_all(&buf)?;
    file.sync_data()?;
    Err(FluentError::FileStored(format!(
        "stored buffer in specified file: {:?}",
        path
    )))
}

#[cfg(test)]
mod tests {
    extern crate tempdir;
    use super::*;
    use time;
    use std::collections::HashMap;
    use std::fs;
    use std::path::PathBuf;
    use self::tempdir::TempDir;
    use record::Record;
    use retry_conf::RetryConf;
    use error::FluentError;
    use forwardable::forward::Forward;
    #[cfg(not(feature = "time-as-integer"))]
    use event_time::EventTime;
    #[cfg(not(feature = "time-as-integer"))]
    use event_record::EventRecord;

    /// Build a single-entry string map used as the record payload.
    fn payload(key: &str, value: &str) -> HashMap<String, String> {
        let mut obj = HashMap::new();
        obj.insert(key.to_string(), value.to_string());
        obj
    }

    /// Create a temporary directory and the path of a `buffer` file inside it.
    ///
    /// The `TempDir` guard is returned as well: callers must keep it alive for
    /// the duration of the test so the directory is removed when the guard is
    /// dropped, instead of being left on disk.
    fn buffer_in_tempdir() -> (TempDir, PathBuf) {
        let dir = TempDir::new("fruently").unwrap();
        let path = dir.path().join("buffer");
        (dir, path)
    }

    /// The error handed to `maybe_write_events` as the original send failure.
    fn dummy_error() -> FluentError {
        FluentError::Dummy("dummy".to_string())
    }

    /// Assert that the events were stored, i.e. the result is `FileStored`
    /// rather than an I/O failure or the original error.
    fn assert_file_stored(result: Result<(), FluentError>) {
        match result {
            Err(FluentError::FileStored(_)) => {},
            _ => panic!("expected FluentError::FileStored"),
        }
    }

    #[test]
    fn test_write_record() {
        let record = Record::new("fruently".to_string(), time::now(), payload("name", "fruently"));
        let (_dir, tmp) = buffer_in_tempdir();
        let conf = RetryConf::new().store_file(tmp.clone());
        assert_file_stored(maybe_write_events(&conf, record, dummy_error()));
        assert!(tmp.exists());
    }

    #[cfg(not(feature = "time-as-integer"))]
    #[test]
    fn test_write_event_record() {
        let record = EventRecord::new("fruently".to_string(), time::now(), payload("name", "fruently"));
        let (_dir, tmp) = buffer_in_tempdir();
        let conf = RetryConf::new().store_file(tmp.clone());
        assert_file_stored(maybe_write_events(&conf, record, dummy_error()));
        assert!(tmp.exists());
    }

    #[test]
    fn test_write_record_2_times() {
        let tag = "fruently".to_string();
        let time = time::now();
        let (_dir, tmp) = buffer_in_tempdir();

        let record = Record::new(tag.clone(), time, payload("name", "fruently"));
        let conf = RetryConf::new().store_file(tmp.clone());
        assert_file_stored(maybe_write_events(&conf, record, dummy_error()));
        assert!(tmp.exists());
        let len_after_first = fs::metadata(&tmp).unwrap().len();

        let record2 = Record::new(tag.clone(), time, payload("name2", "fruently2"));
        let conf2 = RetryConf::new().store_file(tmp.clone());
        assert_file_stored(maybe_write_events(&conf2, record2, dummy_error()));
        assert!(tmp.exists());
        // The second write must append to the file, not replace its content.
        // NOTE(review): assumes `dump()` yields non-empty output for a
        // populated record -- confirm against the `Dumpable` impl.
        assert!(fs::metadata(&tmp).unwrap().len() > len_after_first);
    }

    #[test]
    fn test_write_forward_records() {
        #[inline]
        #[cfg(not(feature = "time-as-integer"))]
        fn make_time() -> EventTime {
            EventTime::new(time::now())
        }
        #[inline]
        #[cfg(feature = "time-as-integer")]
        fn make_time() -> i64 {
            time::now().to_timespec().sec
        }
        let time = make_time();
        let entries = vec![
            (time.clone(), payload("hey", "Rust with forward mode!")),
            (time.clone(), payload("yeah", "Also sent together!")),
        ];
        let forward = Forward::new("fruently".to_string(), entries);
        let (_dir, tmp) = buffer_in_tempdir();
        let conf = RetryConf::new().store_file(tmp.clone());
        assert_file_stored(maybe_write_events(&conf, forward, dummy_error()));
        assert!(tmp.exists());
    }
}