From 4ac492ba79d9246a835dd3c3fee2f45f75074916 Mon Sep 17 00:00:00 2001 From: bitcoinafterlife Date: Mon, 3 Nov 2025 07:42:09 -0400 Subject: [PATCH] fix bal-pusher zmq connection stability --- Cargo.lock | 8 +- src/bin/bal-pusher.rs | 235 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 234 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 346d404..3aee00b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,9 +1315,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] @@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.60" +version = "2.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" dependencies = [ "proc-macro2", "quote", diff --git a/src/bin/bal-pusher.rs b/src/bin/bal-pusher.rs index c1a4189..e27c17c 100644 --- a/src/bin/bal-pusher.rs +++ b/src/bin/bal-pusher.rs @@ -11,12 +11,12 @@ use serde::Deserialize; use serde_json::json; use std::env; use log::{info,warn,error,trace,debug}; -use zmq::{Context, Socket}; +use zmq::{Context, Socket, DEALER, DONTWAIT}; use std::str; use std::{thread, time::Duration}; use std::collections::HashMap; -//use byteorder::{LittleEndian, ReadBytesExt}; -//use std::io::Cursor; +use byteorder::{LittleEndian, ReadBytesExt}; +use std::io::Cursor; use hex; use std::error::Error as StdError; @@ -27,6 +27,10 @@ use openssl::sign::Signer; use openssl::sign::Verifier; use base64::{engine::general_purpose, Engine as _}; use std::fs; +use std::time::Instant; + + + @@ -448,6 +452,74 @@ fn get_default_config()-> MyConfig { info!("Default configuration file path is: {:#?}", file); confy::load("bal-pusher",None).expect("cant_load") } + +fn check_zmq_connection(endpoint: &str) -> bool { + trace!("check zmq connection"); + let context = Context::new(); + let socket = match context.socket(DEALER) { + Ok(sock) => sock, + Err(_) => return false, + }; + + if socket.connect(endpoint).is_err() { + return false; + } + + // Try to send an empty message non-blocking + socket.send("", DONTWAIT).is_ok() +} + +// Add this struct to monitor connection health +struct ConnectionMonitor { + last_message_time: Instant, + timeout: Duration, + consecutive_timeouts: u32, + max_consecutive_timeouts: u32, +} + +impl ConnectionMonitor { + fn new(timeout_secs: u64, max_timeouts: u32) -> Self { + Self { + last_message_time: Instant::now(), + timeout: Duration::from_secs(timeout_secs), + consecutive_timeouts: 0, + max_consecutive_timeouts: max_timeouts, + } + } + + fn update(&mut self) { + self.last_message_time = Instant::now(); + self.consecutive_timeouts = 0; + } + + fn check_connection(&mut self) -> ConnectionStatus { + let elapsed = self.last_message_time.elapsed(); + + if elapsed > self.timeout { + self.consecutive_timeouts += 1; + + if self.consecutive_timeouts >= self.max_consecutive_timeouts { + ConnectionStatus::Lost(elapsed) + } else { + ConnectionStatus::Warning(elapsed) + } + } else { + ConnectionStatus::Healthy + } + } + + fn reset(&mut self) { + self.consecutive_timeouts = 0; + self.last_message_time = Instant::now(); + } +} + +enum ConnectionStatus { + Healthy, + Warning(Duration), + Lost(Duration), +} + #[tokio::main] async fn main()-> std::io::Result<()>{ env_logger::init(); @@ -489,6 +561,117 @@ async fn main()-> std::io::Result<()>{ let network_params = get_network_params(&cfg,network); + +/* + + let context = Context::new(); + let mut socket: Socket = context.socket(zmq::SUB).unwrap(); + + let zmq_address = cfg.zmq_listener.clone(); + info!("zmq listening on: {}", zmq_address); + socket.connect(&zmq_address).unwrap(); + socket.set_subscribe(b"").unwrap(); + + let _ = main_result(&cfg, network_params).await; + info!("waiting new blocks.."); + + let mut last_seq: Vec = [0;4].to_vec(); + let mut counter = 0; + let max = 100; + + // Initialize connection monitor - 10 second timeout, 3 consecutive timeouts before declaring lost + let mut connection_monitor = ConnectionMonitor::new(10, 3); + + loop { + counter += 1; + + // Check connection health every 100 iterations (about 10 seconds with your sleep) + if counter >= max { + match connection_monitor.check_connection() { + ConnectionStatus::Healthy => { + debug!("ZMQ connection healthy"); + } + ConnectionStatus::Warning(elapsed) => { + warn!("ZMQ connection warning - no messages for {:?}", elapsed); + // Try to send a ping or do a lightweight check + if let Err(e) = socket.send("", zmq::DONTWAIT) { + warn!("ZMQ send test failed: {}", e); + } + } + ConnectionStatus::Lost(elapsed) => { + error!("🚨 ZMQ connection LOST - no messages for {:?}", elapsed); + error!("Attempting to reconnect..."); + + // Reconnection logic + drop(socket); + let new_socket = context.socket(zmq::SUB).unwrap(); + socket = new_socket; + socket.connect(&zmq_address).unwrap(); + socket.set_subscribe(b"").unwrap(); + + connection_monitor.reset(); + info!("Reconnected to ZMQ endpoint"); + counter = 0; + continue; + } + } + counter = 0; + } + + // Use non-blocking receive with timeout + let mut items = [socket.as_poll_item(zmq::POLLIN)]; + match zmq::poll(&mut items, 1000) { // 1 second timeout + Ok(0) => { + // Timeout - no message received + debug!("No message received (timeout)"); + thread::sleep(Duration::from_millis(100)); + continue; + } + Ok(_) => { + if items[0].is_readable() { + match socket.recv_multipart(0) { + Ok(message) => { + // Update connection monitor - we got a message! + connection_monitor.update(); + + let topic = message[0].clone(); + let body = message[1].clone(); + let seq = message[2].clone(); + + if last_seq >= seq { + continue; + } + last_seq = seq; + + debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); + trace!("ZMQ:GET BODY: {}", hex::encode(&body)); + + if topic == b"hashblock" { + info!("NEW BLOCK: {}", hex::encode(&body)); + let _ = main_result(&cfg, network_params).await; + } + } + Err(e) => { + error!("Error receiving ZMQ message: {}", e); + thread::sleep(Duration::from_millis(1000)); // Longer sleep on error + } + } + } + } + Err(e) => { + error!("ZMQ poll error: {}", e); + thread::sleep(Duration::from_millis(1000)); + } + } + + thread::sleep(Duration::from_millis(100)); + } +} + +*/ + + + let context = Context::new(); let socket: Socket = context.socket(zmq::SUB).unwrap(); @@ -501,14 +684,28 @@ async fn main()-> std::io::Result<()>{ let _ = main_result(&cfg,network_params).await; info!("waiting new blocks.."); let mut last_seq:Vec=[0;4].to_vec(); + let mut counter=0; + let max=100; loop { - let message = socket.recv_multipart(0).unwrap(); + counter+=1; + println!("hello"); + if counter >=max{ + println!("hello"); + check_zmq_connection(&zmq_address); + counter=0; + } + println!("viva la figa"); + let message = recv_multipart(&socket,0).unwrap(); + println!("e chi la castiga"); let topic = message[0].clone(); let body = message[1].clone(); let seq = message[2].clone(); - if last_seq >= seq { + println!("last_seq({}) - seq({}) ",seq_to_str(&last_seq),seq_to_str(&seq)); + /*if last_seq == seq { + println!(">="); continue } + */ last_seq = seq; //let mut sequence_str = "Unknown".to_string(); /*if seq.len()==4{ @@ -523,6 +720,34 @@ async fn main()-> std::io::Result<()>{ //let cfg = cfg.clone(); let _ = main_result(&cfg,network_params).await; } + println!("sleep"); thread::sleep(Duration::from_millis(100)); // Sleep for 100ms + println!("sleep done"); } } +pub fn recv_multipart(socket: &Socket, flags: i32) -> Result>,zmq::Error> { + let mut parts: Vec> = vec![]; + loop { + println!("hellorec"); + let part = socket.recv_bytes(flags)?; + println!("push"); + parts.push(part); + + println!("more parts"); + let more_parts = socket.get_rcvmore()?; + println!("more parts done"); + if !more_parts { + break; + } + } + Ok(parts) + } + +fn seq_to_str(seq:&Vec) -> String{ + if seq.len()==4{ + let mut rdr = Cursor::new(seq); + let sequence = rdr.read_u32::().expect("Failed to read integer"); + return sequence.to_string(); + } + "Unknown".to_string() +}