alloy_node_bindings/nodes/
reth.rs1use crate::{utils::extract_endpoint, NodeError, NODE_STARTUP_TIMEOUT};
4use alloy_genesis::Genesis;
5use rand::Rng;
6use std::{
7 ffi::OsString,
8 fs::create_dir,
9 io::{BufRead, BufReader},
10 path::PathBuf,
11 process::{Child, ChildStdout, Command, Stdio},
12 time::Instant,
13};
14use url::Url;
15
/// Comma-separated list of RPC namespaces passed to both `--http.api` and `--ws.api`.
const API: &str = "eth,net,web3,txpool,trace,rpc,reth,ots,admin,debug";

/// Name of the executable that is launched when no explicit program path is configured.
const RETH: &str = "reth";

/// HTTP RPC port treated as the default; `--http.port` is only passed when the configured
/// port differs from this value.
const DEFAULT_HTTP_PORT: u16 = 8545;

/// WebSocket RPC port treated as the default; `--ws.port` is only passed when the configured
/// port differs from this value.
const DEFAULT_WS_PORT: u16 = 8546;

/// Auth RPC port treated as the default; `--authrpc.port` is only passed when the configured
/// port differs from this value.
const DEFAULT_AUTH_PORT: u16 = 8551;

/// P2P port treated as the default; `--discovery.port` is only passed when the configured
/// port differs from this value.
const DEFAULT_P2P_PORT: u16 = 30303;
33
/// Handle to a spawned `reth` child process together with the settings it was started with.
///
/// Instances are produced by [`Reth::spawn`] / [`Reth::try_spawn`]. Dropping the instance
/// kills the child process.
#[derive(Debug)]
pub struct RethInstance {
    /// The spawned child process.
    pid: Child,
    /// Instance number the builder was configured with (`0` means `--instance` was not passed).
    instance: u16,
    /// HTTP RPC port parsed from the node's startup log output.
    http_port: u16,
    /// WebSocket RPC port parsed from the node's startup log output.
    ws_port: u16,
    /// Auth RPC port parsed from the node's startup log output.
    auth_port: Option<u16>,
    /// P2P port parsed from the node's startup log output; `None` when no port was read.
    p2p_port: Option<u16>,
    /// IPC path configured on the builder, if any.
    ipc: Option<PathBuf>,
    /// Data directory configured on the builder, if any.
    data_dir: Option<PathBuf>,
    /// Genesis configuration set on the builder, if any.
    genesis: Option<Genesis>,
}
49
50impl RethInstance {
51 pub const fn instance(&self) -> u16 {
53 self.instance
54 }
55
56 pub const fn http_port(&self) -> u16 {
58 self.http_port
59 }
60
61 pub const fn ws_port(&self) -> u16 {
63 self.ws_port
64 }
65
66 pub const fn auth_port(&self) -> Option<u16> {
68 self.auth_port
69 }
70
71 pub const fn p2p_port(&self) -> Option<u16> {
74 self.p2p_port
75 }
76
77 #[doc(alias = "http_endpoint")]
79 pub fn endpoint(&self) -> String {
80 format!("http://localhost:{}", self.http_port)
81 }
82
83 pub fn ws_endpoint(&self) -> String {
85 format!("ws://localhost:{}", self.ws_port)
86 }
87
88 pub fn ipc_endpoint(&self) -> String {
90 self.ipc.clone().map_or_else(|| "reth.ipc".to_string(), |ipc| ipc.display().to_string())
91 }
92
93 #[doc(alias = "http_endpoint_url")]
95 pub fn endpoint_url(&self) -> Url {
96 Url::parse(&self.endpoint()).unwrap()
97 }
98
99 pub fn ws_endpoint_url(&self) -> Url {
101 Url::parse(&self.ws_endpoint()).unwrap()
102 }
103
104 pub const fn data_dir(&self) -> Option<&PathBuf> {
106 self.data_dir.as_ref()
107 }
108
109 pub const fn genesis(&self) -> Option<&Genesis> {
111 self.genesis.as_ref()
112 }
113
114 pub fn stdout(&mut self) -> Result<ChildStdout, NodeError> {
119 self.pid.stdout.take().ok_or(NodeError::NoStdout)
120 }
121}
122
123impl Drop for RethInstance {
124 fn drop(&mut self) {
125 self.pid.kill().expect("could not kill reth");
126 }
127}
128
/// Builder for launching a `reth node` process.
///
/// Configure the builder with the setter methods and start the node with [`Reth::spawn`] or
/// [`Reth::try_spawn`], which return a [`RethInstance`].
///
/// NOTE(review): the derived `Default` zero-initializes every field (all ports `0`, instance
/// `0`, discovery disabled), which differs from the values set by [`Reth::new`] — confirm
/// whether `Reth::default()` is meant to be used directly.
#[derive(Clone, Debug, Default)]
#[must_use = "This Builder struct does nothing unless it is `spawn`ed"]
pub struct Reth {
    /// Whether to pass `--dev`.
    dev: bool,
    /// HTTP RPC port; passed as `--http.port` when it differs from the default.
    http_port: u16,
    /// WebSocket RPC port; passed as `--ws.port` when it differs from the default.
    ws_port: u16,
    /// Auth RPC port; passed as `--authrpc.port` when it differs from the default.
    auth_port: u16,
    /// P2P port; passed as `--discovery.port` when it differs from the default.
    p2p_port: u16,
    /// Value for `--dev.block-time`; only used when `dev` is set.
    block_time: Option<String>,
    /// Instance number; passed as `--instance` when greater than zero.
    instance: u16,
    /// Whether discovery stays enabled; when `false`, `--disable-discovery` and
    /// `--no-persist-peers` are passed.
    discovery_enabled: bool,
    /// Path of the executable to run; falls back to `reth` when unset.
    program: Option<PathBuf>,
    /// Value for `--ipcpath`, if any.
    ipc_path: Option<PathBuf>,
    /// Whether IPC stays enabled; when `false`, `--ipcdisable` is passed.
    ipc_enabled: bool,
    /// Value for `--datadir`, if any; the directory is created when it does not exist.
    data_dir: Option<PathBuf>,
    /// Value for `--chain`, if any.
    chain_or_path: Option<String>,
    /// Genesis configuration handed through to the spawned [`RethInstance`].
    genesis: Option<Genesis>,
    /// Extra command-line arguments appended after all generated ones.
    args: Vec<OsString>,
    /// Whether the child's stdout handle is put back on the child after startup.
    keep_stdout: bool,
}
167
168impl Reth {
169 pub fn new() -> Self {
175 Self {
176 dev: false,
177 http_port: DEFAULT_HTTP_PORT,
178 ws_port: DEFAULT_WS_PORT,
179 auth_port: DEFAULT_AUTH_PORT,
180 p2p_port: DEFAULT_P2P_PORT,
181 block_time: None,
182 instance: rand::thread_rng().gen_range(1..200),
183 discovery_enabled: true,
184 program: None,
185 ipc_path: None,
186 ipc_enabled: false,
187 data_dir: None,
188 chain_or_path: None,
189 genesis: None,
190 args: Vec::new(),
191 keep_stdout: false,
192 }
193 }
194
195 pub fn at(path: impl Into<PathBuf>) -> Self {
208 Self::new().path(path)
209 }
210
211 pub fn path<T: Into<PathBuf>>(mut self, path: T) -> Self {
216 self.program = Some(path.into());
217 self
218 }
219
220 pub const fn dev(mut self) -> Self {
222 self.dev = true;
223 self
224 }
225
226 pub const fn http_port(mut self, http_port: u16) -> Self {
229 self.http_port = http_port;
230 self.instance = 0;
231 self
232 }
233
234 pub const fn ws_port(mut self, ws_port: u16) -> Self {
237 self.ws_port = ws_port;
238 self.instance = 0;
239 self
240 }
241
242 pub const fn auth_port(mut self, auth_port: u16) -> Self {
245 self.auth_port = auth_port;
246 self.instance = 0;
247 self
248 }
249
250 pub const fn p2p_port(mut self, p2p_port: u16) -> Self {
253 self.p2p_port = p2p_port;
254 self.instance = 0;
255 self
256 }
257
258 pub fn block_time(mut self, block_time: &str) -> Self {
262 self.block_time = Some(block_time.to_string());
263 self
264 }
265
266 pub const fn disable_discovery(mut self) -> Self {
268 self.discovery_enabled = false;
269 self
270 }
271
272 pub fn chain_or_path(mut self, chain_or_path: &str) -> Self {
274 self.chain_or_path = Some(chain_or_path.to_string());
275 self
276 }
277
278 pub const fn enable_ipc(mut self) -> Self {
280 self.ipc_enabled = true;
281 self
282 }
283
284 pub const fn instance(mut self, instance: u16) -> Self {
287 self.instance = instance;
288 self
289 }
290
291 pub fn ipc_path<T: Into<PathBuf>>(mut self, path: T) -> Self {
293 self.ipc_path = Some(path.into());
294 self
295 }
296
297 pub fn data_dir<T: Into<PathBuf>>(mut self, path: T) -> Self {
299 self.data_dir = Some(path.into());
300 self
301 }
302
303 pub fn genesis(mut self, genesis: Genesis) -> Self {
310 self.genesis = Some(genesis);
311 self
312 }
313
314 pub const fn keep_stdout(mut self) -> Self {
318 self.keep_stdout = true;
319 self
320 }
321
322 pub fn arg<T: Into<OsString>>(mut self, arg: T) -> Self {
326 self.args.push(arg.into());
327 self
328 }
329
330 pub fn args<I, S>(mut self, args: I) -> Self
334 where
335 I: IntoIterator<Item = S>,
336 S: Into<OsString>,
337 {
338 for arg in args {
339 self = self.arg(arg);
340 }
341 self
342 }
343
344 #[track_caller]
350 pub fn spawn(self) -> RethInstance {
351 self.try_spawn().unwrap()
352 }
353
354 pub fn try_spawn(self) -> Result<RethInstance, NodeError> {
356 let bin_path = self
357 .program
358 .as_ref()
359 .map_or_else(|| RETH.as_ref(), |bin| bin.as_os_str())
360 .to_os_string();
361 let mut cmd = Command::new(&bin_path);
362 cmd.stdout(Stdio::piped());
364
365 cmd.arg("node");
367
368 if self.http_port != DEFAULT_HTTP_PORT {
370 cmd.arg("--http.port").arg(self.http_port.to_string());
371 }
372
373 if self.ws_port != DEFAULT_WS_PORT {
374 cmd.arg("--ws.port").arg(self.ws_port.to_string());
375 }
376
377 if self.auth_port != DEFAULT_AUTH_PORT {
378 cmd.arg("--authrpc.port").arg(self.auth_port.to_string());
379 }
380
381 if self.p2p_port != DEFAULT_P2P_PORT {
382 cmd.arg("--discovery.port").arg(self.p2p_port.to_string());
383 }
384
385 if self.dev {
387 cmd.arg("--dev");
394
395 if let Some(block_time) = self.block_time {
397 cmd.arg("--dev.block-time").arg(block_time);
398 }
399 }
400
401 if !self.ipc_enabled {
403 cmd.arg("--ipcdisable");
404 }
405
406 cmd.arg("--http");
408 cmd.arg("--http.api").arg(API);
409
410 cmd.arg("--ws");
412 cmd.arg("--ws.api").arg(API);
413
414 if let Some(ipc) = &self.ipc_path {
416 cmd.arg("--ipcpath").arg(ipc);
417 }
418
419 if self.instance > 0 {
424 cmd.arg("--instance").arg(self.instance.to_string());
425 }
426
427 if let Some(data_dir) = &self.data_dir {
428 cmd.arg("--datadir").arg(data_dir);
429
430 if !data_dir.exists() {
432 create_dir(data_dir).map_err(NodeError::CreateDirError)?;
433 }
434 }
435
436 if self.discovery_enabled {
437 cmd.arg("--verbosity").arg("-vvv");
439 } else {
440 cmd.arg("--disable-discovery");
441 cmd.arg("--no-persist-peers");
442 }
443
444 if let Some(chain_or_path) = self.chain_or_path {
445 cmd.arg("--chain").arg(chain_or_path);
446 }
447
448 cmd.arg("--color").arg("never");
450
451 cmd.args(self.args);
453
454 let mut child = cmd.spawn().map_err(NodeError::SpawnError)?;
455
456 let stdout = child.stdout.take().ok_or(NodeError::NoStdout)?;
457
458 let start = Instant::now();
459 let mut reader = BufReader::new(stdout);
460
461 let mut http_port = 0;
462 let mut ws_port = 0;
463 let mut auth_port = 0;
464 let mut p2p_port = 0;
465
466 let mut ports_started = false;
467 let mut p2p_started = !self.discovery_enabled;
468
469 loop {
470 if start + NODE_STARTUP_TIMEOUT <= Instant::now() {
471 let _ = child.kill();
472 return Err(NodeError::Timeout);
473 }
474
475 let mut line = String::with_capacity(120);
476 reader.read_line(&mut line).map_err(NodeError::ReadLineError)?;
477
478 if line.contains("RPC HTTP server started") {
479 if let Some(addr) = extract_endpoint("url=", &line) {
480 http_port = addr.port();
481 }
482 }
483
484 if line.contains("RPC WS server started") {
485 if let Some(addr) = extract_endpoint("url=", &line) {
486 ws_port = addr.port();
487 }
488 }
489
490 if line.contains("RPC auth server started") {
491 if let Some(addr) = extract_endpoint("url=", &line) {
492 auth_port = addr.port();
493 }
494 }
495
496 if line.contains("ERROR") {
498 let _ = child.kill();
499 return Err(NodeError::Fatal(line));
500 }
501
502 if http_port != 0 && ws_port != 0 && auth_port != 0 {
503 ports_started = true;
504 }
505
506 if self.discovery_enabled {
507 if line.contains("Updated local ENR") {
508 if let Some(port) = extract_endpoint("IpV4 UDP Socket", &line) {
509 p2p_port = port.port();
510 p2p_started = true;
511 }
512 }
513 } else {
514 p2p_started = true;
515 }
516
517 if ports_started && p2p_started {
519 break;
520 }
521 }
522
523 if self.keep_stdout {
524 child.stdout = Some(reader.into_inner());
526 }
527
528 Ok(RethInstance {
529 pid: child,
530 instance: self.instance,
531 http_port,
532 ws_port,
533 p2p_port: (p2p_port != 0).then_some(p2p_port),
534 ipc: self.ipc_path,
535 data_dir: self.data_dir,
536 auth_port: Some(auth_port),
537 genesis: self.genesis,
538 })
539 }
540}