1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
//! Defines the Component trait, which was going to be used by each CyberGrape
//! processing module.
//! This enforces a common interface between modules, so that each
//! module can consume data from the preceding module, process it, and pass
//! new data to the subsequent module in the CyberGrape pipeline.
use log::{info, warn};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::{self, JoinHandle};
pub enum ComponentError {
/// A stage in the CyberGrape pipeline, which performs a step of the data
/// aggregation, binauralization, or music playback process. All structs
/// that perform a processing step in the CyberGrape system must implement
/// Component, so that they can be integrated into the pipeline.
pub trait Component {
/// The type of data the component takes as input
type InData;
/// The type of data the component produces as output
type OutData;
/// Return the name of the component, for logging
fn name(&self) -> String;
/// Converts an input of type A into an output of type B
fn convert(&mut self, input: Self::InData) -> Self::OutData;
/// Cleans up at termination of pipeline
fn finalize(&mut self) -> Result<(), ComponentError>;
/// Runs the given Component on its own thread. On receiving data of type
/// InData on the input channel, the Component converts them to data of type
/// OutData and sends it to the output channel.
pub fn run_component<C: Component + std::marker::Send + 'static>(
mut component: Box<C>,
input: Receiver<<C as Component>::InData>,
output: Sender<<C as Component>::OutData>,
) -> JoinHandle<()>
<C as Component>::InData: Send + 'static,
<C as Component>::OutData: Send + 'static,
thread::spawn(move || {
while let Ok(data) = input.recv() {
let out_data = component.convert(data);
if let Err(error) = output.send(out_data) {
warn!("{} : received error {}.", component.name(), error);
if let Err(component_error) = component.finalize() {
"{} : error during terminating : {component_error:?}.",
info!("{} : terminated.", component.name());
mod tests {
use super::*;
use std::sync::mpsc::channel;
/// Null MockComponent for compilation testing
struct MockComponent {}
impl MockComponent {
fn new() -> Self {
Self {}
impl Component for MockComponent {
type InData = i32;
type OutData = i32;
fn name(&self) -> String {
fn convert(&mut self, input: i32) -> i32 {
input + 1
fn finalize(&mut self) -> Result<(), ComponentError> {
/// Checks that a Component's generic input and output types can be
/// specified. Checks that writing a value to the Component's input
/// produces that value, converted, in the Component's output
fn test_mock_component() {
let mock_comp = MockComponent::new();
let (test_tx, block_rx) = channel::<i32>();
let (block_tx, test_rx) = channel::<i32>();
run_component(Box::new(mock_comp), block_rx, block_tx);
assert_eq!(test_tx.send(0), Ok(()));
// TODO: how can we create Component inside the closure and still be able to access tx and rx down here?
assert_eq!(test_rx.recv(), Ok(1));
fn test_chained_component() {
let mock_comp_a = MockComponent::new();
let mock_comp_b = MockComponent::new();
let (test_tx, block_a_rx) = channel::<i32>();
let (block_a_tx, block_b_rx) = channel::<i32>();
let (block_b_tx, test_rx) = channel::<i32>();
run_component(Box::new(mock_comp_a), block_a_rx, block_a_tx);
run_component(Box::new(mock_comp_b), block_b_rx, block_b_tx);
assert_eq!(test_tx.send(0), Ok(()));
assert_eq!(test_rx.recv(), Ok(2));