Compare commits

..

18 Commits
v0.2.0 ... main

10 changed files with 1466 additions and 109 deletions

1071
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
base64 = "0.22.1"
bs58 = "0.4.0" bs58 = "0.4.0"
bytes = "1.2" bytes = "1.2"
bitcoin = { version = "0.32.5" } bitcoin = { version = "0.32.5" }
@ -20,11 +21,13 @@ hyper = { version = "1.3.1", features = ["http1","server"] }
hyper-util = { version = "0.1.3", features = ["tokio"] } hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1" http-body-util = "0.1"
log = "0.4.21" log = "0.4.21"
openssl = { version = "0.10.74", features = ["vendored"] }
sha2 = "0.10.8" sha2 = "0.10.8"
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.116" serde_json = "1.0.116"
sqlite = "0.34.0" sqlite = "0.34.0"
regex = "1.10.4" regex = "1.10.4"
reqwest = { version = "0.12.24", features = ["json","socks"] }
tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components tokio = { version = "1", features = ["rt", "net","macros","rt-multi-thread"] } # Keep only necessary runtime components
zmq = "0.10.0" zmq = "0.10.0"

View File

@ -1,10 +1,11 @@
# bal-server # bal-server
## Installation ## Installation
```bash ```bash
$ git clone .... $ git clone ....
$ cd bal-server $ cd bal-server
$ openssl genpkey -algorithm ED25519 -out private_key.pem
$ openssl pkey -in private_key.pem -pubout -out public_key.pem
$ cargo build --release $ cargo build --release
$ sudo cp target/release/bal-server /usr/local/bin $ sudo cp target/release/bal-server /usr/local/bin
$ bal-server $ bal-server
@ -20,6 +21,7 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` | | `BAL_SERVER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, a new one will be created. | `bal.db` |
| `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` | | `BAL_SERVER_BIND_ADDRESS` | Public address for listening to requests. | `127.0.0.1` |
| `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` | | `BAL_SERVER_BIND_PORT` | Default port for listening to requests. | `9137` |
| `BAL_SERVER_PUB_KEY_PATH` | WillExecutor Ed25519 public key | `public_key.pem` |
| `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - | | `BAL_SERVER_REGTEST_ADDRESS` | Bitcoin address for the regtest environment. | - |
| `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 | | `BAL_SERVER_REGTEST_FIXED_FEE` | Fixed fee for the regtest environment. | 50000 |
| `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - | | `BAL_SERVER_SIGNET_ADDRESS` | Bitcoin address for the signet environment. | - |
@ -28,3 +30,56 @@ The `bal-server` application can be configured using environment variables. The
| `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 | | `BAL_SERVER_TESTNET_FIXED_FEE` | Fixed fee for the testnet environment. | 50000 |
| `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - | | `BAL_SERVER_BITCOIN_ADDRESS` | Bitcoin address for the mainnet environment. | - |
| `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 | | `BAL_SERVER_BITCOIN_FIXED_FEE` | Fixed fee for the mainnet environment. | 50000 |
# bal-pusher
`bal-pusher` is a tool that retrieves Bitcoin transactions from a database and pushes them to the Bitcoin network when their **locktime** exceeds the **median time past** (MTP). It listens for Bitcoin block updates via ZMQ.
## Installation
To use `bal-pusher`, you need to compile and install Bitcoin with ZMQ (ZeroMQ) support enabled. Then, configure your Bitcoin node and `bal-pusher` to push the transactions.
### Prerequisites
1. **Bitcoin with ZMQ Support**:
Ensure that Bitcoin is compiled with ZMQ support. Add the following line to your `bitcoin.conf` file:
```
zmqpubhashblock=tcp://127.0.0.1:28332
```
2. **Install Rust and Cargo**:
If you haven't already installed Rust and Cargo, you can follow the official instructions to do so: [Rust Installation](https://www.rust-lang.org/tools/install).
## Configuration
`bal-pusher` can be configured using environment variables. If no configuration file is provided, a default configuration file will be created.
### Available Configuration Variables
| Variable | Description | Default |
|---------------------------------------|------------------------------------------|----------------------------------------------|
| `BAL_PUSHER_CONFIG_FILE` | Path to the configuration file. If the file does not exist, it will be created. | `$HOME/.config/bal-pusher/default-config.toml` |
| `BAL_PUSHER_DB_FILE` | Path to the SQLite3 database file. If the file does not exist, it will be created. | `bal.db` |
| `BAL_PUSHER_ZMQ_LISTENER` | ZMQ listener for Bitcoin updates. | `tcp://127.0.0.1:28332` |
| `BAL_PUSHER_BITCOIN_HOST` | Bitcoin server host for RPC connections. | `http://127.0.0.1` |
| `BAL_PUSHER_BITCOIN_PORT` | Bitcoin RPC server port. | `8332` |
| `BAL_PUSHER_BITCOIN_COOKIE_FILE` | Path to Bitcoin RPC cookie file. | `$HOME/.bitcoin/.cookie` |
| `BAL_PUSHER_BITCOIN_RPC_USER` | Bitcoin RPC username. | - |
| `BAL_PUSHER_BITCOIN_RPC_PASSWORD` | Bitcoin RPC password. | - |
| `BAL_PUSHER_SEND_STATS` | Contact welist to provide times | false |
| `WELIST_SERVER_URL` | welist server url to provide times | https://welist.bitcoin-afer.life |
| `BAL_SERVER_URL` | WillExecutor server url | - |
| `SSL_KEY_PATH` | Ed25519 private key pem file | `private_key.pem` |
## Running `bal-pusher`
Once the application is installed and configured, you can start `bal-pusher` by running the following command:
```bash
$ bal-pusher
```
This will start the service, which will listen for Bitcoin blocks via ZMQ and push transactions from the database when their locktime exceeds the median time past.

16
bal-pusher.env Normal file
View File

@ -0,0 +1,16 @@
RUST_LOG=info
BAL_PUSHER_DB_FILE=/home/bal/bal.db
BAL_PUSHER_BITCOIN_COOKIE_FILE=/home/bitcoin/.bitcoin/.cookie
BAL_PUSHER_REGTEST_COOKIE_FILE=/home/bitcoin/.bitcoin/regtest/.cookie
BAL_PUSHER_TESTNET_COOKIE_FILE=/home/bitcoin/.bitcoin/testnet3/.cookie
BAL_PUSHER_SIGNET_COOKIE_FILE=/home/bitcoin/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
BAL_PUSHER_SEND_STATS=true
WELIST_SERVER_URL=http://welist.bitcoin-after.life
SSL_KEY_PATH=/home/bal/privkey.pem
#your server domain. do not add https or final / only domain.
BAL_SERVER_URL="https://we.bitcoin-after.life"

14
bal-pusher.sh Normal file
View File

@ -0,0 +1,14 @@
RUST_LOG=trace
BAL_PUSHER_DB_FILE="$(pwd)/bal.db"
#export BAL_PUSHER_BITCOIN_COOKIE_FILE=/~/.bitcoin/.cookie
#export BAL_PUSHER_REGTEST_COOKIE_FILE=/~/.bitcoin/regtest/.cookie
#export BAL_PUSHER_TESTNET_COOKIE_FILE=/~/.bitcoin/testnet3/.cookie
#export BAL_PUSHER_SIGNET_COOKIE_FILE=/~/.bitcoin/signet/.cookie
BAL_PUSHER_ZMQ_LISTENER=tcp://127.0.0.1:28332
export BAL_PUSHER_SEND_STATS=true
export WELIST_SERVER_URL=http://localhost:8085
export BAL_SERVER_URL="http://127.0.0.1:9133"
export SSL_KEY_PATH="$(pwd)/private_key.pem"
cargo run --bin=bal-pusher regtest

View File

@ -1,10 +1,11 @@
RUST_LOG=trace RUST_LOG=info
BAL_SERVER_DB_FILE=/home/bal/bal.db BAL_SERVER_DB_FILE="/home/bal/bal.db"
BAL_SERVER_INFO="BAL server test willexecutor" BAL_SERVER_INFO="BAL server test willexecutor"
BAL_SERVER_BIND_ADDRESS=127.0.0.1 BAL_SERVER_BIND_ADDRESS=127.0.0.1
BAL_SERVER_BIND_PORT=9133 BAL_SERVER_BIND_PORT=9133
BAL_SERVER_BITCOIN_ADDRESS="your bitcoin to recive payments here" BAL_SERVER_BITCOIN_ADDRESS="your bitcoin or xpub to recive payments here"
BAL_SERVER_BITCOIN_FIXED_FEE=50000 BAL_SERVER_BITCOIN_FIXED_FEE=50000
BAL_SERVER_PUB_KEY_PATH="/home/bal/public_key.pem"
BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4" BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
BAL_SERVER_REGTEST_FEE=5000 BAL_SERVER_REGTEST_FEE=5000

24
bal-server.sh Normal file
View File

@ -0,0 +1,24 @@
WORKING_DIR=$(pwd)
if [ ! -f "$WORKING_DIR/public_key.pem" ]; then
echo "creating keypairs"
openssl genpkey -algorithm ED25519 -out private_key.pem
openssl pkey -in private_key.pem -pubout -out public_key.pem
fi
export RUST_LOG="trace"
export BAL_SERVER_DB_FILE="$WORKING_DIR/bal.db"
export BAL_SERVER_INFO="BAL devel willexecutor server"
export BAL_SERVER_BIND_ADDRESS="127.0.0.1"
export BAL_SERVER_BIND_PORT=9133
export BAL_SERVER_PUB_KEY_PATH="$WORKING_DIR/public_key.pem"
#export BAL_SERVER_BITCOIN_ADDRESS="your bitcoin address or xpub to recive payments here"
#export BAL_SERVER_BITCOIN_FIXED_FEE=50000
export BAL_SERVER_REGTEST_ADDRESS="vpub5UhLrYG1qQjnJhvJgBdqgpznyH11mxW9hwBYxf3KhfdjiupCFPUVDvgwpeZ9Wj5YUJXjKjXjy7DSbJNBW1sXbKwARiaphm1UjHYy3mKvTG4"
export BAL_SERVER_REGTEST_FEE=5000
#export BAL_SERVER_TESTNET_ADDRESS=
#export BAL_SERVER_TESTNET_FEE=100000
#export BAL_SERVER_SIGNET_ADDRESS=
#export BAL_SERVER_SIGNET_FEE=100000
cargo run --bin=bal-server

View File

@ -8,21 +8,35 @@ use bitcoincore_rpc_json::GetBlockchainInfoResult;
use sqlite::{Value}; use sqlite::{Value};
use serde::Serialize; use serde::Serialize;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json;
use std::env; use std::env;
use std::fs::OpenOptions; use log::{info,warn,error,trace,debug};
use std::io::{ Write}; use zmq::{Context, Socket, DEALER, DONTWAIT};
use log::{info,debug,warn,error};
use zmq::{Context, Socket};
use std::str; use std::str;
use std::{thread, time::Duration}; use std::{thread, time::Duration};
use std::collections::HashMap; use std::collections::HashMap;
//use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
//use std::io::Cursor; use std::io::Cursor;
use hex; use hex;
use std::error::Error as StdError; use std::error::Error as StdError;
const LOCKTIME_THRESHOLD:i64 = 5000000; use reqwest::Client as rClient;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey};
use openssl::sign::Signer;
use openssl::sign::Verifier;
use base64::{engine::general_purpose, Engine as _};
use std::fs;
use std::time::Instant;
const LOCKTIME_THRESHOLD:i64 = 5000000;
const VERSION:&str = "0.0.2";
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct MyConfig { struct MyConfig {
zmq_listener: String, zmq_listener: String,
@ -33,8 +47,10 @@ struct MyConfig {
testnet: NetworkParams, testnet: NetworkParams,
signet: NetworkParams, signet: NetworkParams,
mainnet: NetworkParams, mainnet: NetworkParams,
send_stats: bool,
url: String,
secret_code: String,
ssl_key_path: String
} }
impl Default for MyConfig { impl Default for MyConfig {
@ -48,6 +64,10 @@ impl Default for MyConfig {
testnet: get_network_params_default(Network::Testnet), testnet: get_network_params_default(Network::Testnet),
signet: get_network_params_default(Network::Signet), signet: get_network_params_default(Network::Signet),
mainnet: get_network_params_default(Network::Bitcoin), mainnet: get_network_params_default(Network::Bitcoin),
send_stats: false,
url: "http://localhost/".to_string(),
secret_code: "xxx".to_string(),
ssl_key_path: "privkey.pem".to_string(),
} }
} }
} }
@ -73,7 +93,7 @@ fn get_network_params(cfg: &MyConfig,network:Network)-> &NetworkParams{
fn get_network_params_default(network:Network) -> NetworkParams{ fn get_network_params_default(network:Network) -> NetworkParams{
match network { match network {
Network::Testnet => NetworkParams{ Network::Testnet => NetworkParams{
host: "http://localhost".to_string(), host: "http://i27.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "testnet3/".to_string(), dir_path: "testnet3/".to_string(),
db_field: "testnet".to_string(), db_field: "testnet".to_string(),
@ -82,7 +102,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Signet => NetworkParams{ Network::Signet => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18332, port: 18332,
dir_path: "signet/".to_string(), dir_path: "signet/".to_string(),
db_field: "signet".to_string(), db_field: "signet".to_string(),
@ -91,7 +111,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
Network::Regtest => NetworkParams{ Network::Regtest => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 18443, port: 18443,
dir_path: "regtest/".to_string(), dir_path: "regtest/".to_string(),
db_field: "regtest".to_string(), db_field: "regtest".to_string(),
@ -100,7 +120,7 @@ fn get_network_params_default(network:Network) -> NetworkParams{
rpc_pass: "".to_string(), rpc_pass: "".to_string(),
}, },
_ => NetworkParams{ _ => NetworkParams{
host: "http://localhost".to_string(), host: "http://127.0.0.1".to_string(),
port: 8332, port: 8332,
dir_path: "".to_string(), dir_path: "".to_string(),
db_field: "bitcoin".to_string(), db_field: "bitcoin".to_string(),
@ -117,7 +137,6 @@ fn get_cookie_filename(network: &NetworkParams) ->Result<String,Box<dyn StdError
}else{ }else{
match env::var_os("HOME") { match env::var_os("HOME") {
Some(home) => { Some(home) => {
info!("some home {}",home.to_str().unwrap());
match home.to_str(){ match home.to_str(){
Some(home_str) => { Some(home_str) => {
let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path); let cookie_file_path = format!("{}/.bitcoin/{}.cookie",home_str, network.dir_path);
@ -148,9 +167,15 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
match get_cookie_filename(network){ match get_cookie_filename(network){
Ok(cookie) => { Ok(cookie) => {
match Client::new(&url[..], Auth::CookieFile(cookie.into())) { match Client::new(&url[..], Auth::CookieFile(cookie.into())) {
Ok(client) => match client.get_blockchain_info(){ Ok(client) => {
Ok(bcinfo) => Ok((client,bcinfo)), match client.get_blockchain_info(){
Err(err) => Err(err.into()) Ok(bcinfo) => {
Ok((client,bcinfo))
},
Err(err) => {
Err(err.into())
}
}
}, },
Err(err)=>Err(err.into()) Err(err)=>Err(err.into())
@ -160,7 +185,7 @@ fn get_client_from_cookie(url: &String,network: &NetworkParams)->Result<(Client,
} }
} }
fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{ fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult),Box<dyn StdError>>{
let url = format!("{}:{}",network.host,&network.port); let url = format!("{}:{}/",network.host,&network.port);
match get_client_from_username(&url,network){ match get_client_from_username(&url,network){
Ok(client) =>{Ok(client)}, Ok(client) =>{Ok(client)},
Err(_) =>{ Err(_) =>{
@ -173,7 +198,7 @@ fn get_client(network: &NetworkParams) -> Result<(Client,GetBlockchainInfoResult
} }
} }
} }
fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> { async fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Error> {
/*let url = args.next().expect("Usage: <rpc_url> <username> <password>"); /*let url = args.next().expect("Usage: <rpc_url> <username> <password>");
@ -200,10 +225,22 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
//} //}
//let average_time = time_sum/11; //let average_time = time_sum/11;
info!("median time: {}",bcinfo.median_time); info!("median time: {}",bcinfo.median_time);
//info!("height time: {}",bcinfo.median_time);
info!("blocks: {}",bcinfo.blocks);
debug!("best block hash: {}",bcinfo.best_block_hash);
let average_time = bcinfo.median_time; let average_time = bcinfo.median_time;
let db = sqlite::open(&cfg.db_file).unwrap(); let db = sqlite::open(&cfg.db_file).unwrap();
let query_tx = db.prepare("SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);").unwrap().into_iter();
let sqlquery = "SELECT * FROM tbl_tx WHERE network = :network AND status = :status AND ( locktime < :bestblock_height OR locktime > :locktime_threshold AND locktime < :bestblock_time);";
let query_tx = db.prepare(sqlquery).unwrap().into_iter();
trace!("query_tx: {}",sqlquery);
trace!(":locktime_threshold: {}", LOCKTIME_THRESHOLD );
trace!(":bestblock_time: {}", average_time);
trace!(":bestblock_height: {}", bcinfo.blocks);
trace!(":network: {}", network_params.db_field.clone());
trace!(":status: {}", 0);
//let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter(); //let query_tx = db.prepare("SELECT * FROM tbl_tx where status = :status").unwrap().into_iter();
let mut pushed_txs:Vec<String> = Vec::new(); let mut pushed_txs:Vec<String> = Vec::new();
let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new(); let mut invalid_txs: std::collections::HashMap<String, String> = HashMap::new();
@ -223,25 +260,26 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
info!("to be pushed: {}: {}",txid, locktime); info!("to be pushed: {}: {}",txid, locktime);
match rpc.send_raw_transaction(tx){ match rpc.send_raw_transaction(tx){
Ok(o) => { Ok(o) => {
let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
.create(true) // Create the file if it doesn't exist .create(true) // Create the file if it doesn't exist
.open("valid_txs")?; .open("valid_txs")?;
let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime); let data = format!("{}\t:\t{}\t:\t{}\n",txid,average_time,locktime);
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/
info!("tx: {} pusshata PUSHED\n{}",txid,o); info!("tx: {} pusshata PUSHED\n{}",txid,o);
pushed_txs.push(txid.to_string()); pushed_txs.push(txid.to_string());
}, },
Err(err) => { Err(err) => {
let mut file = OpenOptions::new() /*let mut file = OpenOptions::new()
.append(true) // Set the append option .append(true) // Set the append option
.create(true) // Create the file if it doesn't exist .create(true) // Create the file if it doesn't exist
.open("invalid_txs")?; .open("/home/bal/invalid_txs")?;
let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime); let data = format!("{}:\t{}\t:\t{}\t:\t{}\n",txid,err,average_time,locktime);
file.write_all(data.as_bytes())?; file.write_all(data.as_bytes())?;
drop(file); drop(file);
*/
warn!("Error: {}\n{}",err,txid); warn!("Error: {}\n{}",err,txid);
//store err in invalid_txs //store err in invalid_txs
invalid_txs.insert(txid.to_string(), err.to_string()); invalid_txs.insert(txid.to_string(), err.to_string());
@ -251,21 +289,70 @@ fn main_result(cfg: &MyConfig, network_params: &NetworkParams) -> Result<(), Err
} }
if pushed_txs.len() > 0 { if pushed_txs.len() > 0 {
let _ = db.execute(format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"))); let sql = format!("UPDATE tbl_tx SET status = 1 WHERE txid in ('{}');",pushed_txs.join("','"));
trace!("sqlok: {}",&sql);
let _ = db.execute(&sql);
} }
if invalid_txs.len() > 0 { if invalid_txs.len() > 0 {
for (txid,txerr) in &invalid_txs{ for (txid,txerr) in &invalid_txs{
//let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','"))); //let _ = db.execute(format!("UPDATE tbl_tx SET status = 2 WHERE txid in ('{}'Yp);",invalid_txs.join("','")));
let _ = db.execute(format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'")); let sql = format!("UPDATE tbl_tx SET status = 2, push_err='{txerr}' WHERE txid = '{txid}'");
trace!("sqlerror: {}",&sql);
let _ = db.execute(&sql);
} }
} }
let _ = send_stats_report(cfg, bcinfo).await;
} }
Err(_)=>{ Err(erx)=>{
panic!("impossible to get client") panic!("impossible to get client {}",erx)
} }
} }
Ok(()) Ok(())
} }
async fn send_stats_report(cfg: &MyConfig, bcinfo: GetBlockchainInfoResult) -> Result<(),reqwest::Error>{
if cfg.send_stats {
debug!("sending report to welist");
let welist_url=env::var("WELIST_SERVER_URL").unwrap_or("https://welist.bitcoin-after.life".to_string());
let client = rClient::new();
let url = format!("{}/ping",welist_url);
let chain=bcinfo.chain.to_string().to_lowercase();
let message = format!("{0}{1}{2}{3}{4}",cfg.url,chain,bcinfo.blocks,bcinfo.median_time,bcinfo.best_block_hash);
let sign = sign_message(cfg.ssl_key_path.as_str(),&message.as_str());
let response = client.post(url)
.header("User-Agent", format!("bal-pusher/{}",VERSION))
.json(&json!(
{
"url": cfg.url,
"chain": chain,
"height": bcinfo.blocks,
"median_time": bcinfo.median_time,
"last_block_hash": bcinfo.best_block_hash,
"signature": sign,
}))
.send().await?;
let body = &(response.text().await?);
info!("Report to welist({})\tSent: {}", welist_url,body);
}else {
debug!("Not sending stats");
}
Ok(())
}
fn sign_message(private_key_path: &str, message: &str) -> String {
let key_data = fs::read(private_key_path).unwrap();
let private_key = PKey::private_key_from_pem(&key_data).unwrap();
let mut signer = Signer::new_without_digest(&private_key).unwrap();
let signature = signer.sign_oneshot_to_vec(message.as_bytes()).unwrap();
let signature_b64 = general_purpose::STANDARD.encode(&signature);
signature_b64
}
fn parse_env(cfg: &mut MyConfig){ fn parse_env(cfg: &mut MyConfig){
match env::var("BAL_PUSHER_ZMQ_LISTENER") { match env::var("BAL_PUSHER_ZMQ_LISTENER") {
@ -288,6 +375,27 @@ fn parse_env(cfg: &mut MyConfig){
cfg.bitcoin_dir = value;}, cfg.bitcoin_dir = value;},
Err(_) => {}, Err(_) => {},
} }
match env::var("BAL_PUSHER_SEND_STATS") {
Ok(value) => {
cfg.send_stats = value.parse::<bool>().unwrap();
},
Err(_) => {},
}
match env::var("BAL_SERVER_URL") {
Ok(value) => {
cfg.url= value;},
Err(_) => {},
}
match env::var("WELIST_SECRET_CODE") {
Ok(value) => {
cfg.secret_code = value;},
Err(_) => {},
}
match env::var("SSL_KEY_PATH") {
Ok(value) => {
cfg.ssl_key_path = value;},
Err(_) => {},
}
cfg.regtest = parse_env_netconfig(cfg,"regtest"); cfg.regtest = parse_env_netconfig(cfg,"regtest");
cfg.signet = parse_env_netconfig(cfg,"signet"); cfg.signet = parse_env_netconfig(cfg,"signet");
cfg.testnet = parse_env_netconfig(cfg,"testnet"); cfg.testnet = parse_env_netconfig(cfg,"testnet");
@ -345,7 +453,75 @@ fn get_default_config()-> MyConfig {
confy::load("bal-pusher",None).expect("cant_load") confy::load("bal-pusher",None).expect("cant_load")
} }
fn main(){ fn check_zmq_connection(endpoint: &str) -> bool {
trace!("check zmq connection");
let context = Context::new();
let socket = match context.socket(DEALER) {
Ok(sock) => sock,
Err(_) => return false,
};
if socket.connect(endpoint).is_err() {
return false;
}
// Try to send an empty message non-blocking
socket.send("", DONTWAIT).is_ok()
}
// Add this struct to monitor connection health
struct ConnectionMonitor {
last_message_time: Instant,
timeout: Duration,
consecutive_timeouts: u32,
max_consecutive_timeouts: u32,
}
impl ConnectionMonitor {
fn new(timeout_secs: u64, max_timeouts: u32) -> Self {
Self {
last_message_time: Instant::now(),
timeout: Duration::from_secs(timeout_secs),
consecutive_timeouts: 0,
max_consecutive_timeouts: max_timeouts,
}
}
fn update(&mut self) {
self.last_message_time = Instant::now();
self.consecutive_timeouts = 0;
}
fn check_connection(&mut self) -> ConnectionStatus {
let elapsed = self.last_message_time.elapsed();
if elapsed > self.timeout {
self.consecutive_timeouts += 1;
if self.consecutive_timeouts >= self.max_consecutive_timeouts {
ConnectionStatus::Lost(elapsed)
} else {
ConnectionStatus::Warning(elapsed)
}
} else {
ConnectionStatus::Healthy
}
}
fn reset(&mut self) {
self.consecutive_timeouts = 0;
self.last_message_time = Instant::now();
}
}
enum ConnectionStatus {
Healthy,
Warning(Duration),
Lost(Duration),
}
#[tokio::main]
async fn main()-> std::io::Result<()>{
env_logger::init(); env_logger::init();
let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") { let mut cfg: MyConfig = match env::var("BAL_PUSHER_CONFIG_FILE") {
Ok(value) => { Ok(value) => {
@ -381,41 +557,43 @@ fn main(){
}; };
debug!("Network: {}",arg_network); info!("Network: {}",arg_network);
let network_params = get_network_params(&cfg,network); let network_params = get_network_params(&cfg,network);
let context = Context::new(); let context = Context::new();
let socket: Socket = context.socket(zmq::SUB).unwrap(); let socket: Socket = context.socket(zmq::SUB).unwrap();
let zmq_address = cfg.zmq_listener.clone(); let zmq_address = cfg.zmq_listener.clone();
info!("zmq listening on: {}",zmq_address);
socket.connect(&zmq_address).unwrap(); socket.connect(&zmq_address).unwrap();
socket.set_subscribe(b"").unwrap(); socket.set_subscribe(b"").unwrap();
let _ = main_result(&cfg,network_params); let _ = main_result(&cfg,network_params).await;
info!("waiting new blocks.."); info!("waiting new blocks..");
let mut last_seq:Vec<u8>=[0;4].to_vec(); let mut last_seq:Vec<u8>=[0;4].to_vec();
let mut counter=0;
let max=100;
loop { loop {
let message = socket.recv_multipart(0).unwrap(); let message = socket.recv_multipart(0).unwrap();
let topic = message[0].clone(); let topic = message[0].clone();
let body = message[1].clone(); let body = message[1].clone();
let seq = message[2].clone(); let seq = message[2].clone();
if last_seq >= seq {
continue
}
last_seq = seq; last_seq = seq;
//let mut sequence_str = "Unknown".to_string(); debug!("ZMQ:GET TOPIC: {}", String::from_utf8(topic.clone()).expect("invalid topic"));
/*if seq.len()==4{ trace!("ZMQ:GET BODY: {}", hex::encode(&body));
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
sequence_str = sequence.to_string();
}*/
if topic == b"hashblock" { if topic == b"hashblock" {
info!("NEW BLOCK{}", hex::encode(body)); info!("NEW BLOCK: {}", hex::encode(&body));
//let cfg = cfg.clone(); let _ = main_result(&cfg,network_params).await;
let _ = main_result(&cfg,network_params);
} }
thread::sleep(Duration::from_millis(100)); // Sleep for 100ms thread::sleep(Duration::from_millis(100)); // Sleep for 100ms
} }
} }
fn seq_to_str(seq:&Vec<u8>) -> String{
if seq.len()==4{
let mut rdr = Cursor::new(seq);
let sequence = rdr.read_u32::<LittleEndian>().expect("Failed to read integer");
return sequence.to_string();
}
"Unknown".to_string()
}

View File

@ -11,6 +11,7 @@ use std::net::IpAddr;
use std::env; use std::env;
//use std::time::{SystemTime,UNIX_EPOCH}; //use std::time::{SystemTime,UNIX_EPOCH};
use std::fs;
use std::sync::{ Arc, Mutex, MutexGuard }; use std::sync::{ Arc, Mutex, MutexGuard };
//use std::net::SocketAddr; //use std::net::SocketAddr;
use std::collections::HashMap; use std::collections::HashMap;
@ -33,7 +34,7 @@ use crate::db::{ create_database, get_next_address_index, insert_xpub, save_new_
#[path = "../xpub.rs"] #[path = "../xpub.rs"]
mod xpub; mod xpub;
use crate::xpub::new_address_from_xpub; use crate::xpub::new_address_from_xpub;
const VERSION:&str="0.2.0"; const VERSION:&str="0.2.2";
const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"]; const NETWORKS : [&str; 4]= ["bitcoin","testnet","signet","regtest"];
#[derive(Debug, Clone,Serialize, Deserialize)] #[derive(Debug, Clone,Serialize, Deserialize)]
struct NetConfig { struct NetConfig {
@ -48,26 +49,27 @@ struct NetConfig {
impl NetConfig { impl NetConfig {
fn default_network(name:String, network: Network) -> Self { fn default_network(name:String, network: Network) -> Self {
NetConfig { NetConfig {
address: "".to_string(), address: "".to_string(),
fixed_fee: 50000, fixed_fee: 50000,
xpub: false, xpub: false,
name, name,
network, network,
enabled: false, enabled: false,
} }
} }
} }
#[derive(Debug, Serialize, Deserialize,Clone)] #[derive(Debug, Serialize, Deserialize,Clone)]
struct MyConfig { struct MyConfig {
regtest: NetConfig, regtest: NetConfig,
signet: NetConfig, signet: NetConfig,
testnet: NetConfig, testnet: NetConfig,
mainnet: NetConfig, mainnet: NetConfig,
info: String, info: String,
bind_address: String, bind_address: String,
bind_port: u16, // Changed to u16 for port numbers bind_port: u16, // Changed to u16 for port numbers
db_file: String, db_file: String,
pub_key_path: String,
} }
#[derive(Debug,Serialize, Deserialize)] #[derive(Debug,Serialize, Deserialize)]
@ -83,14 +85,15 @@ pub struct Info {
impl Default for MyConfig { impl Default for MyConfig {
fn default() -> Self { fn default() -> Self {
MyConfig { MyConfig {
regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest), regtest: NetConfig::default_network("regtest".to_string(), Network::Regtest),
signet: NetConfig::default_network("signet".to_string(), Network::Signet), signet: NetConfig::default_network("signet".to_string(), Network::Signet),
testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet), testnet: NetConfig::default_network("testnet".to_string(), Network::Testnet),
mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin), mainnet: NetConfig::default_network("bitcoin".to_string(), Network::Bitcoin),
bind_address: "127.0.0.1".to_string(), bind_address: "127.0.0.1".to_string(),
bind_port: 9137, bind_port: 9137,
db_file: "bal.db".to_string(), db_file: "bal.db".to_string(),
info:"Will Executor Server".to_string() info: "Will Executor Server".to_string(),
pub_key_path: "public_key.pem".to_string(),
} }
} }
} }
@ -109,6 +112,18 @@ async fn echo_version(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
Ok(Response::new(full(VERSION))) Ok(Response::new(full(VERSION)))
} }
async fn echo_home(cfg: &MyConfig
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
debug!("echo_home: {}", cfg.info );
Ok(Response::new(full(cfg.info.clone())))
}
async fn echo_pub_key(
cfg: &MyConfig,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let pub_key = fs::read_to_string(&cfg.pub_key_path)
.expect(format!("Failed to read public key file {}",cfg.pub_key_path).as_str());
Ok(Response::new(full(pub_key)))
}
async fn echo_info( async fn echo_info(
param: &str, param: &str,
cfg: &MyConfig, cfg: &MyConfig,
@ -117,13 +132,13 @@ async fn echo_info(
info!("echo info!!!{}",param); info!("echo info!!!{}",param);
let netconfig=MyConfig::get_net_config(cfg,param); let netconfig=MyConfig::get_net_config(cfg,param);
if !netconfig.enabled { if !netconfig.enabled {
trace!("network disabled"); debug!("network disabled {}",param);
return Ok(Response::new(full("network disabled"))); return Ok(Response::new(full("network disabled")));
} }
let address = match netconfig.xpub{ let address = match netconfig.xpub{
false => { false => {
let address = netconfig.address.to_string(); let address = netconfig.address.to_string();
info!("is address: {}",&address); trace!("is address: {}",&address);
address address
}, },
true => { true => {
@ -134,8 +149,8 @@ async fn echo_info(
let next = get_next_address_index(&db,&netconfig.name,&netconfig.address); let next = get_next_address_index(&db,&netconfig.name,&netconfig.address);
let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap(); let address = new_address_from_xpub(&netconfig.address,next.1,netconfig.network).unwrap();
save_new_address(&db,next.0,&address.0,&address.1,&remote_addr); save_new_address(&db,next.0,&address.0,&address.1,&remote_addr);
debug!("address {} {}",address.0,address.1); debug!("save new address {} {}",address.0,address.1);
debug!("next {} {}",next.0,next.1); trace!("next {} {}",next.0,next.1);
address.0 address.0
} }
} }
@ -151,7 +166,10 @@ async fn echo_info(
}; };
trace!("address: {:#?}",info); trace!("address: {:#?}",info);
match serde_json::to_string(&info){ match serde_json::to_string(&info){
Ok(json_data) => Ok(Response::new(full(json_data))), Ok(json_data) => {
debug!("echo info reply: {}", json_data);
return Ok(Response::new(full(json_data)));
},
Err(err) => Ok(Response::new(full(format!("error:{}",err)))) Err(err) => Ok(Response::new(full(format!("error:{}",err))))
} }
@ -369,7 +387,7 @@ async fn echo_push(whole_body: &Bytes,
ptx.push((linenum+3,Value::String(line.to_string()))); ptx.push((linenum+3,Value::String(line.to_string())));
ptx.push((linenum+4,Value::String(locktime.to_string()))); ptx.push((linenum+4,Value::String(locktime.to_string())));
ptx.push((linenum+5,Value::String(req_time.to_string()))); ptx.push((linenum+5,Value::String(req_time.to_string())));
ptx.push((linenum+6,Value::String(param.to_string()))); ptx.push((linenum+6,Value::String(netconfig.name.to_string())));
ptx.push((linenum+7,Value::String(our_address.to_string()))); ptx.push((linenum+7,Value::String(our_address.to_string())));
ptx.push((linenum+8,Value::String(our_fees.to_string()))); ptx.push((linenum+8,Value::String(our_fees.to_string())));
linenum += 9; linenum += 9;
@ -438,6 +456,12 @@ async fn echo(
if uri=="/version"{ if uri=="/version"{
ret= echo_version().await; ret= echo_version().await;
} }
if uri=="/.pub_key.pem" {
ret = echo_pub_key(cfg).await;
}
if uri=="/"{
ret = echo_home(cfg).await;
}
ret ret
} }
@ -458,23 +482,35 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed() .boxed()
} }
fn parse_env(cfg: &Arc<Mutex<MyConfig>>){ fn parse_env(cfg: &Arc<Mutex<MyConfig>>){
for (key, value) in std::env::vars() {
debug!("ENVIRONMENT {key}: {value}");
}
let mut cfg_lock = cfg.lock().unwrap(); let mut cfg_lock = cfg.lock().unwrap();
if let Ok(value) = env::var("BAL_SERVER_DB_FILE") { if let Ok(value) = env::var("BAL_SERVER_DB_FILE") {
debug!("BAL_SERVER_DB_FILE: {}",value);
cfg_lock.db_file = value; cfg_lock.db_file = value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") { if let Ok(value) = env::var("BAL_SERVER_BIND_ADDRESS") {
debug!("BAL_SERVER_BIND_ADDRESS: {}",value);
cfg_lock.bind_address= value; cfg_lock.bind_address= value;
} }
if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") { if let Ok(value) = env::var("BAL_SERVER_BIND_PORT") {
debug!("BAL_SERVER_BIND_PORT: {}",value);
if let Ok(v) = value.parse::<u16>(){ if let Ok(v) = value.parse::<u16>(){
cfg_lock.bind_port = v; cfg_lock.bind_port = v;
} }
} }
if let Ok(value) = env::var("BAL_SERVER_INFO"){ if let Ok(value) = env::var("BAL_SERVER_PUB_KEY_PATH") {
cfg_lock.info = value; debug!("BAL_SERVER_PUB_KEY_PATH: {}",value);
cfg_lock.pub_key_path = value;
} }
if let Ok(value) = env::var("BAL_SERVER_INFO"){
debug!("BAL_SERVER_INFO: {}",value);
cfg_lock.info = value;
}
cfg_lock = parse_env_netconfig(cfg_lock,"regtest"); cfg_lock = parse_env_netconfig(cfg_lock,"regtest");
cfg_lock = parse_env_netconfig(cfg_lock,"signet"); cfg_lock = parse_env_netconfig(cfg_lock,"signet");
cfg_lock = parse_env_netconfig(cfg_lock,"testnet"); cfg_lock = parse_env_netconfig(cfg_lock,"testnet");
@ -489,6 +525,7 @@ fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a st
&_ => &mut cfg_lock.mainnet, &_ => &mut cfg_lock.mainnet,
}; };
if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_ADDRESS",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_ADDRESS: {}",chain.to_uppercase(),value);
cfg.address = value; cfg.address = value;
if cfg.address.len() > 5 { if cfg.address.len() > 5 {
if cfg.address[1..4] == *"pub" { if cfg.address[1..4] == *"pub" {
@ -499,7 +536,8 @@ fn parse_env_netconfig<'a>(mut cfg_lock: MutexGuard<'a, MyConfig>, chain: &'a st
} }
} }
if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXE_FEE",chain.to_uppercase())) { if let Ok(value) = env::var(format!("BAL_SERVER_{}_FIXED_FEE",chain.to_uppercase())) {
debug!("BAL_SERVER_{}_FIXED_FEE: {}",chain.to_uppercase(),value);
if let Ok(v) = value.parse::<u64>(){ if let Ok(v) = value.parse::<u64>(){
cfg.fixed_fee = v; cfg.fixed_fee = v;
} }

View File

@ -17,6 +17,8 @@ pub fn create_database(db: &Connection){
let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)"); let _ = db.execute("CREATE UNIQUE INDEX idx_xpub ON tbl_xpub (network, xpub)");
let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);"); let _ = db.execute("CREATE TABLE IF NOT EXISTS tbl_address (address TEXT PRIMARY_KEY, path TEXT NOT NULL, date_create TIMESTAMP DEFAULT CURRENT_TIMESTAMP, xpub INTEGER,remote_address TEXT);");
let _ = db.execute("UPDATE tbl_tx set network='bitcoin' where network='mainnet');");
} }
/* /*
@ -129,4 +131,17 @@ pub fn execute_insert(db: &Connection,
Ok(()) Ok(())
} }
pub fn get_total_transaction_number(db: Connection, network: &String) -> Result<i64,Error> {
let mut stmt = db.prepare("SELECT COUNT(*) as total_number FROM tbl_tx where network = ?;").unwrap();
stmt.bind((1,Value::String(network.to_string()))).unwrap();
match stmt.next(){
Ok(State::Row)=>{
Ok(stmt.read::<i64,_>("total_number").unwrap())
},
Ok(sqlite::State::Done) => todo!(),
Err(err)=>Err(err)
}
}