Skip to main content

jar_genesis/
queue.rs

1//! Genesis workflow queue — ensures sequential execution across all genesis workflows.
2//!
3//! Each workflow run polls the GitHub API for earlier active runs. If any exist,
4//! it waits until they complete before proceeding. Ordering by `created_at`
5//! ensures FIFO — no starvation, no silent drops.
6
7use crate::github;
8
9const POLL_INTERVAL_SECS: u64 = 10;
10const TIMEOUT_SECS: u64 = 3600; // 1 hour
11const GENESIS_WORKFLOW_PREFIX: &str = ".github/workflows/genesis-";
12
13/// Resolve workflow IDs for all genesis workflows by path prefix.
14fn resolve_genesis_workflow_ids() -> Result<Vec<String>, Box<dyn std::error::Error>> {
15    let output = github::gh(&[
16        "api",
17        "repos/{owner}/{repo}/actions/workflows",
18        "--jq",
19        ".workflows[] | [.id, .path] | @tsv",
20    ])?;
21
22    let ids: Vec<String> = output
23        .lines()
24        .filter_map(|line| {
25            let parts: Vec<&str> = line.split('\t').collect();
26            if parts.len() == 2 && parts[1].starts_with(GENESIS_WORKFLOW_PREFIX) {
27                Some(parts[0].trim().to_string())
28            } else {
29                None
30            }
31        })
32        .collect();
33
34    if ids.is_empty() {
35        return Err("no genesis workflows found".into());
36    }
37
38    eprintln!("genesis-queue: found {} genesis workflow(s)", ids.len());
39    Ok(ids)
40}
41
42/// Wait until all earlier genesis workflow runs have completed.
43/// Polls every 10 seconds, times out after 10 minutes.
44///
45/// Only runs in GitHub Actions (requires GITHUB_RUN_ID environment variable).
46pub fn wait_for_queue() -> Result<(), Box<dyn std::error::Error>> {
47    let run_id = std::env::var("GITHUB_RUN_ID")
48        .map_err(|_| "GITHUB_RUN_ID not set — not running in GitHub Actions")?;
49
50    // Get this run's created_at
51    let my_created_at = github::gh(&[
52        "api",
53        &format!("repos/{{owner}}/{{repo}}/actions/runs/{run_id}"),
54        "--jq",
55        ".created_at",
56    ])?
57    .trim()
58    .to_string();
59
60    eprintln!(
61        "genesis-queue: run {run_id} created at {my_created_at}, checking for earlier runs..."
62    );
63
64    let workflow_ids = resolve_genesis_workflow_ids()?;
65    let delay = std::time::Duration::from_secs(POLL_INTERVAL_SECS);
66    let timeout = std::time::Duration::from_secs(TIMEOUT_SECS);
67    let start = std::time::Instant::now();
68
69    loop {
70        let mut earlier_active: Vec<(String, String)> = Vec::new();
71
72        for wf_id in &workflow_ids {
73            for status in &["in_progress", "queued"] {
74                let output = github::gh(&[
75                    "api",
76                    &format!(
77                        "repos/{{owner}}/{{repo}}/actions/workflows/{wf_id}/runs?status={status}&per_page=10"
78                    ),
79                    "--jq",
80                    ".workflow_runs[] | [.id, .created_at] | @tsv",
81                ])?;
82
83                for line in output.lines() {
84                    let parts: Vec<&str> = line.split('\t').collect();
85                    if parts.len() == 2 {
86                        let id = parts[0].trim();
87                        let created = parts[1].trim();
88                        if id != run_id && created < my_created_at.as_str() {
89                            earlier_active.push((id.to_string(), created.to_string()));
90                        }
91                    }
92                }
93            }
94        }
95
96        if earlier_active.is_empty() {
97            eprintln!("genesis-queue: no earlier runs active, proceeding");
98            return Ok(());
99        }
100
101        if start.elapsed() >= timeout {
102            return Err(format!(
103                "genesis-queue: timed out after {}s waiting for {} earlier run(s): {}",
104                TIMEOUT_SECS,
105                earlier_active.len(),
106                earlier_active
107                    .iter()
108                    .map(|(id, _)| id.as_str())
109                    .collect::<Vec<_>>()
110                    .join(", ")
111            )
112            .into());
113        }
114
115        eprintln!(
116            "genesis-queue: waiting for {} earlier run(s) ({:.0}s elapsed)",
117            earlier_active.len(),
118            start.elapsed().as_secs_f64()
119        );
120        std::thread::sleep(delay);
121    }
122}