PeakeCoin CrossChain Transfer - hive_bridge

We are working on and debugging our backend cross-chain swapping actions. We try to keep all code accessible.
hive_bridge/main.py
import hashlib
import json as pyjson
import math
import re
import signal
import sqlite3
import sys
import time

from beem import Hive
from beem.account import Account
from beem.nodelist import NodeList

from common.config import HIVE_BRIDGE_ACCOUNT, PEK_TOKEN_SYMBOL
from common.logger import logger
from common.db import init_db, get_db
# Static RPC endpoints used when the live node list cannot be obtained.
# Kept as a list: the monitor copies it and rotates the copy in place.
HIVE_NODES_FALLBACK = [
    "https://api.hive.blog",
    "https://anyx.io",
    "https://rpc.ausbit.dev",
    "https://hive.roelandp.nl",
]
# ---- graceful shutdown -------------------------------------------------------
# Flag polled by the monitor loop; flipped to True once a stop signal arrives.
_shutdown = False


def _handle_sigint(signum, frame):
    """Signal handler: request a clean stop instead of terminating abruptly.

    Both arguments are supplied by the ``signal`` machinery and are unused;
    the handler only raises the module-level ``_shutdown`` flag.
    """
    global _shutdown
    _shutdown = True


# The same handler serves termination requests and Ctrl-C.
signal.signal(signal.SIGTERM, _handle_sigint)
signal.signal(signal.SIGINT, _handle_sigint)
# ---- simple KV state (last processed block) ----------------------------------
def _get_state(key: str, default=None):
    """Look up ``key`` in the ``state`` key/value table.

    The table is created on demand, so the lookup also works against a
    database that has never stored any state.

    Args:
        key: Name of the state entry.
        default: Value handed back when no row exists for ``key``.

    Returns:
        The stored value, or ``default`` when the key is absent.
    """
    connection = get_db()
    try:
        cursor = connection.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)")
        cursor.execute("SELECT value FROM state WHERE key = ?", (key,))
        record = cursor.fetchone()
        if record:
            return record[0]
        return default
    finally:
        # Always release the connection, including when a query raised.
        connection.close()
def _set_state(key: str, value: object) -> None:
    """Insert or overwrite the entry stored under ``key`` in the ``state`` table.

    Args:
        key: Name of the state entry.
        value: Value to persist; it is converted with ``str()`` before being
            written, so integers such as block numbers are accepted.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # Create the table on demand, exactly as _get_state does, so a write
        # that happens before any read cannot fail with "no such table".
        c.execute("CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)")
        c.execute(
            "INSERT INTO state(key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, str(value)),
        )
        conn.commit()
    finally:
        # Always release the connection, including when a statement raised.
        conn.close()
# ---- event insert with dedupe ------------------------------------------------
def _insert_event(source: str, tx_id: str, amount: float, to_address: str):
    """Store a bridge event with status ``pending`` unless it already exists.

    Deduplication is delegated to the database through ``INSERT OR IGNORE``.
    That only works if ``events`` carries a uniqueness constraint, which
    common.db is expected to create, for example:
        CREATE UNIQUE INDEX IF NOT EXISTS ux_events_source_txid ON events(source, tx_id);

    Args:
        source: Origin of the event.
        tx_id: Transaction identifier of the event.
        amount: Transferred amount.
        to_address: Destination address.

    Returns:
        True when a row was inserted, False when the insert was ignored.
    """
    connection = get_db()
    try:
        cursor = connection.cursor()
        row_values = (source, tx_id, amount, to_address, "pending")
        cursor.execute(
            "INSERT OR IGNORE INTO events (source, tx_id, amount, to_address, status) VALUES (?, ?, ?, ?, ?)",
            row_values,
        )
        connection.commit()
        # A row count of zero means the insert was ignored.
        inserted = cursor.rowcount > 0
        return inserted
    finally:
        connection.close()
# ---- validation --------------------------------------------------------------
# "0x" followed by exactly 40 hexadecimal digits.
_polygon_addr_re = re.compile(r"^0x[a-fA-F0-9]{40}$")


def _is_polygon_address(s: str) -> bool:
    """Tell whether ``s`` has the shape of a 0x-prefixed 40-hex-digit address.

    Surrounding whitespace is ignored. Empty or ``None`` input is not an
    address. Only the format is checked.
    """
    if not s:
        return False
    return _polygon_addr_re.match(s.strip()) is not None
# ---- hive client w/ rotation -------------------------------------------------
def _make_hive(nodes):
    """Create a Hive client for ``nodes`` that never broadcasts.

    Args:
        nodes: RPC endpoint(s) forwarded to ``Hive`` as ``node``.

    Returns:
        A ``Hive`` instance built with ``nobroadcast=True``, ``timeout=30``
        and ``num_retries=3``.
    """
    # The monitor only reads from the chain, so broadcasting is switched off.
    client_options = {"nobroadcast": True, "timeout": 30, "num_retries": 3}
    return Hive(node=nodes, **client_options)
def _stable_tx_id(tx, block_num, fallback_id=None):
    """Return a dedupe key for ``tx`` that is identical on every run.

    Prefers the id carried by the transaction, then ``fallback_id``. When
    neither exists, a SHA-256 digest of the canonical JSON form is used.
    The built-in ``hash()`` is not suitable: string hashes are randomised
    per process, so the key would change after a restart and defeat dedupe.
    """
    tx_id = tx.get("transaction_id") or fallback_id
    if tx_id:
        return tx_id
    canonical = pyjson.dumps(tx, sort_keys=True).encode("utf-8")
    return f"{block_num}:{hashlib.sha256(canonical).hexdigest()}"


def _engine_actions(raw_json):
    """Decode a custom_json body into a list of action dicts.

    The body may be a single object or an array of objects; anything that is
    not a dict is dropped. Raises ``ValueError`` (or ``RecursionError`` for
    pathologically nested input) when the text is not decodable JSON.
    """
    data = pyjson.loads(raw_json) if isinstance(raw_json, str) else raw_json
    if isinstance(data, dict):
        return [data]
    if isinstance(data, (list, tuple)):
        return [item for item in data if isinstance(item, dict)]
    return []


def _bridge_transfer_payload(action):
    """Return the payload of a PEK ``tokens.transfer`` to the bridge, else None."""
    if action.get("contractName") != "tokens" or action.get("contractAction") != "transfer":
        return None
    payload = action.get("contractPayload")
    if not isinstance(payload, dict):
        return None
    if payload.get("symbol") != PEK_TOKEN_SYMBOL or payload.get("to") != HIVE_BRIDGE_ACCOUNT:
        return None
    return payload


def _parse_quantity(quantity):
    """Return ``quantity`` as a positive finite float, or None if it is not one."""
    if isinstance(quantity, bool):
        return None
    try:
        amount = float(quantity)
    except (TypeError, ValueError, OverflowError):
        return None
    # float() accepts "nan", "inf" and negatives; none is a valid deposit.
    if not math.isfinite(amount) or amount <= 0:
        return None
    return amount


def _process_block(block, block_num):
    """Record every well-formed bridge deposit found in one block."""
    txs = block.get("transactions") or []
    # Some responses list the ids next to the transactions rather than inside
    # them; use that list as a fallback when it is present.
    id_list = block.get("transaction_ids")
    if not isinstance(id_list, (list, tuple)):
        id_list = []
    for tx_index, tx in enumerate(txs):
        fallback_id = id_list[tx_index] if tx_index < len(id_list) else None
        tx_id = _stable_tx_id(tx, block_num, fallback_id)
        deposits_seen = 0
        for op in tx.get("operations", []):
            if not isinstance(op, (list, tuple)) or len(op) != 2:
                continue
            op_name, op_data = op
            if op_name != "custom_json" or not isinstance(op_data, dict):
                continue
            if op_data.get("id") != "ssc-mainnet-hive":
                continue
            try:
                actions = _engine_actions(op_data.get("json"))
            except (ValueError, RecursionError) as e:
                logger.debug(f"custom_json parse error at block {block_num}: {e}")
                continue
            for action in actions:
                payload = _bridge_transfer_payload(action)
                if payload is None:
                    continue
                # Events are unique per (source, tx_id). The first deposit of a
                # transaction keeps the plain id; later ones get a numeric
                # suffix so they are not discarded as duplicates.
                event_id = tx_id if deposits_seen == 0 else f"{tx_id}-{deposits_seen}"
                deposits_seen += 1
                # NOTE(review): the custom_json only shows that a transfer was
                # requested; nothing here checks that the sidechain accepted
                # it. Confirm that acceptance is verified before paying out.
                raw_memo = payload.get("memo")
                memo = raw_memo.strip() if isinstance(raw_memo, str) else ""
                if not _is_polygon_address(memo):
                    logger.warning(
                        f"Ignoring transfer (invalid Polygon memo): memo='{memo}' tx={event_id} blk={block_num}"
                    )
                    continue
                amount = _parse_quantity(payload.get("quantity"))
                if amount is None:
                    logger.warning(
                        f"Ignoring transfer (invalid quantity): quantity={payload.get('quantity')!r} "
                        f"tx={event_id} blk={block_num}"
                    )
                    continue
                if _insert_event("hive", event_id, amount, memo):
                    logger.info(
                        f"Bridge event recorded: {amount:g} {PEK_TOKEN_SYMBOL} → {memo} (tx={event_id}, blk={block_num})"
                    )
                else:
                    logger.debug(f"Duplicate event ignored (tx={event_id})")


def monitor_hive_transfers(poll_interval_sec: int = 2, backoff_max: int = 16, max_batch: int = 200):
    """Follow irreversible Hive blocks and record PEK transfers to the bridge.

    Scans ``custom_json`` operations with id ``ssc-mainnet-hive`` for
    ``tokens.transfer`` actions of ``PEK_TOKEN_SYMBOL`` addressed to
    ``HIVE_BRIDGE_ACCOUNT`` whose memo is a Polygon-style address, and stores
    each one through ``_insert_event``. Progress is persisted under the state
    key ``hive_last_processed_block`` so a restart resumes where it stopped.
    Runs until the module-level ``_shutdown`` flag is set.

    Args:
        poll_interval_sec: Seconds to sleep between passes.
        backoff_max: Upper bound, in seconds, for the error backoff delay.
        max_batch: Maximum number of blocks processed per pass.
    """
    logger.info("Monitoring Hive Engine for PEK → bridge transfers …")
    state_key = "hive_last_processed_block"

    # Build node list (prefer live list, fall back to static).
    try:
        nl = NodeList()
        nl.update_nodes()
        hive_nodes = list(nl.get_nodes(normal=True, appbase=True)) or HIVE_NODES_FALLBACK[:]
    except Exception:
        hive_nodes = HIVE_NODES_FALLBACK[:]
    hive = _make_hive(hive_nodes)
    backoff = 1

    stored = _get_state(state_key)
    # None means "first run": the start block is chosen inside the loop once a
    # node has answered, so a failed startup query can never persist block 0
    # and trigger a crawl from genesis.
    last_processed = int(stored) if stored is not None else None

    while not _shutdown:
        try:
            dgp = hive.get_dynamic_global_properties()
            # Only process up to the last irreversible block to avoid reorgs.
            lib = dgp["last_irreversible_block_num"]

            if last_processed is None:
                # First run: start a bit behind LIB to be safe.
                last_processed = max(lib - 5, 0)
                _set_state(state_key, last_processed)
                logger.info(f"Initialized last_processed to {last_processed}")

            if lib <= last_processed:
                time.sleep(poll_interval_sec)
                continue

            # Cap each pass to avoid huge catch-ups.
            end = min(last_processed + max_batch, lib)
            for block_num in range(last_processed + 1, end + 1):
                if _shutdown:
                    break
                block = hive.rpc.get_block(block_num)
                if not block:
                    # Skipping here would lose the block for good once later
                    # blocks advance the cursor; fail the pass so it is
                    # retried, possibly on another node.
                    raise RuntimeError(f"node returned no data for irreversible block {block_num}")
                _process_block(block, block_num)
                last_processed = block_num
                if block_num % 20 == 0:
                    _set_state(state_key, last_processed)

            # Persist after each batch.
            _set_state(state_key, last_processed)
            # Healthy pass → reset backoff.
            backoff = 1
            time.sleep(poll_interval_sec)
        except Exception as e:
            logger.error(f"Hive monitor error: {e}")
            # Rotate nodes so the retry goes to a different endpoint first.
            try:
                hive_nodes.append(hive_nodes.pop(0))
                hive = _make_hive(hive_nodes)
            except Exception as e2:
                logger.error(f"Node rotation failed: {e2}")
            # Backoff (capped).
            time.sleep(backoff)
            backoff = min(backoff * 2, backoff_max)

    logger.info("Shutdown requested; saving state …")
    if last_processed is not None:
        _set_state(state_key, last_processed)
if __name__ == "__main__":
    # Prepare the database before monitoring starts. common.db is expected to
    # provide:
    #   - events(id PK, source TEXT, tx_id TEXT, amount REAL, to_address TEXT,
    #     status TEXT, created_at default now)
    #   - a UNIQUE index on (source, tx_id)
    #   - state(key PRIMARY KEY, value TEXT)
    init_db()
    monitor_hive_transfers()
Upvoted! Thank you for supporting witness @jswit.