linky_groups/
linky_groups.rs

1//! Participatory synchronization mechanism for laptop orchestras
2//!
//! This library is part of the [harmonia](../harmonia/index.html) project.
//! It implements a participatory synchronization mechanism based on a patched [Ableton Link][rusty_link] library.
5//!
//! The library satisfies these requirements:
7//!
8//! * participatory - user chooses to join the orchestra, not orchestra forces them to start
9//! * bottom-up - there is no central server or chosen leader, all hosts participate to create
10//!   shared common synchronized state
11//! * minimal configuration - users need to provide shared group name, that they are going to
12//!   synchronize under
13
14use rusty_link::{AblLink, SessionState};
15use serde::{Deserialize, Serialize};
16use std::{sync::atomic, sync::Arc};
17
18mod net;
19
/// Maximum length of the group name, counted in bytes.
///
/// It is also the size of the fixed buffer that carries the name inside every frame.
pub const MAX_GROUP_ID_LENGTH: usize = 15;

/// ID that defines the group.
///
/// Holds the bytes of the group name; names shorter than [MAX_GROUP_ID_LENGTH] are padded
/// with zero bytes at the end.
type GroupId = [u8; MAX_GROUP_ID_LENGTH];
25
/// Parsed network packet describing the synchronization state of one group
///
/// The field order is part of the serialized layout, so it must not be rearranged.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
struct GroupFrame {
    /// Magic sequence distinguishing linky groups packets from any other traffic
    magic: [u8; 4],

    /// Version of the packet format
    version: u8,

    /// Group identifier used to distinguish between groups that are running concurrently
    group_id: GroupId,

    /// Timestamp in microseconds that is a reference point using global host time for when group
    /// was started
    timestamp: i64,
}
42
43impl std::fmt::Display for GroupFrame {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(
46            f,
47            "Group(version = {version}, id = ",
48            version = self.version
49        )?;
50        let group_id = &self.group_id[..self
51            .group_id
52            .iter()
53            .position(|c| *c == 0)
54            .unwrap_or(self.group_id.len())];
55        if let Ok(group_id) = std::str::from_utf8(group_id) {
56            write!(f, "{group_id:?}")?;
57        } else {
58            write!(f, "{group_id:?}", group_id = self.group_id)?;
59        }
60        write!(f, ", timestamp = {timestamp})", timestamp = self.timestamp)
61    }
62}
63
/// Constant used to distinguish linky groups from other fields (based on link)
///
/// Stored in the `magic` field of every frame created by this crate and compared against
/// when deciding whether a frame is supported.
const LINKY_GROUPS_MAGIC: [u8; 4] = *b"grup";
66
67impl GroupFrame {
68    /// Create new packet based on `group_id` from user and `timestamp` from [link][rusty_link]
69    fn new(group_id: GroupId, timestamp: i64) -> Self {
70        Self {
71            magic: LINKY_GROUPS_MAGIC,
72            version: 1,
73            group_id,
74            timestamp,
75        }
76    }
77
78    /// Check if current implementation supports this packet.
79    ///
80    /// Allows for backwards compatibility in future releases
81    fn is_supported(&self) -> bool {
82        self.magic == LINKY_GROUPS_MAGIC && self.version == 1
83    }
84}
85
/// State for Group synchronization system
///
/// Owns the handles of both background tasks together with the channels used to control them.
pub struct Groups {
    /// Listening task that receives group messages
    listener: tokio::task::JoinHandle<()>,

    /// State consolidation worker
    worker: tokio::task::JoinHandle<()>,

    /// Channel used to issue a cancellation request to the listening task
    cancel: tokio::sync::mpsc::Sender<()>,

    /// Channel for state updates consumed by the worker
    actions: tokio::sync::mpsc::Sender<Action>,

    /// Used Ableton Link instance for time synchronization
    link: std::sync::Arc<rusty_link::AblLink>,

    /// Is set when there is a group in which we are playing.
    ///
    /// Shared with the worker, which is the side that updates it.
    is_playing: Arc<atomic::AtomicBool>,
}
106
/// All the errors that this crate may produce
///
/// Implements [std::error::Error], so it can be propagated with `?` into boxed or
/// application level error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// User provided GroupId that is longer than [MAX_GROUP_ID_LENGTH] bytes
    GroupIdTooLong,
}

impl std::fmt::Display for Error {
    /// Write a short, human readable description of the error
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::GroupIdTooLong => f.write_str("group id is too long"),
        }
    }
}

impl std::error::Error for Error {}
113
114impl Groups {
115    // TODO: To avoid confusion make group_id_str case insensitive.
116    /// Start or join the group pointed by the user
117    pub async fn start(&self, group_id_str: &str) -> Result<(), Error> {
118        let mut group_id: GroupId = Default::default();
119        if group_id_str.len() > group_id.len() {
120            return Err(Error::GroupIdTooLong);
121        }
122        group_id[..group_id_str.len()].copy_from_slice(group_id_str.as_bytes());
123
124        let host_time = self.link.clock_micros();
125        let ghost_time = self.link.host_to_ghost(host_time);
126        let frame = GroupFrame::new(group_id, ghost_time);
127        self.actions
128            .send(Action::Start(frame))
129            .await
130            .expect("receiver will never be closed unless in destructor");
131        Ok(())
132    }
133
134    /// Stop performing in current group
135    pub async fn stop(&self) {
136        self.actions
137            .send(Action::Stop)
138            .await
139            .expect("receiver will never be closed unless in destructor");
140    }
141
142    /// Check if we are playing
143    pub fn is_playing(&self) -> bool {
144        self.is_playing.load(atomic::Ordering::SeqCst)
145    }
146
147    /// Stop group synchronization mechanism
148    ///
149    /// Only used in graceful shoutdown
150    pub async fn shutdown(self) {
151        tracing::debug!("Issuing shutdown");
152        self.cancel.send(()).await.unwrap();
153        self.actions.send(Action::Quit).await.unwrap();
154
155        self.listener.await.unwrap();
156        self.worker.await.unwrap();
157    }
158}
159
/// Action is the description of requests for group synchronization worker
#[derive(Debug)]
enum Action {
    /// Start playing in the provided group
    ///
    /// Makes the frame the current group and marks the session as playing.
    Start(GroupFrame),

    /// Join the provided group if it matches currently played
    ///
    /// The frame is adopted only when its group id equals the current one and its timestamp
    /// is earlier than the timestamp of the current frame.
    Join(GroupFrame),

    /// Stop playing in the provided group (leave group)
    Stop,

    /// Quit listening
    ///
    /// Ends the worker loop.
    Quit,
}
175
176/// The main loop of synchronization worker
177///
178/// Receives current state and based on it decides if join the group, start a new one etc.
179async fn negotatior(
180    mut state: tokio::sync::mpsc::Receiver<Action>,
181    link: Arc<AblLink>,
182    send_frame: tokio::sync::mpsc::Sender<GroupFrame>,
183    is_playing: Arc<std::sync::atomic::AtomicBool>,
184) {
185    use tokio::time::{Duration, Instant};
186
187    let mut current_group = None;
188    let mut last_send_time = Instant::now();
189
190    #[allow(clippy::missing_docs_in_private_items)]
191    const TIMEOUT_DURATION: Duration = Duration::from_millis(50);
192    #[allow(clippy::missing_docs_in_private_items)]
193    const QUANTUM: f64 = 1.0;
194
195    let mut timeout = tokio::time::interval(TIMEOUT_DURATION);
196
197    loop {
198        let request = if current_group.is_some() {
199            tokio::select! {
200                request = state.recv() => request,
201                _ = timeout.tick() => None,
202            }
203        } else {
204            state.recv().await
205        };
206
207        if let Some(request) = request {
208            tracing::debug!("Negotatior received request: {request:?}");
209
210            match request {
211                Action::Start(frame) => {
212                    current_group = Some(frame);
213
214                    let host_time = link.ghost_to_host(frame.timestamp);
215
216                    tracing::info!("starting {frame}");
217                    let mut session_state = SessionState::new();
218                    link.capture_app_session_state(&mut session_state);
219                    session_state.request_beat_at_time(0.0, host_time, QUANTUM);
220                    link.commit_app_session_state(&session_state);
221
222                    is_playing.store(true, atomic::Ordering::SeqCst);
223                }
224                Action::Join(frame) => {
225                    if let Some(current_frame) = current_group {
226                        // TODO: Add tolerance interval like Ableton/Link
227                        if current_frame.group_id == frame.group_id
228                            && current_frame.timestamp > frame.timestamp
229                        {
230                            let foreign_host_time = link.ghost_to_host(frame.timestamp);
231                            let my_host_time = link.clock_micros();
232
233                            let mut session_state = SessionState::new();
234                            link.capture_app_session_state(&mut session_state);
235
236                            let beat_difference =
237                                session_state.beat_at_time(foreign_host_time, QUANTUM);
238                            let current_beat = session_state.beat_at_time(my_host_time, QUANTUM);
239                            let desired_beat = current_beat - beat_difference;
240
241                            tracing::info!("Transitioning from {current_beat} to {desired_beat} with frame {frame}");
242
243                            session_state.request_beat_at_time(desired_beat, my_host_time, QUANTUM);
244                            link.commit_app_session_state(&session_state);
245                            current_group = Some(frame);
246                        }
247                    }
248                }
249                Action::Stop => {
250                    current_group.take();
251                    is_playing.store(false, atomic::Ordering::SeqCst);
252                    tracing::info!("Stopping playing current group");
253                }
254                Action::Quit => break,
255            }
256        }
257
258        if last_send_time.elapsed() >= TIMEOUT_DURATION {
259            if let Some(frame) = current_group {
260                send_frame
261                    .send(frame)
262                    .await
263                    .expect("send channel should be eternal");
264                last_send_time = tokio::time::Instant::now();
265            }
266        }
267    }
268}
269
270/// Create, initialize and start listening for group synchronization mechanism
271pub fn listen(link: std::sync::Arc<rusty_link::AblLink>) -> Groups {
272    let (cancel, wait_for_cancel) = tokio::sync::mpsc::channel(1);
273    let (send_frame, recv_frame) = tokio::sync::mpsc::channel(4);
274    let (send_action, state) = tokio::sync::mpsc::channel(4);
275    let is_playing = Arc::new(atomic::AtomicBool::new(false));
276    let is_enabled = link.is_enabled();
277
278    Groups {
279        actions: send_action.clone(),
280        link: link.clone(),
281        is_playing: is_playing.clone(),
282        listener: tokio::spawn(async move {
283            net::listen(recv_frame, send_action.clone(), wait_for_cancel, is_enabled).await;
284        }),
285        worker: tokio::spawn(async move {
286            negotatior(state, link, send_frame, is_playing).await;
287        }),
288        cancel,
289    }
290}