1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use {
rdkafka::{
config::FromClientConfig,
error::KafkaResult,
producer::{DefaultProducerContext, ThreadedProducer},
ClientConfig,
},
serde::Deserialize,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPluginError, Result as PluginResult,
},
std::{collections::HashMap, fs::File, path::Path},
};
#[derive(Deserialize)]
pub struct Config {
pub kafka: HashMap<String, String>,
#[serde(default)]
pub shutdown_timeout_ms: u64,
#[serde(default)]
pub update_account_topic: String,
#[serde(default)]
pub slot_status_topic: String,
#[serde(default)]
pub transaction_topic: String,
#[serde(default)]
pub program_ignores: Vec<String>,
#[serde(default)]
pub publish_all_accounts: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
kafka: HashMap::new(),
shutdown_timeout_ms: 30_000,
update_account_topic: "".to_owned(),
slot_status_topic: "".to_owned(),
transaction_topic: "".to_owned(),
program_ignores: Vec::new(),
publish_all_accounts: false,
}
}
}
impl Config {
pub fn read_from<P: AsRef<Path>>(config_path: P) -> PluginResult<Self> {
let file = File::open(config_path)?;
let mut this: Self = serde_json::from_reader(file)
.map_err(|e| GeyserPluginError::ConfigFileReadError { msg: e.to_string() })?;
this.fill_defaults();
Ok(this)
}
pub fn producer(&self) -> KafkaResult<Producer> {
let mut config = ClientConfig::new();
for (k, v) in self.kafka.iter() {
config.set(k, v);
}
ThreadedProducer::from_config(&config)
}
fn set_default(&mut self, k: &'static str, v: &'static str) {
if !self.kafka.contains_key(k) {
self.kafka.insert(k.to_owned(), v.to_owned());
}
}
fn fill_defaults(&mut self) {
self.set_default("request.required.acks", "1");
self.set_default("message.timeout.ms", "30000");
self.set_default("compression.type", "lz4");
self.set_default("partitioner", "murmur2_random");
}
}
pub type Producer = ThreadedProducer<DefaultProducerContext>;