governance: enforce runtime verification coverage gates (#301)
This commit is contained in:
78
scripts/runtime_verify_monitor.sh
Executable file
78
scripts/runtime_verify_monitor.sh
Executable file
@@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env bash
|
||||
# Runtime verification monitor with NOT_OBSERVED detection.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
ROOT_DIR="${ROOT_DIR:-/home/agentson/repos/The-Ouroboros}"
|
||||
LOG_DIR="${LOG_DIR:-$ROOT_DIR/data/overnight}"
|
||||
INTERVAL_SEC="${INTERVAL_SEC:-60}"
|
||||
MAX_HOURS="${MAX_HOURS:-24}"
|
||||
|
||||
cd "$ROOT_DIR"
|
||||
|
||||
OUT_LOG="$LOG_DIR/runtime_verify_$(date +%Y%m%d_%H%M%S).log"
|
||||
END_TS=$(( $(date +%s) + MAX_HOURS*3600 ))
|
||||
|
||||
log() {
|
||||
printf '%s %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$1" | tee -a "$OUT_LOG" >/dev/null
|
||||
}
|
||||
|
||||
check_signal() {
|
||||
local name="$1"
|
||||
local pattern="$2"
|
||||
local run_log="$3"
|
||||
|
||||
if rg -q "$pattern" "$run_log"; then
|
||||
log "[COVERAGE] ${name}=PASS pattern=${pattern}"
|
||||
return 0
|
||||
fi
|
||||
log "[COVERAGE] ${name}=NOT_OBSERVED pattern=${pattern}"
|
||||
return 1
|
||||
}
|
||||
|
||||
log "[INFO] runtime verify monitor started interval=${INTERVAL_SEC}s max_hours=${MAX_HOURS}"
|
||||
|
||||
while true; do
|
||||
now=$(date +%s)
|
||||
if [ "$now" -ge "$END_TS" ]; then
|
||||
log "[INFO] monitor completed (time window reached)"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
latest_run="$(ls -t "$LOG_DIR"/run_*.log 2>/dev/null | head -n1 || true)"
|
||||
if [ -z "$latest_run" ]; then
|
||||
log "[ANOMALY] no run log found"
|
||||
sleep "$INTERVAL_SEC"
|
||||
continue
|
||||
fi
|
||||
|
||||
# Basic liveness hints.
|
||||
app_pid="$(cat "$LOG_DIR/app.pid" 2>/dev/null || true)"
|
||||
wd_pid="$(cat "$LOG_DIR/watchdog.pid" 2>/dev/null || true)"
|
||||
app_alive=0
|
||||
wd_alive=0
|
||||
port_alive=0
|
||||
[ -n "$app_pid" ] && kill -0 "$app_pid" 2>/dev/null && app_alive=1
|
||||
[ -n "$wd_pid" ] && kill -0 "$wd_pid" 2>/dev/null && wd_alive=1
|
||||
ss -ltnp 2>/dev/null | rg -q ':8080' && port_alive=1
|
||||
log "[HEARTBEAT] run_log=$latest_run app_alive=$app_alive watchdog_alive=$wd_alive port8080=$port_alive"
|
||||
|
||||
# Coverage matrix rows (session paths and policy gate evidence).
|
||||
not_observed=0
|
||||
check_signal "LIVE_MODE" "Mode: live" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "KR_LOOP" "Processing market: Korea Exchange" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "NXT_PATH" "NXT_PRE|NXT_AFTER|session=NXT_" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "US_PRE_PATH" "US_PRE|session=US_PRE" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "US_DAY_PATH" "US_DAY|session=US_DAY|Processing market: .*NASDAQ|Processing market: .*NYSE|Processing market: .*AMEX" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "US_AFTER_PATH" "US_AFTER|session=US_AFTER" "$latest_run" || not_observed=$((not_observed+1))
|
||||
check_signal "ORDER_POLICY_SESSION" "Order policy rejected .*\\[session=" "$latest_run" || not_observed=$((not_observed+1))
|
||||
|
||||
if [ "$not_observed" -gt 0 ]; then
|
||||
log "[ANOMALY] coverage_not_observed=$not_observed (treat as FAIL)"
|
||||
else
|
||||
log "[OK] coverage complete (NOT_OBSERVED=0)"
|
||||
fi
|
||||
|
||||
sleep "$INTERVAL_SEC"
|
||||
done
|
||||
|
||||
61
scripts/validate_governance_assets.py
Normal file
61
scripts/validate_governance_assets.py
Normal file
@@ -0,0 +1,61 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Validate persistent governance assets for agent workflow safety."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def must_contain(path: Path, required: list[str], errors: list[str]) -> None:
|
||||
if not path.exists():
|
||||
errors.append(f"missing file: {path}")
|
||||
return
|
||||
text = path.read_text(encoding="utf-8")
|
||||
for token in required:
|
||||
if token not in text:
|
||||
errors.append(f"{path}: missing required token -> {token}")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
errors: list[str] = []
|
||||
|
||||
pr_template = Path(".gitea/PULL_REQUEST_TEMPLATE.md")
|
||||
issue_template = Path(".gitea/ISSUE_TEMPLATE/runtime_verification.md")
|
||||
|
||||
must_contain(
|
||||
pr_template,
|
||||
[
|
||||
"Closes #N",
|
||||
"Main -> Verifier Directive Contract",
|
||||
"Coverage Matrix",
|
||||
"NOT_OBSERVED",
|
||||
"tea",
|
||||
"gh",
|
||||
],
|
||||
errors,
|
||||
)
|
||||
must_contain(
|
||||
issue_template,
|
||||
[
|
||||
"[RUNTIME-VERIFY][SCN-XXX]",
|
||||
"Requirement Mapping",
|
||||
"Close Criteria",
|
||||
"NOT_OBSERVED = 0",
|
||||
],
|
||||
errors,
|
||||
)
|
||||
|
||||
if errors:
|
||||
print("[FAIL] governance asset validation failed")
|
||||
for err in errors:
|
||||
print(f"- {err}")
|
||||
return 1
|
||||
|
||||
print("[OK] governance assets validated")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
||||
Reference in New Issue
Block a user