solana_faucet/
faucet.rs

1//! The `faucet` module provides an object for launching a Solana Faucet,
2//! which is the custodian of any remaining lamports in a mint.
3//! The Solana Faucet builds and sends airdrop transactions,
4//! checking requests against a single-request cap and a per-IP limit
5//! for a given time time_slice.
6
7use {
8    bincode::{deserialize, serialize, serialized_size},
9    byteorder::{ByteOrder, LittleEndian},
10    crossbeam_channel::{unbounded, Sender},
11    log::*,
12    serde_derive::{Deserialize, Serialize},
13    solana_metrics::datapoint_info,
14    solana_sdk::{
15        hash::Hash,
16        instruction::Instruction,
17        message::Message,
18        native_token::lamports_to_sol,
19        packet::PACKET_DATA_SIZE,
20        pubkey::Pubkey,
21        signature::{Keypair, Signer},
22        system_instruction,
23        transaction::Transaction,
24    },
25    std::{
26        collections::{HashMap, HashSet},
27        io::{Read, Write},
28        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
29        sync::{Arc, Mutex},
30        thread,
31        time::Duration,
32    },
33    thiserror::Error,
34    tokio::{
35        io::{AsyncReadExt, AsyncWriteExt},
36        net::{TcpListener, TcpStream as TokioTcpStream},
37        runtime::Runtime,
38    },
39};
40
41#[macro_export]
42macro_rules! socketaddr {
43    ($ip:expr, $port:expr) => {
44        SocketAddr::from((Ipv4Addr::from($ip), $port))
45    };
46    ($str:expr) => {{
47        let a: SocketAddr = $str.parse().unwrap();
48        a
49    }};
50}
51
52const ERROR_RESPONSE: [u8; 2] = 0u16.to_le_bytes();
53
54pub const TIME_SLICE: u64 = 60;
55pub const FAUCET_PORT: u16 = 9900;
56
57#[derive(Error, Debug)]
58pub enum FaucetError {
59    #[error("IO Error: {0}")]
60    IoError(#[from] std::io::Error),
61
62    #[error("serialization error: {0}")]
63    Serialize(#[from] bincode::Error),
64
65    #[error("transaction_length from faucet exceeds limit: {0}")]
66    TransactionDataTooLarge(usize),
67
68    #[error("transaction_length from faucet: 0")]
69    NoDataReceived,
70
71    #[error("request too large; req: ◎{0}, cap: ◎{1}")]
72    PerRequestCapExceeded(f64, f64),
73
74    #[error("limit reached; req: ◎{0}, to: {1}, current: ◎{2}, cap: ◎{3}")]
75    PerTimeCapExceeded(f64, String, f64, f64),
76}
77
78#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
79pub enum FaucetRequest {
80    GetAirdrop {
81        lamports: u64,
82        to: Pubkey,
83        blockhash: Hash,
84    },
85}
86
87pub enum FaucetTransaction {
88    Airdrop(Transaction),
89    Memo((Transaction, String)),
90}
91
92pub struct Faucet {
93    faucet_keypair: Keypair,
94    ip_cache: HashMap<IpAddr, u64>,
95    address_cache: HashMap<Pubkey, u64>,
96    pub time_slice: Duration,
97    per_time_cap: Option<u64>,
98    per_request_cap: Option<u64>,
99    allowed_ips: HashSet<IpAddr>,
100}
101
102impl Faucet {
103    pub fn new(
104        faucet_keypair: Keypair,
105        time_input: Option<u64>,
106        per_time_cap: Option<u64>,
107        per_request_cap: Option<u64>,
108    ) -> Self {
109        Self::new_with_allowed_ips(
110            faucet_keypair,
111            time_input,
112            per_time_cap,
113            per_request_cap,
114            HashSet::new(),
115        )
116    }
117
118    pub fn new_with_allowed_ips(
119        faucet_keypair: Keypair,
120        time_input: Option<u64>,
121        per_time_cap: Option<u64>,
122        per_request_cap: Option<u64>,
123        allowed_ips: HashSet<IpAddr>,
124    ) -> Self {
125        let time_slice = Duration::new(time_input.unwrap_or(TIME_SLICE), 0);
126        if let Some((per_request_cap, per_time_cap)) = per_request_cap.zip(per_time_cap) {
127            if per_time_cap < per_request_cap {
128                warn!(
129                    "per_time_cap {} SOL < per_request_cap {} SOL; \
130                    maximum single requests will fail",
131                    lamports_to_sol(per_time_cap),
132                    lamports_to_sol(per_request_cap),
133                );
134            }
135        }
136        Self {
137            faucet_keypair,
138            ip_cache: HashMap::new(),
139            address_cache: HashMap::new(),
140            time_slice,
141            per_time_cap,
142            per_request_cap,
143            allowed_ips,
144        }
145    }
146
147    pub fn check_time_request_limit<T: LimitByTime + std::fmt::Display>(
148        &mut self,
149        request_amount: u64,
150        to: T,
151    ) -> Result<(), FaucetError> {
152        let new_total = to.check_cache(self, request_amount);
153        to.datapoint_info(request_amount, new_total);
154        if let Some(cap) = self.per_time_cap {
155            if new_total > cap {
156                return Err(FaucetError::PerTimeCapExceeded(
157                    lamports_to_sol(request_amount),
158                    to.to_string(),
159                    lamports_to_sol(new_total),
160                    lamports_to_sol(cap),
161                ));
162            }
163        }
164        Ok(())
165    }
166
167    pub fn clear_caches(&mut self) {
168        self.ip_cache.clear();
169        self.address_cache.clear();
170    }
171
172    /// Checks per-request and per-time-ip limits; if both pass, this method returns a signed
173    /// SystemProgram::Transfer transaction from the faucet keypair to the requested recipient. If
174    /// the request exceeds this per-request limit, this method returns a signed SPL Memo
175    /// transaction with the memo: `"request too large; req: <REQUEST> SOL cap: <CAP> SOL"`
176    pub fn build_airdrop_transaction(
177        &mut self,
178        req: FaucetRequest,
179        ip: IpAddr,
180    ) -> Result<FaucetTransaction, FaucetError> {
181        trace!("build_airdrop_transaction: {:?}", req);
182        match req {
183            FaucetRequest::GetAirdrop {
184                lamports,
185                to,
186                blockhash,
187            } => {
188                let mint_pubkey = self.faucet_keypair.pubkey();
189                info!(
190                    "Requesting airdrop of {} SOL to {:?}",
191                    lamports_to_sol(lamports),
192                    to
193                );
194
195                if let Some(cap) = self.per_request_cap {
196                    if lamports > cap {
197                        let memo = format!(
198                            "{}",
199                            FaucetError::PerRequestCapExceeded(
200                                lamports_to_sol(lamports),
201                                lamports_to_sol(cap),
202                            )
203                        );
204                        let memo_instruction = Instruction {
205                            program_id: Pubkey::from(spl_memo::id().to_bytes()),
206                            accounts: vec![],
207                            data: memo.as_bytes().to_vec(),
208                        };
209                        let message = Message::new(&[memo_instruction], Some(&mint_pubkey));
210                        return Ok(FaucetTransaction::Memo((
211                            Transaction::new(&[&self.faucet_keypair], message, blockhash),
212                            memo,
213                        )));
214                    }
215                }
216                if !ip.is_loopback() && !self.allowed_ips.contains(&ip) {
217                    self.check_time_request_limit(lamports, ip)?;
218                }
219                self.check_time_request_limit(lamports, to)?;
220
221                let transfer_instruction =
222                    system_instruction::transfer(&mint_pubkey, &to, lamports);
223                let message = Message::new(&[transfer_instruction], Some(&mint_pubkey));
224                Ok(FaucetTransaction::Airdrop(Transaction::new(
225                    &[&self.faucet_keypair],
226                    message,
227                    blockhash,
228                )))
229            }
230        }
231    }
232
233    /// Deserializes a received airdrop request, and returns a serialized transaction
234    pub fn process_faucet_request(
235        &mut self,
236        bytes: &[u8],
237        ip: IpAddr,
238    ) -> Result<Vec<u8>, FaucetError> {
239        let req: FaucetRequest = deserialize(bytes)?;
240
241        info!("Airdrop transaction requested...{:?}", req);
242        let res = self.build_airdrop_transaction(req, ip);
243        match res {
244            Ok(tx) => {
245                let tx = match tx {
246                    FaucetTransaction::Airdrop(tx) => {
247                        info!("Airdrop transaction granted");
248                        tx
249                    }
250                    FaucetTransaction::Memo((tx, memo)) => {
251                        warn!("Memo transaction returned: {}", memo);
252                        tx
253                    }
254                };
255                let response_vec = bincode::serialize(&tx)?;
256
257                let mut response_vec_with_length = vec![0; 2];
258                LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
259                response_vec_with_length.extend_from_slice(&response_vec);
260
261                Ok(response_vec_with_length)
262            }
263            Err(err) => {
264                warn!("Airdrop transaction failed: {}", err);
265                Err(err)
266            }
267        }
268    }
269}
270
271impl Drop for Faucet {
272    fn drop(&mut self) {
273        solana_metrics::flush();
274    }
275}
276
277pub fn request_airdrop_transaction(
278    faucet_addr: &SocketAddr,
279    id: &Pubkey,
280    lamports: u64,
281    blockhash: Hash,
282) -> Result<Transaction, FaucetError> {
283    info!(
284        "request_airdrop_transaction: faucet_addr={} id={} lamports={} blockhash={}",
285        faucet_addr, id, lamports, blockhash
286    );
287
288    let mut stream = TcpStream::connect_timeout(faucet_addr, Duration::new(3, 0))?;
289    stream.set_read_timeout(Some(Duration::new(10, 0)))?;
290    let req = FaucetRequest::GetAirdrop {
291        lamports,
292        blockhash,
293        to: *id,
294    };
295    let req = serialize(&req).expect("serialize faucet request");
296    stream.write_all(&req)?;
297
298    // Read length of transaction
299    let mut buffer = [0; 2];
300    stream.read_exact(&mut buffer).map_err(|err| {
301        info!(
302            "request_airdrop_transaction: buffer length read_exact error: {:?}",
303            err
304        );
305        err
306    })?;
307    let transaction_length = LittleEndian::read_u16(&buffer) as usize;
308    if transaction_length > PACKET_DATA_SIZE {
309        return Err(FaucetError::TransactionDataTooLarge(transaction_length));
310    } else if transaction_length == 0 {
311        return Err(FaucetError::NoDataReceived);
312    }
313
314    // Read the transaction
315    let mut buffer = vec![0; transaction_length];
316    stream.read_exact(&mut buffer).map_err(|err| {
317        info!(
318            "request_airdrop_transaction: buffer read_exact error: {:?}",
319            err
320        );
321        err
322    })?;
323
324    let transaction: Transaction = deserialize(&buffer)?;
325    Ok(transaction)
326}
327
328pub fn run_local_faucet_with_port(
329    faucet_keypair: Keypair,
330    sender: Sender<Result<SocketAddr, String>>,
331    time_input: Option<u64>,
332    per_time_cap: Option<u64>,
333    per_request_cap: Option<u64>,
334    port: u16, // 0 => auto assign
335) {
336    thread::spawn(move || {
337        let faucet_addr = socketaddr!(Ipv4Addr::UNSPECIFIED, port);
338        let faucet = Arc::new(Mutex::new(Faucet::new(
339            faucet_keypair,
340            time_input,
341            per_time_cap,
342            per_request_cap,
343        )));
344        let runtime = Runtime::new().unwrap();
345        runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
346    });
347}
348
349// For integration tests. Listens on random open port and reports port to Sender.
350pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option<u64>) -> SocketAddr {
351    let (sender, receiver) = unbounded();
352    run_local_faucet_with_port(faucet_keypair, sender, None, per_time_cap, None, 0);
353    receiver
354        .recv()
355        .expect("run_local_faucet")
356        .expect("faucet_addr")
357}
358
359pub async fn run_faucet(
360    faucet: Arc<Mutex<Faucet>>,
361    faucet_addr: SocketAddr,
362    sender: Option<Sender<Result<SocketAddr, String>>>,
363) {
364    let listener = TcpListener::bind(&faucet_addr).await;
365    if let Some(sender) = sender {
366        sender.send(
367            listener.as_ref().map(|listener| listener.local_addr().unwrap())
368                .map_err(|err| {
369                    format!(
370                        "Unable to bind faucet to {faucet_addr:?}, check the address is not already in use: {err}"
371                    )
372                })
373            )
374            .unwrap();
375    }
376
377    let listener = match listener {
378        Err(err) => {
379            error!("Faucet failed to start: {}", err);
380            return;
381        }
382        Ok(listener) => listener,
383    };
384    info!("Faucet started. Listening on: {}", faucet_addr);
385    info!(
386        "Faucet account address: {}",
387        faucet.lock().unwrap().faucet_keypair.pubkey()
388    );
389
390    loop {
391        let faucet = faucet.clone();
392        match listener.accept().await {
393            Ok((stream, _)) => {
394                tokio::spawn(async move {
395                    if let Err(e) = process(stream, faucet).await {
396                        info!("failed to process request; error = {:?}", e);
397                    }
398                });
399            }
400            Err(e) => debug!("failed to accept socket; error = {:?}", e),
401        }
402    }
403}
404
405async fn process(
406    mut stream: TokioTcpStream,
407    faucet: Arc<Mutex<Faucet>>,
408) -> Result<(), Box<dyn std::error::Error>> {
409    let mut request = vec![
410        0u8;
411        serialized_size(&FaucetRequest::GetAirdrop {
412            lamports: u64::default(),
413            to: Pubkey::default(),
414            blockhash: Hash::default(),
415        })
416        .unwrap() as usize
417    ];
418    while stream.read_exact(&mut request).await.is_ok() {
419        trace!("{:?}", request);
420
421        let response = {
422            match stream.peer_addr() {
423                Err(e) => {
424                    info!("{:?}", e.into_inner());
425                    ERROR_RESPONSE.to_vec()
426                }
427                Ok(peer_addr) => {
428                    let ip = peer_addr.ip();
429                    info!("Request IP: {:?}", ip);
430
431                    match faucet.lock().unwrap().process_faucet_request(&request, ip) {
432                        Ok(response_bytes) => {
433                            trace!("Airdrop response_bytes: {:?}", response_bytes);
434                            response_bytes
435                        }
436                        Err(e) => {
437                            info!("Error in request: {}", e);
438                            ERROR_RESPONSE.to_vec()
439                        }
440                    }
441                }
442            }
443        };
444        stream.write_all(&response).await?;
445    }
446
447    Ok(())
448}
449
450pub trait LimitByTime {
451    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64;
452    fn datapoint_info(&self, request_amount: u64, new_total: u64);
453}
454
455impl LimitByTime for IpAddr {
456    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
457        *faucet
458            .ip_cache
459            .entry(*self)
460            .and_modify(|total| *total = total.saturating_add(request_amount))
461            .or_insert(request_amount)
462    }
463
464    fn datapoint_info(&self, request_amount: u64, new_total: u64) {
465        datapoint_info!(
466            "faucet-airdrop",
467            ("request_amount", request_amount, i64),
468            ("ip", self.to_string(), String),
469            ("new_total", new_total, i64)
470        );
471    }
472}
473
474impl LimitByTime for Pubkey {
475    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
476        *faucet
477            .address_cache
478            .entry(*self)
479            .and_modify(|total| *total = total.saturating_add(request_amount))
480            .or_insert(request_amount)
481    }
482
483    fn datapoint_info(&self, request_amount: u64, new_total: u64) {
484        datapoint_info!(
485            "faucet-airdrop",
486            ("request_amount", request_amount, i64),
487            ("address", self.to_string(), String),
488            ("new_total", new_total, i64)
489        );
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use {super::*, solana_sdk::system_instruction::SystemInstruction, std::time::Duration};
496
497    #[test]
498    fn test_check_time_request_limit() {
499        let keypair = Keypair::new();
500        let mut faucet = Faucet::new(keypair, None, Some(2), None);
501        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
502        assert!(faucet.check_time_request_limit(1, ip).is_ok());
503        assert!(faucet.check_time_request_limit(1, ip).is_ok());
504        assert!(faucet.check_time_request_limit(1, ip).is_err());
505
506        let address = Pubkey::new_unique();
507        assert!(faucet.check_time_request_limit(1, address).is_ok());
508        assert!(faucet.check_time_request_limit(1, address).is_ok());
509        assert!(faucet.check_time_request_limit(1, address).is_err());
510    }
511
512    #[test]
513    fn test_clear_caches() {
514        let keypair = Keypair::new();
515        let mut faucet = Faucet::new(keypair, None, None, None);
516        let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
517        assert_eq!(faucet.ip_cache.len(), 0);
518        faucet.check_time_request_limit(1, ip).unwrap();
519        assert_eq!(faucet.ip_cache.len(), 1);
520        faucet.clear_caches();
521        assert_eq!(faucet.ip_cache.len(), 0);
522        assert!(faucet.ip_cache.is_empty());
523
524        let address = Pubkey::new_unique();
525        assert_eq!(faucet.address_cache.len(), 0);
526        faucet.check_time_request_limit(1, address).unwrap();
527        assert_eq!(faucet.address_cache.len(), 1);
528        faucet.clear_caches();
529        assert_eq!(faucet.address_cache.len(), 0);
530        assert!(faucet.address_cache.is_empty());
531    }
532
533    #[test]
534    fn test_faucet_default_init() {
535        let keypair = Keypair::new();
536        let time_slice: Option<u64> = None;
537        let per_time_cap: Option<u64> = Some(200);
538        let per_request_cap: Option<u64> = Some(100);
539        let faucet = Faucet::new(keypair, time_slice, per_time_cap, per_request_cap);
540        assert_eq!(faucet.time_slice, Duration::new(TIME_SLICE, 0));
541        assert_eq!(faucet.per_time_cap, per_time_cap);
542        assert_eq!(faucet.per_request_cap, per_request_cap);
543    }
544
545    #[test]
546    fn test_faucet_build_airdrop_transaction() {
547        let to = Pubkey::new_unique();
548        let blockhash = Hash::default();
549        let request = FaucetRequest::GetAirdrop {
550            lamports: 2,
551            to,
552            blockhash,
553        };
554        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
555
556        let mint = Keypair::new();
557        let mint_pubkey = mint.pubkey();
558        let mut faucet = Faucet::new(mint, None, None, None);
559
560        if let FaucetTransaction::Airdrop(tx) =
561            faucet.build_airdrop_transaction(request, ip).unwrap()
562        {
563            let message = tx.message();
564
565            assert_eq!(tx.signatures.len(), 1);
566            assert_eq!(
567                message.account_keys,
568                vec![mint_pubkey, to, Pubkey::default()]
569            );
570            assert_eq!(message.recent_blockhash, blockhash);
571
572            assert_eq!(message.instructions.len(), 1);
573            let instruction: SystemInstruction =
574                deserialize(&message.instructions[0].data).unwrap();
575            assert_eq!(instruction, SystemInstruction::Transfer { lamports: 2 });
576        } else {
577            panic!("airdrop should succeed");
578        }
579
580        // Test per-time request cap
581        let mint = Keypair::new();
582        faucet = Faucet::new(mint, None, Some(2), None);
583        let _tx = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request succeeds
584        let tx = faucet.build_airdrop_transaction(request, ip);
585        assert!(tx.is_err());
586
587        // Test multiple requests from loopback with different addresses succeed
588        let mint = Keypair::new();
589        faucet = Faucet::new(mint, None, Some(2), None);
590        let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
591        let other = Pubkey::new_unique();
592        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request succeeds
593        let request1 = FaucetRequest::GetAirdrop {
594            lamports: 2,
595            to: other,
596            blockhash,
597        };
598        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); // first request succeeds
599        let tx0 = faucet.build_airdrop_transaction(request, ip);
600        assert!(tx0.is_err());
601        let tx1 = faucet.build_airdrop_transaction(request1, ip);
602        assert!(tx1.is_err());
603
604        // Test multiple requests from allowed ip with different addresses succeed
605        let mint = Keypair::new();
606        let ip = socketaddr!([203, 0, 113, 1], 0).ip();
607        let mut allowed_ips = HashSet::new();
608        allowed_ips.insert(ip);
609        faucet = Faucet::new_with_allowed_ips(mint, None, Some(2), None, allowed_ips);
610        let other = Pubkey::new_unique();
611        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request succeeds
612        let request1 = FaucetRequest::GetAirdrop {
613            lamports: 2,
614            to: other,
615            blockhash,
616        };
617        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); // first request succeeds
618        let tx0 = faucet.build_airdrop_transaction(request, ip);
619        assert!(tx0.is_err());
620        let tx1 = faucet.build_airdrop_transaction(request1, ip);
621        assert!(tx1.is_err());
622
623        // Test per-request cap
624        let mint = Keypair::new();
625        let mint_pubkey = mint.pubkey();
626        let mut faucet = Faucet::new(mint, None, None, Some(1));
627
628        if let FaucetTransaction::Memo((tx, memo)) =
629            faucet.build_airdrop_transaction(request, ip).unwrap()
630        {
631            let message = tx.message();
632
633            assert_eq!(tx.signatures.len(), 1);
634            assert_eq!(
635                message.account_keys,
636                vec![mint_pubkey, Pubkey::from(spl_memo::id().to_bytes())]
637            );
638            assert_eq!(message.recent_blockhash, blockhash);
639
640            assert_eq!(message.instructions.len(), 1);
641            let parsed_memo = std::str::from_utf8(&message.instructions[0].data).unwrap();
642            let expected_memo = "request too large; req: ◎0.000000002, cap: ◎0.000000001";
643            assert_eq!(parsed_memo, expected_memo);
644            assert_eq!(memo, expected_memo);
645        } else {
646            panic!("airdrop attempt should result in memo tx");
647        }
648    }
649
650    #[test]
651    fn test_process_faucet_request() {
652        let to = solana_sdk::pubkey::new_rand();
653        let blockhash = Hash::new(to.as_ref());
654        let lamports = 50;
655        let req = FaucetRequest::GetAirdrop {
656            lamports,
657            blockhash,
658            to,
659        };
660        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
661        let req = serialize(&req).unwrap();
662
663        let keypair = Keypair::new();
664        let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
665        let message = Message::new(&[expected_instruction], Some(&keypair.pubkey()));
666        let expected_tx = Transaction::new(&[&keypair], message, blockhash);
667        let expected_bytes = serialize(&expected_tx).unwrap();
668        let mut expected_vec_with_length = vec![0; 2];
669        LittleEndian::write_u16(&mut expected_vec_with_length, expected_bytes.len() as u16);
670        expected_vec_with_length.extend_from_slice(&expected_bytes);
671
672        let mut faucet = Faucet::new(keypair, None, None, None);
673        let response = faucet.process_faucet_request(&req, ip);
674        let response_vec = response.unwrap().to_vec();
675        assert_eq!(expected_vec_with_length, response_vec);
676
677        let bad_bytes = "bad bytes".as_bytes();
678        assert!(faucet.process_faucet_request(bad_bytes, ip).is_err());
679    }
680}