Compare commits

..

No commits in common. "dd075508b7255d001321fac8798139aaefd89ad3" and "ae52b2b4e597fb9596da7cea6f903bd85dffc570" have entirely different histories.

2 changed files with 19 additions and 90 deletions

8
Cargo.lock generated
View File

@ -1315,9 +1315,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.103" version = "1.0.81"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.108" version = "2.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -11,12 +11,12 @@ use serde::Deserialize;
use serde_json::json; use serde_json::json;
use std::env; use std::env;
use log::{info,warn,error,trace,debug}; use log::{info,warn,error,trace,debug};
use zmq::{Context, Socket, DEALER, DONTWAIT}; use zmq::{Context, Socket};
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use std::collections::HashMap; use std::collections::HashMap;
use byteorder::{LittleEndian, ReadBytesExt}; //use byteorder::{LittleEndian, ReadBytesExt};
use std::io::Cursor; //use std::io::Cursor;
use hex; use hex;
use std::error::Error as StdError; use std::error::Error as StdError;
@ -27,10 +27,6 @@ use openssl::sign::Signer;
use openssl::sign::Verifier; use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use std::fs; use std::fs;
use std::time::Instant;
@ -332,7 +328,7 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
})) }))
.send().await?; .send().await?;
let body = &(response.text().await?); let body = &(response.text().await?);
info!("Report to welist({})\tSent: {}", welist_url,body); trace!("Body: {}", body);
}else { }else {
debug!("Not sending stats"); debug!("Not sending stats");
} }
@ -452,74 +448,6 @@ fn get_default_config()-> MyConfig {
info!("Default configuration file path is: {:#?}", file); info!("Default configuration file path is: {:#?}", file);
confy::load("bal-pusher",None).expect("cant_load") confy::load("bal-pusher",None).expect("cant_load")
} }
fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main] #[tokio::main]
async fn main()-> std::io::Result<()>{ async fn main()-> std::io::Result<()>{
env_logger::init(); env_logger::init();
@ -560,6 +488,7 @@ async fn main()-> std::io::Result<()>{
info!("Network: {}",arg_network); info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network); let network_params = get_network_params(&cfg,network);
let context = Context::new(); let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
@ -572,28 +501,28 @@ async fn main()-> std::io::Result<()>{
let _ = main_result(&cfg,network_params).await; let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks.."); info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec(); let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop { loop {
let message = socket.recv_multipart(0).unwrap(); let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone(); let topic = message[0].clone();
let body = message[1].clone(); let body = message[1].clone();
let seq = message[2].clone(); let seq = message[2].clone();
if last_seq >= seq {
continue
}
last_seq = seq; last_seq = seq;
//let mut sequence_str = "Unknown".to_string();
/*if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
sequence_str = sequence.to_string();
}*/
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic")); debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
trace!("ZMQ:GET BODY: {}", hex::encode(&body)); trace!("ZMQ:GET BODY: {}", hex::encode(&body));
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK: {}", hex::encode(&body)); info!("NEW BLOCK: {}", hex::encode(&body));
//let cfg = cfg.clone();
let _ = main_result(&cfg,network_params).await; let _ = main_result(&cfg,network_params).await;
} }
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
} }
} }
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}