1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//! The sshx server, which coordinates terminal sharing.
//!
//! Requests are communicated to the server via gRPC (for command-line sharing
//! clients) and WebSocket connections (for web listeners). The server is built
//! using a hybrid Hyper service, split between a Tonic gRPC handler and an Axum
//! web listener.
//!
//! Most web requests are routed directly to static files located in the `dist/`
//! folder relative to where this binary is running, allowing the frontend to be
//! separately developed from the server.

#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use hyper::server::conn::AddrIncoming;
use utils::Shutdown;

use crate::state::ServerState;

pub mod grpc;
mod listen;
pub mod session;
pub mod state;
pub mod utils;
pub mod web;

/// Options when constructing the application server.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct ServerOptions {
    /// Secret used for signing tokens. Set randomly if not provided.
    pub secret: Option<String>,

    /// Override the origin returned for the Open() RPC.
    pub override_origin: Option<String>,

    /// URL of the Redis server that stores session data.
    pub redis_url: Option<String>,

    /// Hostname of this server, if running multiple servers.
    pub host: Option<String>,
}

/// Stateful object that manages the sshx server, with graceful termination.
pub struct Server {
    state: Arc<ServerState>,
    shutdown: Shutdown,
}

impl Server {
    /// Create a new application server, but do not listen for connections yet.
    pub fn new(options: ServerOptions) -> Result<Self> {
        Ok(Self {
            state: Arc::new(ServerState::new(options)?),
            shutdown: Shutdown::new(),
        })
    }

    /// Returns the server's state object.
    pub fn state(&self) -> Arc<ServerState> {
        Arc::clone(&self.state)
    }

    /// Run the application server, listening on a stream of connections.
    pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
        let state = self.state.clone();
        let terminated = self.shutdown.wait();
        tokio::spawn(async move {
            let background_tasks = futures_util::future::join(
                state.listen_for_transfers(),
                state.close_old_sessions(),
            );
            tokio::select! {
                _ = terminated => {}
                _ = background_tasks => {}
            }
        });

        listen::start_server(self.state(), incoming, self.shutdown.wait()).await
    }

    /// Convenience function to call [`Server::listen`] bound to a TCP address.
    pub async fn bind(&self, addr: &SocketAddr) -> Result<()> {
        self.listen(AddrIncoming::bind(addr)?).await
    }

    /// Send a graceful shutdown signal to the server.
    pub fn shutdown(&self) {
        // Stop receiving new network connections.
        self.shutdown.shutdown();
        // Terminate each of the existing sessions.
        self.state.shutdown();
    }
}