ci: fix lint baseline and stabilize failing main tests
Some checks failed
Gitea CI / test (push) Failing after 5s
Gitea CI / test (pull_request) Failing after 5s

This commit is contained in:
agentson
2026-03-01 20:17:13 +09:00
parent 6f047a6daf
commit 5730f0db2a
64 changed files with 1041 additions and 1380 deletions

View File

@@ -2,8 +2,8 @@
from __future__ import annotations
from dataclasses import dataclass
import math
from dataclasses import dataclass
@dataclass(frozen=True)

View File

@@ -2,12 +2,11 @@
from __future__ import annotations
from dataclasses import dataclass
import math
from dataclasses import dataclass
from random import Random
from typing import Literal
OrderSide = Literal["BUY", "SELL"]
@@ -77,7 +76,9 @@ class BacktestExecutionModel:
reason="execution_failure",
)
slip_mult = 1.0 + (slippage_bps / 10000.0 if request.side == "BUY" else -slippage_bps / 10000.0)
slip_mult = 1.0 + (
slippage_bps / 10000.0 if request.side == "BUY" else -slippage_bps / 10000.0
)
exec_price = request.reference_price * slip_mult
if self._rng.random() < partial_rate:

View File

@@ -10,8 +10,7 @@ from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from statistics import mean
from typing import Literal
from typing import cast
from typing import Literal, cast
from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier

View File

@@ -104,6 +104,7 @@ class MarketScanner:
# Store in L7 real-time layer
from datetime import UTC, datetime
timeframe = datetime.now(UTC).isoformat()
self.context_store.set_context(
ContextLayer.L7_REALTIME,
@@ -158,12 +159,8 @@ class MarketScanner:
top_movers = valid_metrics[: self.top_n]
# Detect breakouts and breakdowns
breakouts = [
m.stock_code for m in valid_metrics if self.analyzer.is_breakout(m)
]
breakdowns = [
m.stock_code for m in valid_metrics if self.analyzer.is_breakdown(m)
]
breakouts = [m.stock_code for m in valid_metrics if self.analyzer.is_breakout(m)]
breakdowns = [m.stock_code for m in valid_metrics if self.analyzer.is_breakdown(m)]
logger.info(
"%s scan complete: %d scanned, top momentum=%.1f, %d breakouts, %d breakdowns",
@@ -228,10 +225,9 @@ class MarketScanner:
# If we removed too many, backfill from current watchlist
if len(updated) < len(current_watchlist):
backfill = [
code for code in current_watchlist
if code not in updated
][: len(current_watchlist) - len(updated)]
backfill = [code for code in current_watchlist if code not in updated][
: len(current_watchlist) - len(updated)
]
updated.extend(backfill)
logger.info(

View File

@@ -158,7 +158,12 @@ class SmartVolatilityScanner:
price = latest_close
latest_high = _safe_float(latest.get("high"))
latest_low = _safe_float(latest.get("low"))
if latest_close > 0 and latest_high > 0 and latest_low > 0 and latest_high >= latest_low:
if (
latest_close > 0
and latest_high > 0
and latest_low > 0
and latest_high >= latest_low
):
intraday_range_pct = (latest_high - latest_low) / latest_close * 100.0
if volume <= 0:
volume = _safe_float(latest.get("volume"))
@@ -234,9 +239,7 @@ class SmartVolatilityScanner:
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas fluctuation ranking failed for %s: %s", market.code, exc
)
logger.warning("Overseas fluctuation ranking failed for %s: %s", market.code, exc)
fluct_rows = []
if not fluct_rows:
@@ -250,9 +253,7 @@ class SmartVolatilityScanner:
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas volume ranking failed for %s: %s", market.code, exc
)
logger.warning("Overseas volume ranking failed for %s: %s", market.code, exc)
volume_rows = []
for idx, row in enumerate(volume_rows):
@@ -433,16 +434,10 @@ def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float:
if price <= 0:
return 0.0
high = _safe_float(
row.get("high")
or row.get("ovrs_hgpr")
or row.get("stck_hgpr")
or row.get("day_hgpr")
row.get("high") or row.get("ovrs_hgpr") or row.get("stck_hgpr") or row.get("day_hgpr")
)
low = _safe_float(
row.get("low")
or row.get("ovrs_lwpr")
or row.get("stck_lwpr")
or row.get("day_lwpr")
row.get("low") or row.get("ovrs_lwpr") or row.get("stck_lwpr") or row.get("day_lwpr")
)
if high <= 0 or low <= 0 or high < low:
return 0.0

View File

@@ -6,10 +6,10 @@ Implements first-touch labeling with upper/lower/time barriers.
from __future__ import annotations
import warnings
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Literal, Sequence
from typing import Literal
TieBreakMode = Literal["stop_first", "take_first"]
@@ -92,7 +92,10 @@ def label_with_triple_barrier(
else:
assert spec.max_holding_bars is not None
warnings.warn(
"TripleBarrierSpec.max_holding_bars is deprecated; use max_holding_minutes with timestamps instead.",
(
"TripleBarrierSpec.max_holding_bars is deprecated; "
"use max_holding_minutes with timestamps instead."
),
DeprecationWarning,
stacklevel=2,
)

View File

@@ -92,9 +92,7 @@ class VolatilityAnalyzer:
recent_tr = true_ranges[-period:]
return sum(recent_tr) / len(recent_tr)
def calculate_price_change(
self, current_price: float, past_price: float
) -> float:
def calculate_price_change(self, current_price: float, past_price: float) -> float:
"""Calculate price change percentage.
Args:
@@ -108,9 +106,7 @@ class VolatilityAnalyzer:
return 0.0
return ((current_price - past_price) / past_price) * 100
def calculate_volume_surge(
self, current_volume: float, avg_volume: float
) -> float:
def calculate_volume_surge(self, current_volume: float, avg_volume: float) -> float:
"""Calculate volume surge ratio.
Args:
@@ -240,11 +236,7 @@ class VolatilityAnalyzer:
Momentum score (0-100)
"""
# Weight recent changes more heavily
weighted_change = (
price_change_1m * 0.4 +
price_change_5m * 0.3 +
price_change_15m * 0.2
)
weighted_change = price_change_1m * 0.4 + price_change_5m * 0.3 + price_change_15m * 0.2
# Volume contribution (normalized to 0-10 scale)
volume_contribution = min(10.0, (volume_surge - 1.0) * 5.0)
@@ -301,17 +293,11 @@ class VolatilityAnalyzer:
if len(close_prices) > 0:
if len(close_prices) >= 1:
price_change_1m = self.calculate_price_change(
current_price, close_prices[-1]
)
price_change_1m = self.calculate_price_change(current_price, close_prices[-1])
if len(close_prices) >= 5:
price_change_5m = self.calculate_price_change(
current_price, close_prices[-5]
)
price_change_5m = self.calculate_price_change(current_price, close_prices[-5])
if len(close_prices) >= 15:
price_change_15m = self.calculate_price_change(
current_price, close_prices[-15]
)
price_change_15m = self.calculate_price_change(current_price, close_prices[-15])
# Calculate volume surge
avg_volume = sum(volumes) / len(volumes) if volumes else current_volume