1use {
8 bincode::{deserialize, serialize, serialized_size},
9 byteorder::{ByteOrder, LittleEndian},
10 crossbeam_channel::{unbounded, Sender},
11 log::*,
12 serde_derive::{Deserialize, Serialize},
13 solana_metrics::datapoint_info,
14 solana_sdk::{
15 hash::Hash,
16 instruction::Instruction,
17 message::Message,
18 native_token::lamports_to_sol,
19 packet::PACKET_DATA_SIZE,
20 pubkey::Pubkey,
21 signature::{Keypair, Signer},
22 system_instruction,
23 transaction::Transaction,
24 },
25 std::{
26 collections::{HashMap, HashSet},
27 io::{Read, Write},
28 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
29 sync::{Arc, Mutex},
30 thread,
31 time::Duration,
32 },
33 thiserror::Error,
34 tokio::{
35 io::{AsyncReadExt, AsyncWriteExt},
36 net::{TcpListener, TcpStream as TokioTcpStream},
37 runtime::Runtime,
38 },
39};
40
41#[macro_export]
42macro_rules! socketaddr {
43 ($ip:expr, $port:expr) => {
44 SocketAddr::from((Ipv4Addr::from($ip), $port))
45 };
46 ($str:expr) => {{
47 let a: SocketAddr = $str.parse().unwrap();
48 a
49 }};
50}
51
52const ERROR_RESPONSE: [u8; 2] = 0u16.to_le_bytes();
53
54pub const TIME_SLICE: u64 = 60;
55pub const FAUCET_PORT: u16 = 9900;
56
57#[derive(Error, Debug)]
58pub enum FaucetError {
59 #[error("IO Error: {0}")]
60 IoError(#[from] std::io::Error),
61
62 #[error("serialization error: {0}")]
63 Serialize(#[from] bincode::Error),
64
65 #[error("transaction_length from faucet exceeds limit: {0}")]
66 TransactionDataTooLarge(usize),
67
68 #[error("transaction_length from faucet: 0")]
69 NoDataReceived,
70
71 #[error("request too large; req: ◎{0}, cap: ◎{1}")]
72 PerRequestCapExceeded(f64, f64),
73
74 #[error("limit reached; req: ◎{0}, to: {1}, current: ◎{2}, cap: ◎{3}")]
75 PerTimeCapExceeded(f64, String, f64, f64),
76}
77
78#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
79pub enum FaucetRequest {
80 GetAirdrop {
81 lamports: u64,
82 to: Pubkey,
83 blockhash: Hash,
84 },
85}
86
87pub enum FaucetTransaction {
88 Airdrop(Transaction),
89 Memo((Transaction, String)),
90}
91
92pub struct Faucet {
93 faucet_keypair: Keypair,
94 ip_cache: HashMap<IpAddr, u64>,
95 address_cache: HashMap<Pubkey, u64>,
96 pub time_slice: Duration,
97 per_time_cap: Option<u64>,
98 per_request_cap: Option<u64>,
99 allowed_ips: HashSet<IpAddr>,
100}
101
102impl Faucet {
103 pub fn new(
104 faucet_keypair: Keypair,
105 time_input: Option<u64>,
106 per_time_cap: Option<u64>,
107 per_request_cap: Option<u64>,
108 ) -> Self {
109 Self::new_with_allowed_ips(
110 faucet_keypair,
111 time_input,
112 per_time_cap,
113 per_request_cap,
114 HashSet::new(),
115 )
116 }
117
118 pub fn new_with_allowed_ips(
119 faucet_keypair: Keypair,
120 time_input: Option<u64>,
121 per_time_cap: Option<u64>,
122 per_request_cap: Option<u64>,
123 allowed_ips: HashSet<IpAddr>,
124 ) -> Self {
125 let time_slice = Duration::new(time_input.unwrap_or(TIME_SLICE), 0);
126 if let Some((per_request_cap, per_time_cap)) = per_request_cap.zip(per_time_cap) {
127 if per_time_cap < per_request_cap {
128 warn!(
129 "per_time_cap {} SOL < per_request_cap {} SOL; \
130 maximum single requests will fail",
131 lamports_to_sol(per_time_cap),
132 lamports_to_sol(per_request_cap),
133 );
134 }
135 }
136 Self {
137 faucet_keypair,
138 ip_cache: HashMap::new(),
139 address_cache: HashMap::new(),
140 time_slice,
141 per_time_cap,
142 per_request_cap,
143 allowed_ips,
144 }
145 }
146
147 pub fn check_time_request_limit<T: LimitByTime + std::fmt::Display>(
148 &mut self,
149 request_amount: u64,
150 to: T,
151 ) -> Result<(), FaucetError> {
152 let new_total = to.check_cache(self, request_amount);
153 to.datapoint_info(request_amount, new_total);
154 if let Some(cap) = self.per_time_cap {
155 if new_total > cap {
156 return Err(FaucetError::PerTimeCapExceeded(
157 lamports_to_sol(request_amount),
158 to.to_string(),
159 lamports_to_sol(new_total),
160 lamports_to_sol(cap),
161 ));
162 }
163 }
164 Ok(())
165 }
166
167 pub fn clear_caches(&mut self) {
168 self.ip_cache.clear();
169 self.address_cache.clear();
170 }
171
172 pub fn build_airdrop_transaction(
177 &mut self,
178 req: FaucetRequest,
179 ip: IpAddr,
180 ) -> Result<FaucetTransaction, FaucetError> {
181 trace!("build_airdrop_transaction: {:?}", req);
182 match req {
183 FaucetRequest::GetAirdrop {
184 lamports,
185 to,
186 blockhash,
187 } => {
188 let mint_pubkey = self.faucet_keypair.pubkey();
189 info!(
190 "Requesting airdrop of {} SOL to {:?}",
191 lamports_to_sol(lamports),
192 to
193 );
194
195 if let Some(cap) = self.per_request_cap {
196 if lamports > cap {
197 let memo = format!(
198 "{}",
199 FaucetError::PerRequestCapExceeded(
200 lamports_to_sol(lamports),
201 lamports_to_sol(cap),
202 )
203 );
204 let memo_instruction = Instruction {
205 program_id: Pubkey::from(spl_memo::id().to_bytes()),
206 accounts: vec![],
207 data: memo.as_bytes().to_vec(),
208 };
209 let message = Message::new(&[memo_instruction], Some(&mint_pubkey));
210 return Ok(FaucetTransaction::Memo((
211 Transaction::new(&[&self.faucet_keypair], message, blockhash),
212 memo,
213 )));
214 }
215 }
216 if !ip.is_loopback() && !self.allowed_ips.contains(&ip) {
217 self.check_time_request_limit(lamports, ip)?;
218 }
219 self.check_time_request_limit(lamports, to)?;
220
221 let transfer_instruction =
222 system_instruction::transfer(&mint_pubkey, &to, lamports);
223 let message = Message::new(&[transfer_instruction], Some(&mint_pubkey));
224 Ok(FaucetTransaction::Airdrop(Transaction::new(
225 &[&self.faucet_keypair],
226 message,
227 blockhash,
228 )))
229 }
230 }
231 }
232
233 pub fn process_faucet_request(
235 &mut self,
236 bytes: &[u8],
237 ip: IpAddr,
238 ) -> Result<Vec<u8>, FaucetError> {
239 let req: FaucetRequest = deserialize(bytes)?;
240
241 info!("Airdrop transaction requested...{:?}", req);
242 let res = self.build_airdrop_transaction(req, ip);
243 match res {
244 Ok(tx) => {
245 let tx = match tx {
246 FaucetTransaction::Airdrop(tx) => {
247 info!("Airdrop transaction granted");
248 tx
249 }
250 FaucetTransaction::Memo((tx, memo)) => {
251 warn!("Memo transaction returned: {}", memo);
252 tx
253 }
254 };
255 let response_vec = bincode::serialize(&tx)?;
256
257 let mut response_vec_with_length = vec![0; 2];
258 LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
259 response_vec_with_length.extend_from_slice(&response_vec);
260
261 Ok(response_vec_with_length)
262 }
263 Err(err) => {
264 warn!("Airdrop transaction failed: {}", err);
265 Err(err)
266 }
267 }
268 }
269}
270
271impl Drop for Faucet {
272 fn drop(&mut self) {
273 solana_metrics::flush();
274 }
275}
276
277pub fn request_airdrop_transaction(
278 faucet_addr: &SocketAddr,
279 id: &Pubkey,
280 lamports: u64,
281 blockhash: Hash,
282) -> Result<Transaction, FaucetError> {
283 info!(
284 "request_airdrop_transaction: faucet_addr={} id={} lamports={} blockhash={}",
285 faucet_addr, id, lamports, blockhash
286 );
287
288 let mut stream = TcpStream::connect_timeout(faucet_addr, Duration::new(3, 0))?;
289 stream.set_read_timeout(Some(Duration::new(10, 0)))?;
290 let req = FaucetRequest::GetAirdrop {
291 lamports,
292 blockhash,
293 to: *id,
294 };
295 let req = serialize(&req).expect("serialize faucet request");
296 stream.write_all(&req)?;
297
298 let mut buffer = [0; 2];
300 stream.read_exact(&mut buffer).map_err(|err| {
301 info!(
302 "request_airdrop_transaction: buffer length read_exact error: {:?}",
303 err
304 );
305 err
306 })?;
307 let transaction_length = LittleEndian::read_u16(&buffer) as usize;
308 if transaction_length > PACKET_DATA_SIZE {
309 return Err(FaucetError::TransactionDataTooLarge(transaction_length));
310 } else if transaction_length == 0 {
311 return Err(FaucetError::NoDataReceived);
312 }
313
314 let mut buffer = vec![0; transaction_length];
316 stream.read_exact(&mut buffer).map_err(|err| {
317 info!(
318 "request_airdrop_transaction: buffer read_exact error: {:?}",
319 err
320 );
321 err
322 })?;
323
324 let transaction: Transaction = deserialize(&buffer)?;
325 Ok(transaction)
326}
327
328pub fn run_local_faucet_with_port(
329 faucet_keypair: Keypair,
330 sender: Sender<Result<SocketAddr, String>>,
331 time_input: Option<u64>,
332 per_time_cap: Option<u64>,
333 per_request_cap: Option<u64>,
334 port: u16, ) {
336 thread::spawn(move || {
337 let faucet_addr = socketaddr!(Ipv4Addr::UNSPECIFIED, port);
338 let faucet = Arc::new(Mutex::new(Faucet::new(
339 faucet_keypair,
340 time_input,
341 per_time_cap,
342 per_request_cap,
343 )));
344 let runtime = Runtime::new().unwrap();
345 runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
346 });
347}
348
349pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option<u64>) -> SocketAddr {
351 let (sender, receiver) = unbounded();
352 run_local_faucet_with_port(faucet_keypair, sender, None, per_time_cap, None, 0);
353 receiver
354 .recv()
355 .expect("run_local_faucet")
356 .expect("faucet_addr")
357}
358
359pub async fn run_faucet(
360 faucet: Arc<Mutex<Faucet>>,
361 faucet_addr: SocketAddr,
362 sender: Option<Sender<Result<SocketAddr, String>>>,
363) {
364 let listener = TcpListener::bind(&faucet_addr).await;
365 if let Some(sender) = sender {
366 sender.send(
367 listener.as_ref().map(|listener| listener.local_addr().unwrap())
368 .map_err(|err| {
369 format!(
370 "Unable to bind faucet to {faucet_addr:?}, check the address is not already in use: {err}"
371 )
372 })
373 )
374 .unwrap();
375 }
376
377 let listener = match listener {
378 Err(err) => {
379 error!("Faucet failed to start: {}", err);
380 return;
381 }
382 Ok(listener) => listener,
383 };
384 info!("Faucet started. Listening on: {}", faucet_addr);
385 info!(
386 "Faucet account address: {}",
387 faucet.lock().unwrap().faucet_keypair.pubkey()
388 );
389
390 loop {
391 let faucet = faucet.clone();
392 match listener.accept().await {
393 Ok((stream, _)) => {
394 tokio::spawn(async move {
395 if let Err(e) = process(stream, faucet).await {
396 info!("failed to process request; error = {:?}", e);
397 }
398 });
399 }
400 Err(e) => debug!("failed to accept socket; error = {:?}", e),
401 }
402 }
403}
404
405async fn process(
406 mut stream: TokioTcpStream,
407 faucet: Arc<Mutex<Faucet>>,
408) -> Result<(), Box<dyn std::error::Error>> {
409 let mut request = vec![
410 0u8;
411 serialized_size(&FaucetRequest::GetAirdrop {
412 lamports: u64::default(),
413 to: Pubkey::default(),
414 blockhash: Hash::default(),
415 })
416 .unwrap() as usize
417 ];
418 while stream.read_exact(&mut request).await.is_ok() {
419 trace!("{:?}", request);
420
421 let response = {
422 match stream.peer_addr() {
423 Err(e) => {
424 info!("{:?}", e.into_inner());
425 ERROR_RESPONSE.to_vec()
426 }
427 Ok(peer_addr) => {
428 let ip = peer_addr.ip();
429 info!("Request IP: {:?}", ip);
430
431 match faucet.lock().unwrap().process_faucet_request(&request, ip) {
432 Ok(response_bytes) => {
433 trace!("Airdrop response_bytes: {:?}", response_bytes);
434 response_bytes
435 }
436 Err(e) => {
437 info!("Error in request: {}", e);
438 ERROR_RESPONSE.to_vec()
439 }
440 }
441 }
442 }
443 };
444 stream.write_all(&response).await?;
445 }
446
447 Ok(())
448}
449
450pub trait LimitByTime {
451 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64;
452 fn datapoint_info(&self, request_amount: u64, new_total: u64);
453}
454
455impl LimitByTime for IpAddr {
456 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
457 *faucet
458 .ip_cache
459 .entry(*self)
460 .and_modify(|total| *total = total.saturating_add(request_amount))
461 .or_insert(request_amount)
462 }
463
464 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
465 datapoint_info!(
466 "faucet-airdrop",
467 ("request_amount", request_amount, i64),
468 ("ip", self.to_string(), String),
469 ("new_total", new_total, i64)
470 );
471 }
472}
473
474impl LimitByTime for Pubkey {
475 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
476 *faucet
477 .address_cache
478 .entry(*self)
479 .and_modify(|total| *total = total.saturating_add(request_amount))
480 .or_insert(request_amount)
481 }
482
483 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
484 datapoint_info!(
485 "faucet-airdrop",
486 ("request_amount", request_amount, i64),
487 ("address", self.to_string(), String),
488 ("new_total", new_total, i64)
489 );
490 }
491}
492
493#[cfg(test)]
494mod tests {
495 use {super::*, solana_sdk::system_instruction::SystemInstruction, std::time::Duration};
496
497 #[test]
498 fn test_check_time_request_limit() {
499 let keypair = Keypair::new();
500 let mut faucet = Faucet::new(keypair, None, Some(2), None);
501 let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
502 assert!(faucet.check_time_request_limit(1, ip).is_ok());
503 assert!(faucet.check_time_request_limit(1, ip).is_ok());
504 assert!(faucet.check_time_request_limit(1, ip).is_err());
505
506 let address = Pubkey::new_unique();
507 assert!(faucet.check_time_request_limit(1, address).is_ok());
508 assert!(faucet.check_time_request_limit(1, address).is_ok());
509 assert!(faucet.check_time_request_limit(1, address).is_err());
510 }
511
512 #[test]
513 fn test_clear_caches() {
514 let keypair = Keypair::new();
515 let mut faucet = Faucet::new(keypair, None, None, None);
516 let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
517 assert_eq!(faucet.ip_cache.len(), 0);
518 faucet.check_time_request_limit(1, ip).unwrap();
519 assert_eq!(faucet.ip_cache.len(), 1);
520 faucet.clear_caches();
521 assert_eq!(faucet.ip_cache.len(), 0);
522 assert!(faucet.ip_cache.is_empty());
523
524 let address = Pubkey::new_unique();
525 assert_eq!(faucet.address_cache.len(), 0);
526 faucet.check_time_request_limit(1, address).unwrap();
527 assert_eq!(faucet.address_cache.len(), 1);
528 faucet.clear_caches();
529 assert_eq!(faucet.address_cache.len(), 0);
530 assert!(faucet.address_cache.is_empty());
531 }
532
533 #[test]
534 fn test_faucet_default_init() {
535 let keypair = Keypair::new();
536 let time_slice: Option<u64> = None;
537 let per_time_cap: Option<u64> = Some(200);
538 let per_request_cap: Option<u64> = Some(100);
539 let faucet = Faucet::new(keypair, time_slice, per_time_cap, per_request_cap);
540 assert_eq!(faucet.time_slice, Duration::new(TIME_SLICE, 0));
541 assert_eq!(faucet.per_time_cap, per_time_cap);
542 assert_eq!(faucet.per_request_cap, per_request_cap);
543 }
544
545 #[test]
546 fn test_faucet_build_airdrop_transaction() {
547 let to = Pubkey::new_unique();
548 let blockhash = Hash::default();
549 let request = FaucetRequest::GetAirdrop {
550 lamports: 2,
551 to,
552 blockhash,
553 };
554 let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
555
556 let mint = Keypair::new();
557 let mint_pubkey = mint.pubkey();
558 let mut faucet = Faucet::new(mint, None, None, None);
559
560 if let FaucetTransaction::Airdrop(tx) =
561 faucet.build_airdrop_transaction(request, ip).unwrap()
562 {
563 let message = tx.message();
564
565 assert_eq!(tx.signatures.len(), 1);
566 assert_eq!(
567 message.account_keys,
568 vec![mint_pubkey, to, Pubkey::default()]
569 );
570 assert_eq!(message.recent_blockhash, blockhash);
571
572 assert_eq!(message.instructions.len(), 1);
573 let instruction: SystemInstruction =
574 deserialize(&message.instructions[0].data).unwrap();
575 assert_eq!(instruction, SystemInstruction::Transfer { lamports: 2 });
576 } else {
577 panic!("airdrop should succeed");
578 }
579
580 let mint = Keypair::new();
582 faucet = Faucet::new(mint, None, Some(2), None);
583 let _tx = faucet.build_airdrop_transaction(request, ip).unwrap(); let tx = faucet.build_airdrop_transaction(request, ip);
585 assert!(tx.is_err());
586
587 let mint = Keypair::new();
589 faucet = Faucet::new(mint, None, Some(2), None);
590 let ip = socketaddr!(Ipv4Addr::LOCALHOST, 0).ip();
591 let other = Pubkey::new_unique();
592 let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); let request1 = FaucetRequest::GetAirdrop {
594 lamports: 2,
595 to: other,
596 blockhash,
597 };
598 let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); let tx0 = faucet.build_airdrop_transaction(request, ip);
600 assert!(tx0.is_err());
601 let tx1 = faucet.build_airdrop_transaction(request1, ip);
602 assert!(tx1.is_err());
603
604 let mint = Keypair::new();
606 let ip = socketaddr!([203, 0, 113, 1], 0).ip();
607 let mut allowed_ips = HashSet::new();
608 allowed_ips.insert(ip);
609 faucet = Faucet::new_with_allowed_ips(mint, None, Some(2), None, allowed_ips);
610 let other = Pubkey::new_unique();
611 let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); let request1 = FaucetRequest::GetAirdrop {
613 lamports: 2,
614 to: other,
615 blockhash,
616 };
617 let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); let tx0 = faucet.build_airdrop_transaction(request, ip);
619 assert!(tx0.is_err());
620 let tx1 = faucet.build_airdrop_transaction(request1, ip);
621 assert!(tx1.is_err());
622
623 let mint = Keypair::new();
625 let mint_pubkey = mint.pubkey();
626 let mut faucet = Faucet::new(mint, None, None, Some(1));
627
628 if let FaucetTransaction::Memo((tx, memo)) =
629 faucet.build_airdrop_transaction(request, ip).unwrap()
630 {
631 let message = tx.message();
632
633 assert_eq!(tx.signatures.len(), 1);
634 assert_eq!(
635 message.account_keys,
636 vec![mint_pubkey, Pubkey::from(spl_memo::id().to_bytes())]
637 );
638 assert_eq!(message.recent_blockhash, blockhash);
639
640 assert_eq!(message.instructions.len(), 1);
641 let parsed_memo = std::str::from_utf8(&message.instructions[0].data).unwrap();
642 let expected_memo = "request too large; req: ◎0.000000002, cap: ◎0.000000001";
643 assert_eq!(parsed_memo, expected_memo);
644 assert_eq!(memo, expected_memo);
645 } else {
646 panic!("airdrop attempt should result in memo tx");
647 }
648 }
649
650 #[test]
651 fn test_process_faucet_request() {
652 let to = solana_sdk::pubkey::new_rand();
653 let blockhash = Hash::new(to.as_ref());
654 let lamports = 50;
655 let req = FaucetRequest::GetAirdrop {
656 lamports,
657 blockhash,
658 to,
659 };
660 let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
661 let req = serialize(&req).unwrap();
662
663 let keypair = Keypair::new();
664 let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
665 let message = Message::new(&[expected_instruction], Some(&keypair.pubkey()));
666 let expected_tx = Transaction::new(&[&keypair], message, blockhash);
667 let expected_bytes = serialize(&expected_tx).unwrap();
668 let mut expected_vec_with_length = vec![0; 2];
669 LittleEndian::write_u16(&mut expected_vec_with_length, expected_bytes.len() as u16);
670 expected_vec_with_length.extend_from_slice(&expected_bytes);
671
672 let mut faucet = Faucet::new(keypair, None, None, None);
673 let response = faucet.process_faucet_request(&req, ip);
674 let response_vec = response.unwrap().to_vec();
675 assert_eq!(expected_vec_with_length, response_vec);
676
677 let bad_bytes = "bad bytes".as_bytes();
678 assert!(faucet.process_faucet_request(bad_bytes, ip).is_err());
679 }
680}