1use {
8 bincode::{deserialize, serialize, serialized_size},
9 crossbeam_channel::{unbounded, Sender},
10 log::*,
11 serde_derive::{Deserialize, Serialize},
12 solana_hash::Hash,
13 solana_instruction::Instruction,
14 solana_keypair::Keypair,
15 solana_message::Message,
16 solana_metrics::datapoint_info,
17 solana_native_token::lamports_to_sol,
18 solana_packet::PACKET_DATA_SIZE,
19 solana_pubkey::Pubkey,
20 solana_signer::Signer,
21 solana_system_interface::instruction::transfer,
22 solana_transaction::Transaction,
23 std::{
24 collections::{HashMap, HashSet},
25 io::{Read, Write},
26 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
27 sync::{Arc, Mutex},
28 thread,
29 time::Duration,
30 },
31 thiserror::Error,
32 tokio::{
33 io::{AsyncReadExt, AsyncWriteExt},
34 net::{TcpListener, TcpStream as TokioTcpStream},
35 runtime::Runtime,
36 },
37};
38
/// Builds a [`std::net::SocketAddr`].
///
/// * `socketaddr!(ip, port)` — `ip` is any value `Ipv4Addr::from` accepts (for example a
///   `[u8; 4]` or an `Ipv4Addr`) and `port` is a `u16`.
/// * `socketaddr!(str)` — parses `str` as a socket address.
///
/// The expansion uses absolute `::std::net` paths so the macro works at call sites that have
/// not imported `SocketAddr` or `Ipv4Addr` themselves (paths in `macro_rules!` are resolved
/// at the call site, and this macro is exported).
///
/// # Panics
///
/// The single-argument form panics if the string is not a valid socket address.
#[macro_export]
macro_rules! socketaddr {
    ($ip:expr, $port:expr) => {
        ::std::net::SocketAddr::from((::std::net::Ipv4Addr::from($ip), $port))
    };
    ($str:expr) => {{
        let a: ::std::net::SocketAddr = $str.parse().unwrap();
        a
    }};
}
49
/// Bytes written back to a client whose request could not be served: a little-endian `u16`
/// length prefix of zero with no payload following it.
const ERROR_RESPONSE: [u8; 2] = 0u16.to_le_bytes();

/// Default length of the faucet's time slice, in seconds; used when no explicit value is
/// passed to the `Faucet` constructors.
pub const TIME_SLICE: u64 = 60;
/// NOTE(review): presumably the default TCP port the faucet listens on — it is not referenced
/// in this file, so confirm against callers.
pub const FAUCET_PORT: u16 = 9900;
54
/// Errors reported by the faucet server and by the faucet client helper.
#[derive(Error, Debug)]
pub enum FaucetError {
    /// A socket or stream operation failed.
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),

    /// `bincode` failed to serialize or deserialize a value.
    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    /// A faucet transaction is too large to transfer; carries the offending length in bytes.
    #[error("transaction_length from faucet exceeds limit: {0}")]
    TransactionDataTooLarge(usize),

    /// The faucet answered with a zero length prefix, i.e. no transaction.
    #[error("transaction_length from faucet: 0")]
    NoDataReceived,

    /// A single request asked for more than the per-request cap.
    /// Fields: requested amount and cap, both in SOL.
    #[error("request too large; req: ◎{0}, cap: ◎{1}")]
    PerRequestCapExceeded(f64, f64),

    /// The running total for a requester exceeded the per-time cap.
    /// Fields: requested amount (SOL), the rate-limited IP or address rendered as a string,
    /// the running total after this request (SOL), and the cap (SOL).
    #[error("limit reached; req: ◎{0}, to: {1}, current: ◎{2}, cap: ◎{3}")]
    PerTimeCapExceeded(f64, String, f64, f64),
}
75
/// A request sent by a client to the faucet, bincode-encoded on the wire.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum FaucetRequest {
    /// Ask the faucet for a signed transaction transferring lamports to an account.
    GetAirdrop {
        /// Amount requested, in lamports.
        lamports: u64,
        /// Account that should receive the lamports.
        to: Pubkey,
        /// Blockhash the faucet places in the transaction it signs.
        blockhash: Hash,
    },
}
84
/// The transaction the faucet built in answer to a request.
pub enum FaucetTransaction {
    /// A signed transfer from the faucet account to the requested recipient.
    Airdrop(Transaction),
    /// A signed memo-only transaction, built instead of a transfer when the request exceeds
    /// the per-request cap, paired with the memo text it carries.
    Memo((Transaction, String)),
}
89
/// Faucet state: the signing keypair, the configured caps, and the running totals used for
/// rate limiting.
pub struct Faucet {
    /// Keypair that funds and signs every transaction the faucet builds.
    faucet_keypair: Keypair,
    /// Lamports requested per client IP since the caches were last cleared.
    ip_cache: HashMap<IpAddr, u64>,
    /// Lamports requested per recipient address since the caches were last cleared.
    address_cache: HashMap<Pubkey, u64>,
    /// Length of one rate-limit time slice.
    /// NOTE(review): nothing in this file clears the caches on this interval — presumably the
    /// owner of the faucet calls `clear_caches` periodically; confirm against callers.
    pub time_slice: Duration,
    /// Maximum running total, in lamports, per IP and per address; `None` disables the check.
    per_time_cap: Option<u64>,
    /// Maximum lamports a single request may ask for; `None` disables the check.
    per_request_cap: Option<u64>,
    /// Client IPs exempt from the per-IP limit (the per-address limit still applies).
    allowed_ips: HashSet<IpAddr>,
}
99
100impl Faucet {
101 pub fn new(
102 faucet_keypair: Keypair,
103 time_input: Option<u64>,
104 per_time_cap: Option<u64>,
105 per_request_cap: Option<u64>,
106 ) -> Self {
107 Self::new_with_allowed_ips(
108 faucet_keypair,
109 time_input,
110 per_time_cap,
111 per_request_cap,
112 HashSet::new(),
113 )
114 }
115
116 pub fn new_with_allowed_ips(
117 faucet_keypair: Keypair,
118 time_input: Option<u64>,
119 per_time_cap: Option<u64>,
120 per_request_cap: Option<u64>,
121 allowed_ips: HashSet<IpAddr>,
122 ) -> Self {
123 let time_slice = Duration::new(time_input.unwrap_or(TIME_SLICE), 0);
124 if let Some((per_request_cap, per_time_cap)) = per_request_cap.zip(per_time_cap) {
125 if per_time_cap < per_request_cap {
126 warn!(
127 "per_time_cap {} SOL < per_request_cap {} SOL; \
128 maximum single requests will fail",
129 lamports_to_sol(per_time_cap),
130 lamports_to_sol(per_request_cap),
131 );
132 }
133 }
134 Self {
135 faucet_keypair,
136 ip_cache: HashMap::new(),
137 address_cache: HashMap::new(),
138 time_slice,
139 per_time_cap,
140 per_request_cap,
141 allowed_ips,
142 }
143 }
144
145 pub fn check_time_request_limit<T: LimitByTime + std::fmt::Display>(
146 &mut self,
147 request_amount: u64,
148 to: T,
149 ) -> Result<(), FaucetError> {
150 let new_total = to.check_cache(self, request_amount);
151 to.datapoint_info(request_amount, new_total);
152 if let Some(cap) = self.per_time_cap {
153 if new_total > cap {
154 return Err(FaucetError::PerTimeCapExceeded(
155 lamports_to_sol(request_amount),
156 to.to_string(),
157 lamports_to_sol(new_total),
158 lamports_to_sol(cap),
159 ));
160 }
161 }
162 Ok(())
163 }
164
165 pub fn clear_caches(&mut self) {
166 self.ip_cache.clear();
167 self.address_cache.clear();
168 }
169
170 pub fn build_airdrop_transaction(
175 &mut self,
176 req: FaucetRequest,
177 ip: IpAddr,
178 ) -> Result<FaucetTransaction, FaucetError> {
179 trace!("build_airdrop_transaction: {:?}", req);
180 match req {
181 FaucetRequest::GetAirdrop {
182 lamports,
183 to,
184 blockhash,
185 } => {
186 let mint_pubkey = self.faucet_keypair.pubkey();
187 info!(
188 "Requesting airdrop of {} SOL to {:?}",
189 lamports_to_sol(lamports),
190 to
191 );
192
193 if let Some(cap) = self.per_request_cap {
194 if lamports > cap {
195 let memo = format!(
196 "{}",
197 FaucetError::PerRequestCapExceeded(
198 lamports_to_sol(lamports),
199 lamports_to_sol(cap),
200 )
201 );
202 let memo_instruction = Instruction {
203 program_id: Pubkey::from(spl_memo::id().to_bytes()),
204 accounts: vec![],
205 data: memo.as_bytes().to_vec(),
206 };
207 let message = Message::new(&[memo_instruction], Some(&mint_pubkey));
208 return Ok(FaucetTransaction::Memo((
209 Transaction::new(&[&self.faucet_keypair], message, blockhash),
210 memo,
211 )));
212 }
213 }
214 if !ip.is_loopback() && !self.allowed_ips.contains(&ip) {
215 self.check_time_request_limit(lamports, ip)?;
216 }
217 self.check_time_request_limit(lamports, to)?;
218
219 let transfer_instruction = transfer(&mint_pubkey, &to, lamports);
220 let message = Message::new(&[transfer_instruction], Some(&mint_pubkey));
221 Ok(FaucetTransaction::Airdrop(Transaction::new(
222 &[&self.faucet_keypair],
223 message,
224 blockhash,
225 )))
226 }
227 }
228 }
229
230 pub fn process_faucet_request(
232 &mut self,
233 bytes: &[u8],
234 ip: IpAddr,
235 ) -> Result<Vec<u8>, FaucetError> {
236 let req: FaucetRequest = deserialize(bytes)?;
237
238 info!("Airdrop transaction requested...{:?}", req);
239 let res = self.build_airdrop_transaction(req, ip);
240 match res {
241 Ok(tx) => {
242 let tx = match tx {
243 FaucetTransaction::Airdrop(tx) => {
244 info!("Airdrop transaction granted");
245 tx
246 }
247 FaucetTransaction::Memo((tx, memo)) => {
248 warn!("Memo transaction returned: {}", memo);
249 tx
250 }
251 };
252 let response_vec = bincode::serialize(&tx)?;
253
254 let mut response_vec_with_length =
255 (response_vec.len() as u16).to_le_bytes().to_vec();
256 response_vec_with_length.extend_from_slice(&response_vec);
257
258 Ok(response_vec_with_length)
259 }
260 Err(err) => {
261 warn!("Airdrop transaction failed: {}", err);
262 Err(err)
263 }
264 }
265 }
266}
267
/// Calls `solana_metrics::flush()` when the faucet is dropped.
impl Drop for Faucet {
    fn drop(&mut self) {
        solana_metrics::flush();
    }
}
273
274pub fn request_airdrop_transaction(
275 faucet_addr: &SocketAddr,
276 id: &Pubkey,
277 lamports: u64,
278 blockhash: Hash,
279) -> Result<Transaction, FaucetError> {
280 info!(
281 "request_airdrop_transaction: faucet_addr={} id={} lamports={} blockhash={}",
282 faucet_addr, id, lamports, blockhash
283 );
284
285 let mut stream = TcpStream::connect_timeout(faucet_addr, Duration::new(3, 0))?;
286 stream.set_read_timeout(Some(Duration::new(10, 0)))?;
287 let req = FaucetRequest::GetAirdrop {
288 lamports,
289 blockhash,
290 to: *id,
291 };
292 let req = serialize(&req).expect("serialize faucet request");
293 stream.write_all(&req)?;
294
295 let mut buffer = [0; 2];
297 stream.read_exact(&mut buffer).map_err(|err| {
298 info!(
299 "request_airdrop_transaction: buffer length read_exact error: {:?}",
300 err
301 );
302 err
303 })?;
304 let transaction_length = u16::from_le_bytes(buffer) as usize;
305 if transaction_length > PACKET_DATA_SIZE {
306 return Err(FaucetError::TransactionDataTooLarge(transaction_length));
307 } else if transaction_length == 0 {
308 return Err(FaucetError::NoDataReceived);
309 }
310
311 let mut buffer = vec![0; transaction_length];
313 stream.read_exact(&mut buffer).map_err(|err| {
314 info!(
315 "request_airdrop_transaction: buffer read_exact error: {:?}",
316 err
317 );
318 err
319 })?;
320
321 let transaction: Transaction = deserialize(&buffer)?;
322 Ok(transaction)
323}
324
325pub fn run_local_faucet_with_port(
326 faucet_keypair: Keypair,
327 sender: Sender<Result<SocketAddr, String>>,
328 time_input: Option<u64>,
329 per_time_cap: Option<u64>,
330 per_request_cap: Option<u64>,
331 port: u16, ) {
333 thread::spawn(move || {
334 let faucet_addr = socketaddr!(Ipv4Addr::UNSPECIFIED, port);
335 let faucet = Arc::new(Mutex::new(Faucet::new(
336 faucet_keypair,
337 time_input,
338 per_time_cap,
339 per_request_cap,
340 )));
341 let runtime = Runtime::new().unwrap();
342 runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
343 });
344}
345
346pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option<u64>) -> SocketAddr {
348 let (sender, receiver) = unbounded();
349 run_local_faucet_with_port(faucet_keypair, sender, None, per_time_cap, None, 0);
350 receiver
351 .recv()
352 .expect("run_local_faucet")
353 .expect("faucet_addr")
354}
355
356pub async fn run_faucet(
357 faucet: Arc<Mutex<Faucet>>,
358 faucet_addr: SocketAddr,
359 sender: Option<Sender<Result<SocketAddr, String>>>,
360) {
361 let listener = TcpListener::bind(&faucet_addr).await;
362 if let Some(sender) = sender {
363 sender.send(
364 listener.as_ref().map(|listener| listener.local_addr().unwrap())
365 .map_err(|err| {
366 format!(
367 "Unable to bind faucet to {faucet_addr:?}, check the address is not already in use: {err}"
368 )
369 })
370 )
371 .unwrap();
372 }
373
374 let listener = match listener {
375 Err(err) => {
376 error!("Faucet failed to start: {}", err);
377 return;
378 }
379 Ok(listener) => listener,
380 };
381 info!("Faucet started. Listening on: {}", faucet_addr);
382 info!(
383 "Faucet account address: {}",
384 faucet.lock().unwrap().faucet_keypair.pubkey()
385 );
386
387 loop {
388 let faucet = faucet.clone();
389 match listener.accept().await {
390 Ok((stream, _)) => {
391 tokio::spawn(async move {
392 if let Err(e) = process(stream, faucet).await {
393 info!("failed to process request; error = {:?}", e);
394 }
395 });
396 }
397 Err(e) => debug!("failed to accept socket; error = {:?}", e),
398 }
399 }
400}
401
402async fn process(
403 mut stream: TokioTcpStream,
404 faucet: Arc<Mutex<Faucet>>,
405) -> Result<(), Box<dyn std::error::Error>> {
406 let mut request = vec![
407 0u8;
408 serialized_size(&FaucetRequest::GetAirdrop {
409 lamports: u64::default(),
410 to: Pubkey::default(),
411 blockhash: Hash::default(),
412 })
413 .unwrap() as usize
414 ];
415 while stream.read_exact(&mut request).await.is_ok() {
416 trace!("{:?}", request);
417
418 let response = {
419 match stream.peer_addr() {
420 Err(e) => {
421 info!("{:?}", e.into_inner());
422 ERROR_RESPONSE.to_vec()
423 }
424 Ok(peer_addr) => {
425 let ip = peer_addr.ip();
426 info!("Request IP: {:?}", ip);
427
428 match faucet.lock().unwrap().process_faucet_request(&request, ip) {
429 Ok(response_bytes) => {
430 trace!("Airdrop response_bytes: {:?}", response_bytes);
431 response_bytes
432 }
433 Err(e) => {
434 info!("Error in request: {}", e);
435 ERROR_RESPONSE.to_vec()
436 }
437 }
438 }
439 }
440 };
441 stream.write_all(&response).await?;
442 }
443
444 Ok(())
445}
446
/// A key (such as a client IP or a recipient address) under which the faucet tracks a running
/// total of requested lamports.
pub trait LimitByTime {
    /// Adds `request_amount` to the running total stored in `faucet` for this key and returns
    /// the new total.
    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64;
    /// Emits a metrics datapoint describing the request and the resulting total.
    fn datapoint_info(&self, request_amount: u64, new_total: u64);
}
451
452impl LimitByTime for IpAddr {
453 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
454 *faucet
455 .ip_cache
456 .entry(*self)
457 .and_modify(|total| *total = total.saturating_add(request_amount))
458 .or_insert(request_amount)
459 }
460
461 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
462 datapoint_info!(
463 "faucet-airdrop",
464 ("request_amount", request_amount, i64),
465 ("ip", self.to_string(), String),
466 ("new_total", new_total, i64)
467 );
468 }
469}
470
471impl LimitByTime for Pubkey {
472 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
473 *faucet
474 .address_cache
475 .entry(*self)
476 .and_modify(|total| *total = total.saturating_add(request_amount))
477 .or_insert(request_amount)
478 }
479
480 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
481 datapoint_info!(
482 "faucet-airdrop",
483 ("request_amount", request_amount, i64),
484 ("address", self.to_string(), String),
485 ("new_total", new_total, i64)
486 );
487 }
488}
489
#[cfg(test)]
mod tests {
    use {super::*, solana_system_interface::instruction::SystemInstruction, std::time::Duration};

    /// Grants each request once, then asserts that repeating each one is rejected.
    fn assert_second_round_rejected(
        faucet: &mut Faucet,
        ip: IpAddr,
        requests: [FaucetRequest; 2],
    ) {
        for request in requests {
            faucet.build_airdrop_transaction(request, ip).unwrap();
        }
        for request in requests {
            assert!(faucet.build_airdrop_transaction(request, ip).is_err());
        }
    }

    /// With a cap of 2 lamports, the third 1-lamport request for the same key is rejected.
    #[test]
    fn test_check_time_request_limit() {
        let mut faucet = Faucet::new(Keypair::new(), None, Some(2), None);

        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        for expect_ok in [true, true, false] {
            assert_eq!(faucet.check_time_request_limit(1, ip).is_ok(), expect_ok);
        }

        let address = Pubkey::new_unique();
        for expect_ok in [true, true, false] {
            assert_eq!(
                faucet.check_time_request_limit(1, address).is_ok(),
                expect_ok
            );
        }
    }

    /// `clear_caches` empties both the IP cache and the address cache.
    #[test]
    fn test_clear_caches() {
        let mut faucet = Faucet::new(Keypair::new(), None, None, None);

        let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
        assert_eq!(faucet.ip_cache.len(), 0);
        faucet.check_time_request_limit(1, ip).unwrap();
        assert_eq!(faucet.ip_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.ip_cache.len(), 0);
        assert!(faucet.ip_cache.is_empty());

        let address = Pubkey::new_unique();
        assert_eq!(faucet.address_cache.len(), 0);
        faucet.check_time_request_limit(1, address).unwrap();
        assert_eq!(faucet.address_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.address_cache.len(), 0);
        assert!(faucet.address_cache.is_empty());
    }

    /// A missing time input falls back to `TIME_SLICE`; the caps are stored as given.
    #[test]
    fn test_faucet_default_init() {
        let per_time_cap: Option<u64> = Some(200);
        let per_request_cap: Option<u64> = Some(100);

        let faucet = Faucet::new(Keypair::new(), None, per_time_cap, per_request_cap);

        assert_eq!(faucet.time_slice, Duration::new(TIME_SLICE, 0));
        assert_eq!(faucet.per_time_cap, per_time_cap);
        assert_eq!(faucet.per_request_cap, per_request_cap);
    }

    #[test]
    fn test_faucet_build_airdrop_transaction() {
        let to = Pubkey::new_unique();
        let blockhash = Hash::default();
        let request = FaucetRequest::GetAirdrop {
            lamports: 2,
            to,
            blockhash,
        };
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();

        // No caps: the request yields a plain transfer.
        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, None);

        match faucet.build_airdrop_transaction(request, ip).unwrap() {
            FaucetTransaction::Airdrop(tx) => {
                let message = tx.message();

                assert_eq!(tx.signatures.len(), 1);
                assert_eq!(
                    message.account_keys,
                    vec![mint_pubkey, to, Pubkey::default()]
                );
                assert_eq!(message.recent_blockhash, blockhash);

                assert_eq!(message.instructions.len(), 1);
                let instruction: SystemInstruction =
                    deserialize(&message.instructions[0].data).unwrap();
                assert_eq!(instruction, SystemInstruction::Transfer { lamports: 2 });
            }
            FaucetTransaction::Memo(_) => panic!("airdrop should succeed"),
        }

        // Per-time cap of 2: the first request fits, the repeat from the same IP does not.
        faucet = Faucet::new(Keypair::new(), None, Some(2), None);
        faucet.build_airdrop_transaction(request, ip).unwrap();
        assert!(faucet.build_airdrop_transaction(request, ip).is_err());

        // Loopback clients skip the per-IP limit but are still limited per address.
        faucet = Faucet::new(Keypair::new(), None, Some(2), None);
        let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
        let other = Pubkey::new_unique();
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        assert_second_round_rejected(&mut faucet, ip, [request, request1]);

        // Allow-listed clients skip the per-IP limit but are still limited per address.
        let ip = socketaddr!([203, 0, 113, 1], 0).ip();
        let allowed_ips = HashSet::from([ip]);
        faucet = Faucet::new_with_allowed_ips(Keypair::new(), None, Some(2), None, allowed_ips);
        let other = Pubkey::new_unique();
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        assert_second_round_rejected(&mut faucet, ip, [request, request1]);

        // Per-request cap of 1: a 2-lamport request produces a memo transaction instead.
        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, Some(1));

        match faucet.build_airdrop_transaction(request, ip).unwrap() {
            FaucetTransaction::Memo((tx, memo)) => {
                let message = tx.message();

                assert_eq!(tx.signatures.len(), 1);
                assert_eq!(
                    message.account_keys,
                    vec![mint_pubkey, Pubkey::from(spl_memo::id().to_bytes())]
                );
                assert_eq!(message.recent_blockhash, blockhash);

                assert_eq!(message.instructions.len(), 1);
                let parsed_memo = std::str::from_utf8(&message.instructions[0].data).unwrap();
                let expected_memo = "request too large; req: ◎0.000000002, cap: ◎0.000000001";
                assert_eq!(parsed_memo, expected_memo);
                assert_eq!(memo, expected_memo);
            }
            FaucetTransaction::Airdrop(_) => panic!("airdrop attempt should result in memo tx"),
        }
    }

    /// The response is the bincode-encoded transfer behind a little-endian `u16` length
    /// prefix; undecodable input is rejected.
    #[test]
    fn test_process_faucet_request() {
        let to = solana_pubkey::new_rand();
        let blockhash = Hash::new_from_array(to.to_bytes());
        let lamports = 50;
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        let request_bytes = serialize(&FaucetRequest::GetAirdrop {
            lamports,
            blockhash,
            to,
        })
        .unwrap();

        let keypair = Keypair::new();
        let payer = keypair.pubkey();
        let message = Message::new(&[transfer(&payer, &to, lamports)], Some(&payer));
        let expected_tx = Transaction::new(&[&keypair], message, blockhash);
        let expected_bytes = serialize(&expected_tx).unwrap();
        let mut expected_vec_with_length = (expected_bytes.len() as u16).to_le_bytes().to_vec();
        expected_vec_with_length.extend_from_slice(&expected_bytes);

        let mut faucet = Faucet::new(keypair, None, None, None);
        let response_vec = faucet.process_faucet_request(&request_bytes, ip).unwrap();
        assert_eq!(expected_vec_with_length, response_vec);

        let bad_bytes = "bad bytes".as_bytes();
        assert!(faucet.process_faucet_request(bad_bytes, ip).is_err());
    }
}