1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! Defines the `Component` trait, which is implemented by each CyberGrape
//! processing module.
//!
//! This enforces a common interface between modules, so that each
//! module can consume data from the preceding module, process it, and pass
//! new data to the subsequent module in the CyberGrape pipeline.

use log::{info, warn};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Errors that a [`Component`] can report from [`Component::finalize`].
#[derive(Debug)]
pub enum ComponentError {
    /// An error reported by the `hound` crate.
    HoundError(hound::Error),
}

impl std::fmt::Display for ComponentError {
    /// Writes a human-readable description that includes the wrapped error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::HoundError(err) => write!(f, "hound error: {err}"),
        }
    }
}

impl std::error::Error for ComponentError {
    /// Exposes the wrapped error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::HoundError(err) => Some(err),
        }
    }
}

/// A stage in the CyberGrape pipeline, which performs a step of the data
/// aggregation, binauralization, or music playback process. All structs
/// that perform a processing step in the CyberGrape system must implement
/// `Component`, so that they can be integrated into the pipeline.
pub trait Component {
    /// The type of data the component takes as input.
    type InData;
    /// The type of data the component produces as output.
    type OutData;

    /// Returns the name of the component, for logging.
    fn name(&self) -> String;

    /// Converts one input of type `Self::InData` into one output of type
    /// `Self::OutData`.
    ///
    /// Takes `&mut self`, so an implementation may update its own state
    /// between calls.
    fn convert(&mut self, input: Self::InData) -> Self::OutData;

    /// Cleans up at termination of the pipeline.
    ///
    /// `run_component` calls this once, after its input channel has closed.
    ///
    /// # Errors
    ///
    /// Returns a [`ComponentError`] if cleanup fails.
    fn finalize(&mut self) -> Result<(), ComponentError>;
}


/// Spawns a dedicated thread that drives `component` and returns its handle.
///
/// The thread blocks on `input`; every value received is passed through
/// [`Component::convert`] and the result is sent on `output`. A failed send
/// is logged as a warning and the loop keeps going. Once `input` can yield no
/// more values (its senders are gone), [`Component::finalize`] is called, any
/// error it returns is logged as a warning, and the thread exits.
pub fn run_component<C: Component + std::marker::Send + 'static>(
    mut component: Box<C>,
    input: Receiver<<C as Component>::InData>,
    output: Sender<<C as Component>::OutData>,
) -> JoinHandle<()>
where
    <C as Component>::InData: Send + 'static,
    <C as Component>::OutData: Send + 'static,
{
    let worker = move || {
        // `iter()` blocks for each value and ends when the channel hangs up.
        for data in input.iter() {
            let converted = component.convert(data);
            if let Err(error) = output.send(converted) {
                warn!("{} : received error {}.", component.name(), error);
            }
        }

        // Input is exhausted: give the component its chance to clean up.
        let cleanup = component.finalize();
        if let Err(component_error) = cleanup {
            warn!(
                "{} : error during terminating : {component_error:?}.",
                component.name(),
            );
        }
        info!("{} : terminated.", component.name());
    };

    thread::spawn(worker)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::channel;

    /// Minimal stateless `Component` used to exercise `run_component`:
    /// maps each `i32` input to `input + 1`.
    struct MockComponent {}

    impl MockComponent {
        fn new() -> Self {
            Self {}
        }
    }

    impl Component for MockComponent {
        type InData = i32;
        type OutData = i32;

        fn name(&self) -> String {
            "MockComponent".to_string()
        }

        fn convert(&mut self, input: i32) -> i32 {
            input + 1
        }

        fn finalize(&mut self) -> Result<(), ComponentError> {
            Ok(())
        }
    }

    /// Checks that a Component's generic input and output types can be
    /// specified, that writing a value to the Component's input produces
    /// that value, converted, on the Component's output, and that the
    /// worker thread shuts down cleanly once its input is closed.
    #[test]
    fn test_mock_component() {
        let mock_comp = MockComponent::new();
        let (test_tx, block_rx) = channel::<i32>();
        let (block_tx, test_rx) = channel::<i32>();

        let handle = run_component(Box::new(mock_comp), block_rx, block_tx);

        assert_eq!(test_tx.send(0), Ok(()));
        // TODO: how can we create Component inside the closure and still be able to access tx and rx down here?
        assert_eq!(test_rx.recv(), Ok(1));

        // Closing the input ends the worker loop; the worker then drops its
        // sender, so the output channel must report a hang-up.
        drop(test_tx);
        assert!(test_rx.recv().is_err());
        handle.join().expect("component thread panicked");
    }

    /// Checks that two Components can be chained through a shared channel,
    /// and that closing the first input shuts down the whole chain.
    #[test]
    fn test_chained_component() {
        let mock_comp_a = MockComponent::new();
        let mock_comp_b = MockComponent::new();

        let (test_tx, block_a_rx) = channel::<i32>();
        let (block_a_tx, block_b_rx) = channel::<i32>();
        let (block_b_tx, test_rx) = channel::<i32>();

        let handle_a = run_component(Box::new(mock_comp_a), block_a_rx, block_a_tx);
        let handle_b = run_component(Box::new(mock_comp_b), block_b_rx, block_b_tx);

        assert_eq!(test_tx.send(0), Ok(()));
        assert_eq!(test_rx.recv(), Ok(2));

        // Dropping the head sender stops A, which drops B's input and stops B.
        drop(test_tx);
        assert!(test_rx.recv().is_err());
        handle_a.join().expect("component A thread panicked");
        handle_b.join().expect("component B thread panicked");
    }
}