Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d5fe22cc14 | |||
| 83749afddd | |||
| dd075508b7 | |||
| 4ac492ba79 |
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -1315,9 +1315,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.81"
|
||||
version = "1.0.103"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
|
||||
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
@ -1753,9 +1753,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.60"
|
||||
version = "2.0.108"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
|
||||
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@ -11,12 +11,12 @@ use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use std::env;
|
||||
use log::{info,warn,error,trace,debug};
|
||||
use zmq::{Context, Socket};
|
||||
use zmq::{Context, Socket, DEALER, DONTWAIT};
|
||||
use std::str;
|
||||
use std::{thread, time::Duration};
|
||||
use std::collections::HashMap;
|
||||
//use byteorder::{LittleEndian, ReadBytesExt};
|
||||
//use std::io::Cursor;
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use std::io::Cursor;
|
||||
use hex;
|
||||
use std::error::Error as StdError;
|
||||
|
||||
@ -27,12 +27,16 @@ use openssl::sign::Signer;
|
||||
use openssl::sign::Verifier;
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
use std::fs;
|
||||
use std::time::Instant;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
const LOCKTIME_THRESHOLD:i64 = 5000000;
|
||||
const VERSION:&str = "0.0.1";
|
||||
const VERSION:&str = "0.0.2";
|
||||
#[derive(Debug, Clone,Serialize, Deserialize)]
|
||||
struct MyConfig {
|
||||
zmq_listener: String,
|
||||
@ -328,7 +332,7 @@ async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> R
|
||||
}))
|
||||
.send().await?;
|
||||
let body = &(response.text().await?);
|
||||
trace!("Body: {}", body);
|
||||
info!("Report to welist({})\tSent: {}", welist_url,body);
|
||||
}else {
|
||||
debug!("Not sending stats");
|
||||
}
|
||||
@ -448,6 +452,74 @@ fn get_default_config()-> MyConfig {
|
||||
info!("Default configuration file path is: {:#?}", file);
|
||||
confy::load("bal-pusher",None).expect("cant_load")
|
||||
}
|
||||
|
||||
fn check_zmq_connection(endpoint: &str) -> bool {
|
||||
trace!("check zmq connection");
|
||||
let context = Context::new();
|
||||
let socket = match context.socket(DEALER) {
|
||||
Ok(sock) => sock,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
if socket.connect(endpoint).is_err() {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Try to send an empty message non-blocking
|
||||
socket.send("", DONTWAIT).is_ok()
|
||||
}
|
||||
|
||||
// Add this struct to monitor connection health
|
||||
struct ConnectionMonitor {
|
||||
last_message_time: Instant,
|
||||
timeout: Duration,
|
||||
consecutive_timeouts: u32,
|
||||
max_consecutive_timeouts: u32,
|
||||
}
|
||||
|
||||
impl ConnectionMonitor {
|
||||
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
|
||||
Self {
|
||||
last_message_time: Instant::now(),
|
||||
timeout: Duration::from_secs(timeout_secs),
|
||||
consecutive_timeouts: 0,
|
||||
max_consecutive_timeouts: max_timeouts,
|
||||
}
|
||||
}
|
||||
|
||||
fn update(&mut self) {
|
||||
self.last_message_time = Instant::now();
|
||||
self.consecutive_timeouts = 0;
|
||||
}
|
||||
|
||||
fn check_connection(&mut self) -> ConnectionStatus {
|
||||
let elapsed = self.last_message_time.elapsed();
|
||||
|
||||
if elapsed > self.timeout {
|
||||
self.consecutive_timeouts += 1;
|
||||
|
||||
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
|
||||
ConnectionStatus::Lost(elapsed)
|
||||
} else {
|
||||
ConnectionStatus::Warning(elapsed)
|
||||
}
|
||||
} else {
|
||||
ConnectionStatus::Healthy
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.consecutive_timeouts = 0;
|
||||
self.last_message_time = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectionStatus {
|
||||
Healthy,
|
||||
Warning(Duration),
|
||||
Lost(Duration),
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main()-> std::io::Result<()>{
|
||||
env_logger::init();
|
||||
@ -488,7 +560,6 @@ async fn main()-> std::io::Result<()>{
|
||||
info!("Network: {}",arg_network);
|
||||
let network_params = get_network_params(&cfg,network);
|
||||
|
||||
|
||||
let context = Context::new();
|
||||
let socket: Socket = context.socket(zmq::SUB).unwrap();
|
||||
|
||||
@ -501,28 +572,28 @@ async fn main()-> std::io::Result<()>{
|
||||
let _ = main_result(&cfg,network_params).await;
|
||||
info!("waiting new blocks..");
|
||||
let mut last_seq:Vec<u8>=[0;4].to_vec();
|
||||
let mut counter=0;
|
||||
let max=100;
|
||||
loop {
|
||||
let message = socket.recv_multipart(0).unwrap();
|
||||
let topic = message[0].clone();
|
||||
let body = message[1].clone();
|
||||
let seq = message[2].clone();
|
||||
if last_seq >= seq {
|
||||
continue
|
||||
}
|
||||
last_seq = seq;
|
||||
//let mut sequence_str = "Unknown".to_string();
|
||||
/*if seq.len()==4{
|
||||
let mut rdr = Cursor::new(seq);
|
||||
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
||||
sequence_str = sequence.to_string();
|
||||
}*/
|
||||
debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
|
||||
trace!("ZMQ:GET BODY: {}", hex::encode(&body));
|
||||
if topic == b"hashblock" {
|
||||
info!("NEW BLOCK: {}", hex::encode(&body));
|
||||
//let cfg = cfg.clone();
|
||||
let _ = main_result(&cfg,network_params).await;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
|
||||
}
|
||||
}
|
||||
fn seq_to_str(seq:&Vec<u8>) -> String{
|
||||
if seq.len()==4{
|
||||
let mut rdr = Cursor::new(seq);
|
||||
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
|
||||
return sequence.to_string();
|
||||
}
|
||||
"Unknown".to_string()
|
||||
}
|
||||
|
||||
@ -34,7 +34,7 @@ use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_
|
||||
#[path = "../xpub.rs"]
|
||||
mod xpub;
|
||||
use crate::xpub::new_address_from_xpub;
|
||||
const VERSION:&str="0.2.1";
|
||||
const VERSION:&str="0.2.2";
|
||||
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
|
||||
#[derive(Debug, Clone,Serialize, Deserialize)]
|
||||
struct NetConfig {
|
||||
@ -112,6 +112,11 @@ async fn echo_version(
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
Ok(Response::new(full(VERSION)))
|
||||
}
|
||||
async fn echo_home(cfg: &MyConfig
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
debug!("echo_home: {}", cfg.info );
|
||||
Ok(Response::new(full(cfg.info.clone())))
|
||||
}
|
||||
async fn echo_pub_key(
|
||||
cfg: &MyConfig,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
@ -454,6 +459,9 @@ async fn echo(
|
||||
if uri=="/.pub_key.pem" {
|
||||
ret = echo_pub_key(cfg).await;
|
||||
}
|
||||
if uri=="/"{
|
||||
ret = echo_home(cfg).await;
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user