fix bal-pusher zmq connection stability
This commit is contained in:
parent
ae52b2b4e5
commit
4ac492ba79
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -1315,9 +1315,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.81"
|
version = "1.0.103"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
|
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "2.0.60"
|
version = "2.0.108"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
|
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|||||||
@ -11,12 +11,12 @@ use serde::Deserialize;
|
|||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::env;
|
use std::env;
|
||||||
use log::{info,warn,error,trace,debug};
|
use log::{info,warn,error,trace,debug};
|
||||||
use zmq::{Context, Socket};
|
use zmq::{Context, Socket, DEALER, DONTWAIT};
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::{thread, time::Duration};
|
use std::{thread, time::Duration};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
//use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
//use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use hex;
|
use hex;
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
|
|
||||||
@ -27,6 +27,10 @@ use openssl::sign::Signer;
|
|||||||
use openssl::sign::Verifier;
|
use openssl::sign::Verifier;
|
||||||
use base64::{engine::general_purpose, Engine as _};
|
use base64::{engine::general_purpose, Engine as _};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -448,6 +452,74 @@ fn get_default_config()-> MyConfig {
|
|||||||
info!("Default configuration file path is: {:#?}", file);
|
info!("Default configuration file path is: {:#?}", file);
|
||||||
confy::load("bal-pusher",None).expect("cant_load")
|
confy::load("bal-pusher",None).expect("cant_load")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_zmq_connection(endpoint: &str) -> bool {
|
||||||
|
trace!("check zmq connection");
|
||||||
|
let context = Context::new();
|
||||||
|
let socket = match context.socket(DEALER) {
|
||||||
|
Ok(sock) => sock,
|
||||||
|
Err(_) => return false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if socket.connect(endpoint).is_err() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to send an empty message non-blocking
|
||||||
|
socket.send("", DONTWAIT).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add this struct to monitor connection health
|
||||||
|
struct ConnectionMonitor {
|
||||||
|
last_message_time: Instant,
|
||||||
|
timeout: Duration,
|
||||||
|
consecutive_timeouts: u32,
|
||||||
|
max_consecutive_timeouts: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionMonitor {
|
||||||
|
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
last_message_time: Instant::now(),
|
||||||
|
timeout: Duration::from_secs(timeout_secs),
|
||||||
|
consecutive_timeouts: 0,
|
||||||
|
max_consecutive_timeouts: max_timeouts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self) {
|
||||||
|
self.last_message_time = Instant::now();
|
||||||
|
self.consecutive_timeouts = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_connection(&mut self) -> ConnectionStatus {
|
||||||
|
let elapsed = self.last_message_time.elapsed();
|
||||||
|
|
||||||
|
if elapsed > self.timeout {
|
||||||
|
self.consecutive_timeouts += 1;
|
||||||
|
|
||||||
|
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
|
||||||
|
ConnectionStatus::Lost(elapsed)
|
||||||
|
} else {
|
||||||
|
ConnectionStatus::Warning(elapsed)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ConnectionStatus::Healthy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.consecutive_timeouts = 0;
|
||||||
|
self.last_message_time = Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ConnectionStatus {
|
||||||
|
Healthy,
|
||||||
|
Warning(Duration),
|
||||||
|
Lost(Duration),
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main()-> std::io::Result<()>{
|
async fn main()-> std::io::Result<()>{
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
@ -489,6 +561,117 @@ async fn main()-> std::io::Result<()>{
|
|||||||
let network_params = get_network_params(&cfg,network);
|
let network_params = get_network_params(&cfg,network);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
let context = Context::new();
|
||||||
|
let mut socket: Socket = context.socket(zmq::SUB).unwrap();
|
||||||
|
|
||||||
|
let zmq_address = cfg.zmq_listener.clone();
|
||||||
|
info!("zmq listening on: {}", zmq_address);
|
||||||
|
socket.connect(&zmq_address).unwrap();
|
||||||
|
socket.set_subscribe(b"").unwrap();
|
||||||
|
|
||||||
|
let _ = main_result(&cfg, network_params).await;
|
||||||
|
info!("waiting new blocks..");
|
||||||
|
|
||||||
|
let mut last_seq: Vec<u8> = [0;4].to_vec();
|
||||||
|
let mut counter = 0;
|
||||||
|
let max = 100;
|
||||||
|
|
||||||
|
// Initialize connection monitor - 10 second timeout, 3 consecutive timeouts before declaring lost
|
||||||
|
let mut connection_monitor = ConnectionMonitor::new(10, 3);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
counter += 1;
|
||||||
|
|
||||||
|
// Check connection health every 100 iterations (about 10 seconds with your sleep)
|
||||||
|
if counter >= max {
|
||||||
|
match connection_monitor.check_connection() {
|
||||||
|
ConnectionStatus::Healthy => {
|
||||||
|
debug!("ZMQ connection healthy");
|
||||||
|
}
|
||||||
|
ConnectionStatus::Warning(elapsed) => {
|
||||||
|
warn!("ZMQ connection warning - no messages for {:?}", elapsed);
|
||||||
|
// Try to send a ping or do a lightweight check
|
||||||
|
if let Err(e) = socket.send("", zmq::DONTWAIT) {
|
||||||
|
warn!("ZMQ send test failed: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ConnectionStatus::Lost(elapsed) => {
|
||||||
|
error!("🚨 ZMQ connection LOST - no messages for {:?}", elapsed);
|
||||||
|
error!("Attempting to reconnect...");
|
||||||
|
|
||||||
|
// Reconnection logic
|
||||||
|
drop(socket);
|
||||||
|
let new_socket = context.socket(zmq::SUB).unwrap();
|
||||||
|
socket = new_socket;
|
||||||
|
socket.connect(&zmq_address).unwrap();
|
||||||
|
socket.set_subscribe(b"").unwrap();
|
||||||
|
|
||||||
|
connection_monitor.reset();
|
||||||
|
info!("Reconnected to ZMQ endpoint");
|
||||||
|
counter = 0;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
counter = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use non-blocking receive with timeout
|
||||||
|
let mut items = [socket.as_poll_item(zmq::POLLIN)];
|
||||||
|
match zmq::poll(&mut items, 1000) { // 1 second timeout
|
||||||
|
Ok(0) => {
|
||||||
|
// Timeout - no message received
|
||||||
|
debug!("No message received (timeout)");
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
if items[0].is_readable() {
|
||||||
|
match socket.recv_multipart(0) {
|
||||||
|
Ok(message) => {
|
||||||
|
// Update connection monitor - we got a message!
|
||||||
|
connection_monitor.update();
|
||||||
|
|
||||||
|
let topic = message[0].clone();
|
||||||
|
let body = message[1].clone();
|
||||||
|
let seq = message[2].clone();
|
||||||
|
|
||||||
|
if last_seq >= seq {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
last_seq = seq;
|
||||||
|
|
||||||
|
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
||||||
|
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
||||||
|
|
||||||
|
if topic == b"hashblock" {
|
||||||
|
info!("NEW BLOCK: {}", hex::encode(&body));
|
||||||
|
let _ = main_result(&cfg, network_params).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error receiving ZMQ message: {}", e);
|
||||||
|
thread::sleep(Duration::from_millis(1000)); // Longer sleep on error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("ZMQ poll error: {}", e);
|
||||||
|
thread::sleep(Duration::from_millis(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
let context = Context::new();
|
let context = Context::new();
|
||||||
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
||||||
|
|
||||||
@ -501,14 +684,28 @@ async fn main()-> std::io::Result<()>{
|
|||||||
let _ = main_result(&cfg,network_params).await;
|
let _ = main_result(&cfg,network_params).await;
|
||||||
info!("waiting new blocks..");
|
info!("waiting new blocks..");
|
||||||
let mut last_seq:Vec<u8>=[0;4].to_vec();
|
let mut last_seq:Vec<u8>=[0;4].to_vec();
|
||||||
|
let mut counter=0;
|
||||||
|
let max=100;
|
||||||
loop {
|
loop {
|
||||||
let message = socket.recv_multipart(0).unwrap();
|
counter+=1;
|
||||||
|
println!("hello");
|
||||||
|
if counter >=max{
|
||||||
|
println!("hello");
|
||||||
|
check_zmq_connection(&zmq_address);
|
||||||
|
counter=0;
|
||||||
|
}
|
||||||
|
println!("viva la figa");
|
||||||
|
let message = recv_multipart(&socket,0).unwrap();
|
||||||
|
println!("e chi la castiga");
|
||||||
let topic = message[0].clone();
|
let topic = message[0].clone();
|
||||||
let body = message[1].clone();
|
let body = message[1].clone();
|
||||||
let seq = message[2].clone();
|
let seq = message[2].clone();
|
||||||
if last_seq >= seq {
|
println!("last_seq({}) - seq({}) ",seq_to_str(&last_seq),seq_to_str(&seq));
|
||||||
|
/*if last_seq == seq {
|
||||||
|
println!(">=");
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
last_seq = seq;
|
last_seq = seq;
|
||||||
//let mut sequence_str = "Unknown".to_string();
|
//let mut sequence_str = "Unknown".to_string();
|
||||||
/*if seq.len()==4{
|
/*if seq.len()==4{
|
||||||
@ -523,6 +720,34 @@ async fn main()-> std::io::Result<()>{
|
|||||||
//let cfg = cfg.clone();
|
//let cfg = cfg.clone();
|
||||||
let _ = main_result(&cfg,network_params).await;
|
let _ = main_result(&cfg,network_params).await;
|
||||||
}
|
}
|
||||||
|
println!("sleep");
|
||||||
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
||||||
|
println!("sleep done");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn recv_multipart(socket: &Socket, flags: i32) -> Result<Vec<Vec<u8>>,zmq::Error> {
|
||||||
|
let mut parts: Vec<Vec<u8>> = vec![];
|
||||||
|
loop {
|
||||||
|
println!("hellorec");
|
||||||
|
let part = socket.recv_bytes(flags)?;
|
||||||
|
println!("push");
|
||||||
|
parts.push(part);
|
||||||
|
|
||||||
|
println!("more parts");
|
||||||
|
let more_parts = socket.get_rcvmore()?;
|
||||||
|
println!("more parts done");
|
||||||
|
if !more_parts {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(parts)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn seq_to_str(seq:&Vec<u8>) -> String{
|
||||||
|
if seq.len()==4{
|
||||||
|
let mut rdr = Cursor::new(seq);
|
||||||
|
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
||||||
|
return sequence.to_string();
|
||||||
|
}
|
||||||
|
"Unknown".to_string()
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user