linky_groups/
linky_groups.rs1use rusty_link::{AblLink, SessionState};
15use serde::{Deserialize, Serialize};
16use std::{sync::atomic, sync::Arc};
17
18mod net;
19
20pub const MAX_GROUP_ID_LENGTH: usize = 15;
22
23type GroupId = [u8; MAX_GROUP_ID_LENGTH];
25
26#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
28struct GroupFrame {
29 magic: [u8; 4],
31
32 version: u8,
34
35 group_id: GroupId,
37
38 timestamp: i64,
41}
42
43impl std::fmt::Display for GroupFrame {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(
46 f,
47 "Group(version = {version}, id = ",
48 version = self.version
49 )?;
50 let group_id = &self.group_id[..self
51 .group_id
52 .iter()
53 .position(|c| *c == 0)
54 .unwrap_or(self.group_id.len())];
55 if let Ok(group_id) = std::str::from_utf8(group_id) {
56 write!(f, "{group_id:?}")?;
57 } else {
58 write!(f, "{group_id:?}", group_id = self.group_id)?;
59 }
60 write!(f, ", timestamp = {timestamp})", timestamp = self.timestamp)
61 }
62}
63
64const LINKY_GROUPS_MAGIC: [u8; 4] = *b"grup";
66
67impl GroupFrame {
68 fn new(group_id: GroupId, timestamp: i64) -> Self {
70 Self {
71 magic: LINKY_GROUPS_MAGIC,
72 version: 1,
73 group_id,
74 timestamp,
75 }
76 }
77
78 fn is_supported(&self) -> bool {
82 self.magic == LINKY_GROUPS_MAGIC && self.version == 1
83 }
84}
85
86pub struct Groups {
88 listener: tokio::task::JoinHandle<()>,
90
91 worker: tokio::task::JoinHandle<()>,
93
94 cancel: tokio::sync::mpsc::Sender<()>,
96
97 actions: tokio::sync::mpsc::Sender<Action>,
99
100 link: std::sync::Arc<rusty_link::AblLink>,
102
103 is_playing: Arc<atomic::AtomicBool>,
105}
106
107#[derive(Debug)]
109pub enum Error {
110 GroupIdTooLong,
112}
113
114impl Groups {
115 pub async fn start(&self, group_id_str: &str) -> Result<(), Error> {
118 let mut group_id: GroupId = Default::default();
119 if group_id_str.len() > group_id.len() {
120 return Err(Error::GroupIdTooLong);
121 }
122 group_id[..group_id_str.len()].copy_from_slice(group_id_str.as_bytes());
123
124 let host_time = self.link.clock_micros();
125 let ghost_time = self.link.host_to_ghost(host_time);
126 let frame = GroupFrame::new(group_id, ghost_time);
127 self.actions
128 .send(Action::Start(frame))
129 .await
130 .expect("receiver will never be closed unless in destructor");
131 Ok(())
132 }
133
134 pub async fn stop(&self) {
136 self.actions
137 .send(Action::Stop)
138 .await
139 .expect("receiver will never be closed unless in destructor");
140 }
141
142 pub fn is_playing(&self) -> bool {
144 self.is_playing.load(atomic::Ordering::SeqCst)
145 }
146
147 pub async fn shutdown(self) {
151 tracing::debug!("Issuing shutdown");
152 self.cancel.send(()).await.unwrap();
153 self.actions.send(Action::Quit).await.unwrap();
154
155 self.listener.await.unwrap();
156 self.worker.await.unwrap();
157 }
158}
159
160#[derive(Debug)]
162enum Action {
163 Start(GroupFrame),
165
166 Join(GroupFrame),
168
169 Stop,
171
172 Quit,
174}
175
176async fn negotatior(
180 mut state: tokio::sync::mpsc::Receiver<Action>,
181 link: Arc<AblLink>,
182 send_frame: tokio::sync::mpsc::Sender<GroupFrame>,
183 is_playing: Arc<std::sync::atomic::AtomicBool>,
184) {
185 use tokio::time::{Duration, Instant};
186
187 let mut current_group = None;
188 let mut last_send_time = Instant::now();
189
190 #[allow(clippy::missing_docs_in_private_items)]
191 const TIMEOUT_DURATION: Duration = Duration::from_millis(50);
192 #[allow(clippy::missing_docs_in_private_items)]
193 const QUANTUM: f64 = 1.0;
194
195 let mut timeout = tokio::time::interval(TIMEOUT_DURATION);
196
197 loop {
198 let request = if current_group.is_some() {
199 tokio::select! {
200 request = state.recv() => request,
201 _ = timeout.tick() => None,
202 }
203 } else {
204 state.recv().await
205 };
206
207 if let Some(request) = request {
208 tracing::debug!("Negotatior received request: {request:?}");
209
210 match request {
211 Action::Start(frame) => {
212 current_group = Some(frame);
213
214 let host_time = link.ghost_to_host(frame.timestamp);
215
216 tracing::info!("starting {frame}");
217 let mut session_state = SessionState::new();
218 link.capture_app_session_state(&mut session_state);
219 session_state.request_beat_at_time(0.0, host_time, QUANTUM);
220 link.commit_app_session_state(&session_state);
221
222 is_playing.store(true, atomic::Ordering::SeqCst);
223 }
224 Action::Join(frame) => {
225 if let Some(current_frame) = current_group {
226 if current_frame.group_id == frame.group_id
228 && current_frame.timestamp > frame.timestamp
229 {
230 let foreign_host_time = link.ghost_to_host(frame.timestamp);
231 let my_host_time = link.clock_micros();
232
233 let mut session_state = SessionState::new();
234 link.capture_app_session_state(&mut session_state);
235
236 let beat_difference =
237 session_state.beat_at_time(foreign_host_time, QUANTUM);
238 let current_beat = session_state.beat_at_time(my_host_time, QUANTUM);
239 let desired_beat = current_beat - beat_difference;
240
241 tracing::info!("Transitioning from {current_beat} to {desired_beat} with frame {frame}");
242
243 session_state.request_beat_at_time(desired_beat, my_host_time, QUANTUM);
244 link.commit_app_session_state(&session_state);
245 current_group = Some(frame);
246 }
247 }
248 }
249 Action::Stop => {
250 current_group.take();
251 is_playing.store(false, atomic::Ordering::SeqCst);
252 tracing::info!("Stopping playing current group");
253 }
254 Action::Quit => break,
255 }
256 }
257
258 if last_send_time.elapsed() >= TIMEOUT_DURATION {
259 if let Some(frame) = current_group {
260 send_frame
261 .send(frame)
262 .await
263 .expect("send channel should be eternal");
264 last_send_time = tokio::time::Instant::now();
265 }
266 }
267 }
268}
269
270pub fn listen(link: std::sync::Arc<rusty_link::AblLink>) -> Groups {
272 let (cancel, wait_for_cancel) = tokio::sync::mpsc::channel(1);
273 let (send_frame, recv_frame) = tokio::sync::mpsc::channel(4);
274 let (send_action, state) = tokio::sync::mpsc::channel(4);
275 let is_playing = Arc::new(atomic::AtomicBool::new(false));
276 let is_enabled = link.is_enabled();
277
278 Groups {
279 actions: send_action.clone(),
280 link: link.clone(),
281 is_playing: is_playing.clone(),
282 listener: tokio::spawn(async move {
283 net::listen(recv_frame, send_action.clone(), wait_for_cancel, is_enabled).await;
284 }),
285 worker: tokio::spawn(async move {
286 negotatior(state, link, send_frame, is_playing).await;
287 }),
288 cancel,
289 }
290}