Building a Real-Time Options Market Alerting System
A Comprehensive Technical Guide
This guide covers architecture, implementation, and production deployment of an alerting system for GEX level breaches, unusual options volume, and VIX regime changes.
1. System Architecture Overview
High-Level Design
```
┌────────────────────────────────────────────────────────────────────────────┐
│                            DATA INGESTION LAYER                            │
│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ CBOE WebSocket       │ │ Options Chain        │ │ VIX / Vol Surface    │ │
│ │ (GEX Levels)         │ │ (Volume Data)        │ │ (Regime Detection)   │ │
│ └──────────┬───────────┘ └──────────┬───────────┘ └──────────┬───────────┘ │
└────────────┼────────────────────────┼────────────────────────┼─────────────┘
             ▼                        ▼                        ▼
┌────────────────────────────────────────────────────────────────────────────┐
│                        PROCESSING & ANALYSIS LAYER                         │
│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ GEX Calculator       │ │ Volume Anomaly       │ │ VIX Regime Classifier│ │
│ │ & Level Monitor      │ │ Detector             │ │ (HMM / Threshold)    │ │
│ └──────────┬───────────┘ └──────────┬───────────┘ └──────────┬───────────┘ │
└────────────┼────────────────────────┼────────────────────────┼─────────────┘
             ▼                        ▼                        ▼
┌────────────────────────────────────────────────────────────────────────────┐
│                             ALERT ENGINE LAYER                             │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │                  Condition Evaluator + Deduplication                   │ │
│ │                  Cooldown Manager + Priority Router                    │ │
│ └───────────────────────────────────┬────────────────────────────────────┘ │
└─────────────────────────────────────┼──────────────────────────────────────┘
                                      ▼
┌────────────────────────────────────────────────────────────────────────────┐
│                        NOTIFICATION DISPATCH LAYER                         │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Telegram   │ │ Pushover   │ │ AWS SNS    │ │ ntfy.sh    │ │ Discord/   │ │
│ │ Bot API    │ │            │ │            │ │            │ │ Slack      │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
```
Key Design Principles
- Separation of concerns: Data ingestion, analysis, alerting logic, and notification dispatch are independent layers
- Async-first: All I/O-bound work uses `asyncio` to maximize throughput
- Backpressure handling: Bounded queues between layers prevent memory exhaustion during spikes
- Idempotent alerts: Deduplication and cooldown prevent notification storms
- Graceful degradation: If one notification channel fails, others continue
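The backpressure principle above can be sketched with a bounded `asyncio.Queue`. This is a standalone toy example (the names are illustrative, not part of the system): when the queue between two layers is full, excess items are shed instead of letting memory grow.

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int) -> int:
    """Try to enqueue n items; count how many are shed when the queue is full."""
    dropped = 0
    for i in range(n):
        try:
            queue.put_nowait(i)   # non-blocking put
        except asyncio.QueueFull:
            dropped += 1          # shed load instead of growing memory
    return dropped

async def main() -> tuple[int, int]:
    # Bounded buffer between ingestion and processing layers
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    dropped = await producer(queue, 250)
    return queue.qsize(), dropped

buffered, dropped = asyncio.run(main())
# queue holds at most maxsize items; the remaining 150 are dropped
```

In the real pipeline the consumer side would `await queue.get()` concurrently; a blocking `await queue.put(...)` is the alternative when slowing the producer is preferable to dropping data.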
2. Data Ingestion Layer
2.1 WebSocket Client for Real-Time Market Data
The foundation is a robust WebSocket client. Python's `websockets` library is a standard choice for production async WebSocket connections.
```python
# ingestion/ws_client.py
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Callable, Optional
from dataclasses import dataclass, field

import websockets
from websockets.exceptions import ConnectionClosed

logger = logging.getLogger(__name__)


@dataclass
class WebSocketConfig:
    uri: str
    ping_interval: float = 20.0
    ping_timeout: float = 10.0
    max_reconnect_delay: float = 60.0
    initial_reconnect_delay: float = 1.0
    max_message_size: int = 10 * 1024 * 1024  # 10 MB
    subscribe_messages: list = field(default_factory=list)


class ResilientWebSocket:
    """
    Production WebSocket client with exponential backoff reconnection,
    heartbeat monitoring, and message routing.
    """

    def __init__(
        self,
        config: WebSocketConfig,
        on_message: Callable,
        on_connect: Optional[Callable] = None,
    ):
        self.config = config
        self.on_message = on_message
        self.on_connect = on_connect
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._reconnect_delay = config.initial_reconnect_delay
        self._running = False
        self._last_message_time: Optional[datetime] = None
        self._message_count = 0
        self._connection_count = 0

    async def start(self):
        """Main entry point. Runs until stop() is called."""
        self._running = True
        while self._running:
            try:
                await self._connect_and_listen()
            except Exception as e:
                if not self._running:
                    break
                logger.error(f"Connection error: {e}")
                await self._backoff_reconnect()

    async def stop(self):
        self._running = False
        if self._ws:
            await self._ws.close()

    async def _connect_and_listen(self):
        async with websockets.connect(
            self.config.uri,
            ping_interval=self.config.ping_interval,
            ping_timeout=self.config.ping_timeout,
            max_size=self.config.max_message_size,
        ) as ws:
            self._ws = ws
            self._connection_count += 1
            self._reconnect_delay = self.config.initial_reconnect_delay
            logger.info(
                f"Connected to {self.config.uri} "
                f"(connection #{self._connection_count})"
            )

            # Send subscription messages
            for msg in self.config.subscribe_messages:
                await ws.send(json.dumps(msg))
                logger.debug(f"Sent subscription: {msg}")

            if self.on_connect:
                await self.on_connect()

            # Main receive loop
            async for raw_message in ws:
                self._last_message_time = datetime.now(timezone.utc)
                self._message_count += 1
                try:
                    data = json.loads(raw_message)
                    await self.on_message(data)
                except json.JSONDecodeError:
                    logger.warning(f"Non-JSON message: {raw_message[:200]}")

    async def _backoff_reconnect(self):
        delay = self._reconnect_delay
        logger.info(f"Reconnecting in {delay:.1f}s...")
        await asyncio.sleep(delay)
        self._reconnect_delay = min(
            delay * 2, self.config.max_reconnect_delay
        )

    @property
    def stats(self) -> dict:
        return {
            "messages_received": self._message_count,
            "connections": self._connection_count,
            "last_message": self._last_message_time,
            "connected": self._ws is not None and self._ws.open,
        }
```
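The reconnection policy in `_backoff_reconnect` doubles the delay after each failed attempt, capped at `max_reconnect_delay`. A self-contained sketch of the schedule it produces with the default parameters (the helper name is illustrative):

```python
def backoff_schedule(initial: float, max_delay: float, attempts: int) -> list[float]:
    """Delays produced by doubling-with-a-cap, as in _backoff_reconnect."""
    delays, delay = [], initial
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_delay)  # double, but never exceed the cap
    return delays

schedule = backoff_schedule(initial=1.0, max_delay=60.0, attempts=8)
# 1, 2, 4, 8, 16, 32, then capped at 60
```

The cap matters: without it, a long outage would push retry delays into hours, and the client would miss the reconnect window when the feed recovers.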
2.2 Data Source Adapters
Different data providers require different connection patterns. Below is an adapter interface with implementations for Polygon.io (streaming) and Tradier (polling).
```python
# ingestion/adapters.py
import asyncio
import aiohttp
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator, Optional

from ingestion.ws_client import ResilientWebSocket, WebSocketConfig


@dataclass
class OptionsSnapshot:
    """Normalized options data regardless of source."""
    symbol: str
    timestamp: datetime
    strike: float
    expiry: str
    option_type: str  # "call" or "put"
    bid: float
    ask: float
    volume: int
    open_interest: int
    implied_vol: float
    delta: float
    gamma: float


@dataclass
class VIXQuote:
    timestamp: datetime
    value: float
    change: float
    change_pct: float


class DataAdapter(ABC):
    @abstractmethod
    async def stream_options(self, symbol: str) -> AsyncIterator[OptionsSnapshot]:
        ...

    @abstractmethod
    async def get_vix(self) -> VIXQuote:
        ...


class PolygonAdapter(DataAdapter):
    """
    Polygon.io provides WebSocket streaming for options trades/quotes.
    Business plan required for real-time options data.
    """

    WS_URI = "wss://socket.polygon.io/options"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._queue: asyncio.Queue[OptionsSnapshot] = asyncio.Queue(maxsize=10_000)
        self._client_task: Optional[asyncio.Task] = None

    async def stream_options(self, symbol: str) -> AsyncIterator[OptionsSnapshot]:
        config = WebSocketConfig(
            uri=self.WS_URI,
            subscribe_messages=[
                {"action": "auth", "params": self.api_key},
                {"action": "subscribe", "params": f"T.O:{symbol}*"},
            ],
        )
        client = ResilientWebSocket(config, on_message=self._handle_message)
        # Keep a reference so the background task is not garbage-collected
        self._client_task = asyncio.create_task(client.start())
        while True:
            snapshot = await self._queue.get()
            yield snapshot

    async def _handle_message(self, data: dict):
        # Polygon sends arrays of events
        if isinstance(data, list):
            for event in data:
                if event.get("ev") == "T":  # Trade event
                    snapshot = self._parse_trade(event)
                    if snapshot and not self._queue.full():
                        await self._queue.put(snapshot)

    def _parse_trade(self, event: dict) -> Optional[OptionsSnapshot]:
        try:
            # Polygon OCC symbol format: O:SPY251219C00600000
            sym = event.get("sym", "")
            occ = sym.split(":")[-1]
            return OptionsSnapshot(
                symbol=sym,
                timestamp=datetime.fromtimestamp(event["t"] / 1000),
                strike=int(occ[-8:]) / 1000,  # strike is encoded in the last 8 digits
                expiry=self._parse_expiry(sym),
                option_type="call" if occ[-9] == "C" else "put",
                bid=0,  # Fill from quotes stream
                ask=0,
                volume=event.get("s", 0),
                open_interest=0,
                implied_vol=0,
                delta=0,
                gamma=0,
            )
        except (KeyError, ValueError, IndexError):
            return None

    @staticmethod
    def _parse_expiry(occ_symbol: str) -> str:
        # Extract YYMMDD from the OCC format (right-aligned fields, so the
        # underlying root can be any length):
        # O:SPY251219C00600000 -> 251219 -> 2025-12-19
        parts = occ_symbol.split(":")[-1]
        date_str = parts[-15:-9]
        return f"20{date_str[:2]}-{date_str[2:4]}-{date_str[4:6]}"

    async def get_vix(self) -> VIXQuote:
        async with aiohttp.ClientSession() as session:
            url = (
                f"https://api.polygon.io/v2/snapshot/locale/us/markets/indices/"
                f"tickers/I:VIX?apiKey={self.api_key}"
            )
            async with session.get(url) as resp:
                data = await resp.json()
                ticker = data["ticker"]
                return VIXQuote(
                    timestamp=datetime.now(),
                    value=ticker["lastTrade"]["p"],
                    change=ticker["todaysChange"],
                    change_pct=ticker["todaysChangePerc"],
                )


class TradierAdapter(DataAdapter):
    """
    Tradier provides REST-based options chains with real-time quotes.
    Good for polling-based approaches (free tier: 1 req/sec).
    """

    BASE_URL = "https://api.tradier.com/v1"

    def __init__(self, access_token: str):
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        }

    async def stream_options(self, symbol: str) -> AsyncIterator[OptionsSnapshot]:
        raise NotImplementedError("Tradier is polling-based; use get_options_chain()")

    async def get_vix(self) -> VIXQuote:
        raise NotImplementedError("Poll VIX via a quotes endpoint or another adapter")

    async def get_options_chain(
        self, symbol: str, expiration: str
    ) -> list[OptionsSnapshot]:
        async with aiohttp.ClientSession(headers=self.headers) as session:
            url = f"{self.BASE_URL}/markets/options/chains"
            params = {
                "symbol": symbol,
                "expiration": expiration,
                "greeks": "true",
            }
            async with session.get(url, params=params) as resp:
                data = await resp.json()
                options = data.get("options", {}).get("option", [])
                return [self._parse_option(o, symbol) for o in options]

    def _parse_option(self, o: dict, symbol: str) -> OptionsSnapshot:
        greeks = o.get("greeks") or {}
        return OptionsSnapshot(
            symbol=symbol,
            timestamp=datetime.now(),
            strike=o["strike"],
            expiry=o["expiration_date"],
            option_type=o["option_type"],
            bid=o.get("bid", 0),
            ask=o.get("ask", 0),
            volume=o.get("volume", 0),
            open_interest=o.get("open_interest", 0),
            implied_vol=greeks.get("mid_iv", 0),
            delta=greeks.get("delta", 0),
            gamma=greeks.get("gamma", 0),
        )
```
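Both adapters lean on the OCC option-symbol layout, so it is worth seeing that parsing in isolation. This standalone sketch (the `ParsedOCC` helper is illustrative, not part of the adapters) decodes the right-aligned fields: an 8-digit strike times 1000, a C/P flag, a YYMMDD expiry, and whatever remains is the underlying root.

```python
from dataclasses import dataclass

@dataclass
class ParsedOCC:
    underlying: str
    expiry: str       # ISO date
    option_type: str  # "call" or "put"
    strike: float

def parse_occ(occ: str) -> ParsedOCC:
    """Parse an OCC symbol such as 'O:SPY251219C00600000'."""
    sym = occ.split(":")[-1]
    strike = int(sym[-8:]) / 1000.0   # last 8 digits: strike * 1000
    cp = sym[-9]                      # C or P
    date = sym[-15:-9]                # YYMMDD
    root = sym[:-15]                  # variable-length underlying root
    return ParsedOCC(
        underlying=root,
        expiry=f"20{date[:2]}-{date[2:4]}-{date[4:6]}",
        option_type="call" if cp == "C" else "put",
        strike=strike,
    )

p = parse_occ("O:SPY251219C00600000")
# SPY call, expiry 2025-12-19, strike 600.0
```

Parsing from the right is the key design choice: it works for any root length (SPY, AAPL, BRKB) without hard-coding an offset.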
2.3 Polling vs. Streaming Decision Matrix
| Factor | WebSocket (Streaming) | REST Polling |
|---|---|---|
| Latency | 10-100ms | 1-60s (poll interval) |
| Cost | Higher (sustained connection) | Lower (per-request) |
| Complexity | Reconnection, heartbeats | Simpler, stateless |
| Data providers | Polygon, IBKR TWS, Alpaca | Tradier, Yahoo, CBOE delayed |
| Best for | GEX breach detection (fast) | VIX regime (slower cadence) |
| Rate limits | Connection-based | Request-based |
Recommendation: Use WebSocket for GEX and volume alerts (latency-sensitive), polling for VIX regime changes (5-15 second intervals are sufficient since regime changes are slow-moving).
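The polling cadence recommended above reduces to a simple async loop. A minimal sketch, with a stubbed fetch function standing in for a real REST call (the names and the short interval are illustrative):

```python
import asyncio

async def poll_vix(fetch, interval_s: float, iterations: int) -> list[float]:
    """Poll a quote function on a fixed cadence and collect the readings."""
    readings = []
    for _ in range(iterations):
        readings.append(await fetch())   # e.g. adapter.get_vix() in production
        await asyncio.sleep(interval_s)  # 5-15s in production, short here
    return readings

async def fake_fetch() -> float:
    return 17.5  # stand-in for a real REST call

readings = asyncio.run(poll_vix(fake_fetch, interval_s=0.01, iterations=3))
```

In production the loop would run forever (`while True`) and hand each reading to the VIX regime classifier; a bounded iteration count is used here only to keep the sketch testable.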
3. Processing & Analysis Layer
3.1 GEX Level Calculator and Breach Monitor
```python
# analysis/gex_monitor.py
import numpy as np
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from collections import deque


@dataclass
class GEXLevel:
    """A significant GEX level (support/resistance)."""
    strike: float
    gex_value: float     # Net gamma exposure at this strike
    level_type: str      # "positive_gamma_wall", "negative_gamma_zone", "flip_point"
    significance: float  # 0-1, how important this level is


@dataclass
class GEXState:
    """Current GEX profile for a symbol."""
    symbol: str
    timestamp: datetime
    total_gex: float
    levels: list[GEXLevel]
    gamma_flip_point: Optional[float]  # Price where net gamma flips sign
    spot_price: float
    regime: str  # "positive_gamma" or "negative_gamma"


class GEXCalculator:
    """
    Calculates Gamma Exposure (GEX) from options chain data.

    GEX = sum over all strikes of:
        (call_OI * call_gamma - put_OI * put_gamma) * 100 * spot_price

    The factor of 100 accounts for the contract multiplier. The signs follow
    the common convention that dealers are net long calls and short puts
    against customer flow, so positive GEX means dealers are long gamma and
    their hedging dampens price moves. This is a simplifying assumption
    about dealer positioning, not an observed quantity.
    """

    def __init__(self, contract_multiplier: int = 100):
        self.contract_multiplier = contract_multiplier
        self._history: deque[GEXState] = deque(maxlen=1000)

    def calculate(
        self, options: list, spot_price: float, symbol: str
    ) -> GEXState:
        strike_gex = {}
        for opt in options:
            strike = opt.strike
            if strike not in strike_gex:
                strike_gex[strike] = 0.0
            gamma = abs(opt.gamma)
            oi = opt.open_interest
            if opt.option_type == "call":
                # Calls contribute positive (dealer long-gamma) exposure
                strike_gex[strike] += (
                    oi * gamma * self.contract_multiplier * spot_price
                )
            else:
                # Puts contribute negative (dealer short-gamma) exposure
                strike_gex[strike] -= (
                    oi * gamma * self.contract_multiplier * spot_price
                )

        # Find significant levels
        levels = self._identify_levels(strike_gex, spot_price)

        # Find gamma flip point
        flip_point = self._find_flip_point(strike_gex, spot_price)

        total_gex = sum(strike_gex.values())
        regime = "positive_gamma" if total_gex > 0 else "negative_gamma"

        state = GEXState(
            symbol=symbol,
            timestamp=datetime.now(),
            total_gex=total_gex,
            levels=levels,
            gamma_flip_point=flip_point,
            spot_price=spot_price,
            regime=regime,
        )
        self._history.append(state)
        return state

    def _identify_levels(
        self, strike_gex: dict[float, float], spot_price: float
    ) -> list[GEXLevel]:
        if not strike_gex:
            return []
        values = np.array(list(strike_gex.values()))
        strikes = np.array(list(strike_gex.keys()))

        # Only consider strikes within 10% of spot
        mask = np.abs(strikes - spot_price) / spot_price < 0.10
        if not mask.any():
            return []
        filtered_values = values[mask]
        filtered_strikes = strikes[mask]

        # Significance threshold: top 20% by absolute GEX
        threshold = np.percentile(np.abs(filtered_values), 80)

        levels = []
        for strike, gex in zip(filtered_strikes, filtered_values):
            if abs(gex) < threshold:
                continue
            if gex > 0:
                level_type = "positive_gamma_wall"
            else:
                level_type = "negative_gamma_zone"
            significance = min(abs(gex) / np.max(np.abs(filtered_values)), 1.0)
            levels.append(GEXLevel(
                strike=float(strike),
                gex_value=float(gex),
                level_type=level_type,
                significance=float(significance),
            ))

        # Sort by significance descending
        levels.sort(key=lambda l: l.significance, reverse=True)
        return levels[:10]  # Top 10 levels

    def _find_flip_point(
        self, strike_gex: dict[float, float], spot_price: float
    ) -> Optional[float]:
        """Find the strike where cumulative GEX changes sign near spot."""
        sorted_strikes = sorted(strike_gex.keys())
        cumulative = 0.0
        prev_sign = None
        for strike in sorted_strikes:
            cumulative += strike_gex[strike]
            current_sign = 1 if cumulative >= 0 else -1
            if prev_sign is not None and current_sign != prev_sign:
                # Sign flip found -- if it is near spot, this is meaningful
                if abs(strike - spot_price) / spot_price < 0.05:
                    return strike
            prev_sign = current_sign
        return None


class GEXBreachDetector:
    """
    Detects when price crosses significant GEX levels.

    Breach types:
    - Price crossing a major positive gamma wall (support/resistance test)
    - Price entering a negative gamma zone (potential volatility expansion)
    - Price crossing the gamma flip point (regime change)
    """

    def __init__(self, proximity_pct: float = 0.002):
        self.proximity_pct = proximity_pct  # 0.2% = "touching" a level
        self._previous_state: Optional[GEXState] = None

    def check(self, state: GEXState) -> list[dict]:
        alerts = []
        if self._previous_state is None:
            self._previous_state = state
            return alerts

        prev = self._previous_state
        spot = state.spot_price
        prev_spot = prev.spot_price

        # Check gamma flip point crossing
        if state.gamma_flip_point and prev.gamma_flip_point:
            flip = state.gamma_flip_point
            if (prev_spot < flip <= spot) or (prev_spot > flip >= spot):
                direction = "above" if spot > flip else "below"
                alerts.append({
                    "type": "gamma_flip_breach",
                    "severity": "critical",
                    "symbol": state.symbol,
                    "message": (
                        f"{state.symbol} crossed gamma flip point at "
                        f"${flip:.2f} -- now {direction} "
                        f"(spot=${spot:.2f}). Regime: {state.regime}"
                    ),
                    "data": {
                        "flip_point": flip,
                        "spot": spot,
                        "regime": state.regime,
                    },
                })

        # Check major level breaches
        for level in state.levels:
            if level.significance < 0.5:
                continue  # Only alert on significant levels
            strike = level.strike
            proximity = abs(spot - strike) / spot

            # Check if price crossed this level
            crossed = (prev_spot < strike <= spot) or (prev_spot > strike >= spot)
            touching = proximity < self.proximity_pct

            if crossed:
                alerts.append({
                    "type": "gex_level_breach",
                    "severity": "high" if level.significance > 0.7 else "medium",
                    "symbol": state.symbol,
                    "message": (
                        f"{state.symbol} breached {level.level_type} at "
                        f"${strike:.2f} (GEX={level.gex_value:,.0f}, "
                        f"significance={level.significance:.0%})"
                    ),
                    "data": {
                        "strike": strike,
                        "gex_value": level.gex_value,
                        "level_type": level.level_type,
                        "significance": level.significance,
                    },
                })
            elif touching and level.level_type == "positive_gamma_wall":
                alerts.append({
                    "type": "gex_level_approach",
                    "severity": "low",
                    "symbol": state.symbol,
                    "message": (
                        f"{state.symbol} approaching gamma wall at "
                        f"${strike:.2f} (${abs(spot - strike):.2f} away)"
                    ),
                    "data": {"strike": strike, "distance": abs(spot - strike)},
                })

        # Check regime change (positive -> negative gamma or vice versa)
        if prev.regime != state.regime:
            alerts.append({
                "type": "gamma_regime_change",
                "severity": "critical",
                "symbol": state.symbol,
                "message": (
                    f"{state.symbol} gamma regime changed: "
                    f"{prev.regime} -> {state.regime}. "
                    f"Total GEX: {state.total_gex:,.0f}"
                ),
                "data": {
                    "old_regime": prev.regime,
                    "new_regime": state.regime,
                    "total_gex": state.total_gex,
                },
            })

        self._previous_state = state
        return alerts
```
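To make the sign convention concrete, here is the GEX formula worked on a toy two-option chain. This is a standalone sketch (`net_gex` and the plain-dict chain are illustrative) that mirrors the per-strike aggregation in `GEXCalculator.calculate`:

```python
def net_gex(chain: list[dict], spot: float, multiplier: int = 100) -> float:
    """(call_OI * gamma - put_OI * gamma) * multiplier * spot, summed."""
    total = 0.0
    for opt in chain:
        exposure = opt["oi"] * abs(opt["gamma"]) * multiplier * spot
        total += exposure if opt["type"] == "call" else -exposure
    return total

chain = [
    {"type": "call", "oi": 1000, "gamma": 0.05},
    {"type": "put",  "oi": 800,  "gamma": 0.04},
]
gex = net_gex(chain, spot=500.0)
# (1000*0.05 - 800*0.04) * 100 * 500 = 18 * 50,000 = 900,000
```

Net GEX is positive here, so under the stated dealer-positioning assumption this toy book sits in the "positive_gamma" regime.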
3.2 Unusual Options Volume Detector
```python
# analysis/volume_detector.py
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict, deque

from ingestion.adapters import OptionsSnapshot


@dataclass
class VolumeAnomaly:
    symbol: str
    strike: float
    expiry: str
    option_type: str
    current_volume: int
    average_volume: float
    z_score: float
    volume_oi_ratio: float
    timestamp: datetime
    anomaly_type: str  # "volume_spike", "oi_divergence", "sweep"


class UnusualVolumeDetector:
    """
    Detects unusual options activity using multiple signals:

    1. Z-score of volume vs. 20-day rolling average
    2. Volume-to-OI ratio (high ratio = new positioning)
    3. Rapid volume accumulation (sweep detection)
    4. Put/call volume ratio divergence

    Thresholds are calibrated per-symbol to avoid false positives on
    naturally high-volume names (e.g., SPY vs. a small-cap).
    """

    def __init__(
        self,
        z_score_threshold: float = 2.5,
        vol_oi_threshold: float = 1.5,
        sweep_window_seconds: int = 60,
        sweep_threshold_contracts: int = 500,
        lookback_days: int = 20,
    ):
        self.z_score_threshold = z_score_threshold
        self.vol_oi_threshold = vol_oi_threshold
        self.sweep_window = timedelta(seconds=sweep_window_seconds)
        self.sweep_threshold = sweep_threshold_contracts
        self.lookback_days = lookback_days

        # Rolling volume history: symbol -> strike -> deque of daily volumes
        self._volume_history: dict[str, dict[float, deque]] = defaultdict(
            lambda: defaultdict(lambda: deque(maxlen=lookback_days))
        )
        # Recent trades for sweep detection: key -> deque of (timestamp, volume)
        self._recent_trades: dict[str, deque] = defaultdict(
            lambda: deque(maxlen=5000)
        )

    def update_daily_volume(self, symbol: str, strike: float, volume: int):
        """Call at end of day to update rolling baseline."""
        self._volume_history[symbol][strike].append(volume)

    def check(self, snapshot: OptionsSnapshot) -> list[VolumeAnomaly]:
        anomalies = []

        # 1. Z-score check
        history = self._volume_history[snapshot.symbol].get(snapshot.strike)
        if history and len(history) >= 5:
            hist_array = np.array(list(history))
            mean = hist_array.mean()
            std = hist_array.std()
            if std > 0:
                z_score = (snapshot.volume - mean) / std
                if z_score > self.z_score_threshold:
                    anomalies.append(VolumeAnomaly(
                        symbol=snapshot.symbol,
                        strike=snapshot.strike,
                        expiry=snapshot.expiry,
                        option_type=snapshot.option_type,
                        current_volume=snapshot.volume,
                        average_volume=float(mean),
                        z_score=float(z_score),
                        volume_oi_ratio=(
                            snapshot.volume / max(snapshot.open_interest, 1)
                        ),
                        timestamp=snapshot.timestamp,
                        anomaly_type="volume_spike",
                    ))

        # 2. Volume-to-OI ratio check
        if snapshot.open_interest > 0:
            vol_oi = snapshot.volume / snapshot.open_interest
            if vol_oi > self.vol_oi_threshold and snapshot.volume > 100:
                anomalies.append(VolumeAnomaly(
                    symbol=snapshot.symbol,
                    strike=snapshot.strike,
                    expiry=snapshot.expiry,
                    option_type=snapshot.option_type,
                    current_volume=snapshot.volume,
                    average_volume=0,
                    z_score=0,
                    volume_oi_ratio=vol_oi,
                    timestamp=snapshot.timestamp,
                    anomaly_type="oi_divergence",
                ))

        # 3. Sweep detection (rapid volume accumulation)
        key = f"{snapshot.symbol}:{snapshot.strike}:{snapshot.option_type}"
        self._recent_trades[key].append((snapshot.timestamp, snapshot.volume))
        sweep_volume = self._calculate_sweep_volume(key, snapshot.timestamp)
        if sweep_volume > self.sweep_threshold:
            anomalies.append(VolumeAnomaly(
                symbol=snapshot.symbol,
                strike=snapshot.strike,
                expiry=snapshot.expiry,
                option_type=snapshot.option_type,
                current_volume=sweep_volume,
                average_volume=0,
                z_score=0,
                volume_oi_ratio=0,
                timestamp=snapshot.timestamp,
                anomaly_type="sweep",
            ))

        return anomalies

    def _calculate_sweep_volume(self, key: str, now: datetime) -> int:
        cutoff = now - self.sweep_window
        return sum(vol for ts, vol in self._recent_trades[key] if ts >= cutoff)
```
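The z-score signal is easiest to sanity-check in isolation. A self-contained sketch with toy numbers (in the detector, the baseline would come from `update_daily_volume`):

```python
import numpy as np

def volume_z_score(history: list[int], today: int) -> float:
    """Z-score of today's volume against a rolling baseline."""
    arr = np.asarray(history, dtype=float)
    std = arr.std()
    if std == 0:
        return 0.0  # flat history: no meaningful z-score
    return float((today - arr.mean()) / std)

baseline = [100, 120, 110, 90, 105, 95, 115, 100, 110, 105]  # mean 105
z = volume_z_score(baseline, today=400)
flagged = z > 2.5  # same threshold as the detector's default
```

A 400-contract day against a ~105-contract baseline lands far above the 2.5-sigma threshold, which is exactly the regime where the `volume_spike` anomaly fires.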
3.3 VIX Regime Classifier
```python
# analysis/vix_regime.py
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import deque
from enum import Enum
from typing import Optional


class VIXRegime(Enum):
    LOW_VOL = "low_volatility"    # VIX < 15
    NORMAL = "normal"             # 15 <= VIX < 20
    ELEVATED = "elevated"         # 20 <= VIX < 25
    HIGH_VOL = "high_volatility"  # 25 <= VIX < 30
    EXTREME = "extreme_fear"      # VIX >= 30


@dataclass
class VIXRegimeState:
    current_regime: VIXRegime
    previous_regime: Optional[VIXRegime]
    vix_value: float
    vix_percentile: float                  # Where current VIX sits vs. 1Y history
    regime_duration_minutes: int           # How long in current regime
    vvix: Optional[float]                  # Vol-of-vol if available
    term_structure_slope: Optional[float]  # VIX - VIX3M
    timestamp: datetime


class VIXRegimeClassifier:
    """
    Classifies VIX into regimes and detects transitions.

    Uses both absolute thresholds and rate-of-change:
    - Absolute levels for regime classification
    - Rate of change for spike/crash detection
    - Term structure (contango/backwardation) for regime confirmation
    - VVIX for uncertainty-of-uncertainty

    The classifier uses hysteresis to prevent rapid oscillation between
    regimes at boundary values.
    """

    # Regime thresholds; adjacent bands overlap by one point (hysteresis)
    THRESHOLDS = {
        VIXRegime.LOW_VOL: (0, 15),
        VIXRegime.NORMAL: (14, 20),
        VIXRegime.ELEVATED: (19, 25),
        VIXRegime.HIGH_VOL: (24, 30),
        VIXRegime.EXTREME: (29, float("inf")),
    }

    def __init__(
        self,
        spike_threshold_pct: float = 15.0,  # 15% move in VIX
        spike_window_minutes: int = 30,
        history_size: int = 252 * 78,  # ~1Y of 5-min bars
    ):
        self.spike_threshold_pct = spike_threshold_pct
        self.spike_window = spike_window_minutes
        self._history: deque[tuple[datetime, float]] = deque(maxlen=history_size)
        self._current_regime: Optional[VIXRegime] = None
        self._regime_start: Optional[datetime] = None

    def update(
        self,
        vix: float,
        timestamp: datetime,
        vvix: Optional[float] = None,
        vix3m: Optional[float] = None,
    ) -> list[dict]:
        """Process a new VIX reading and return any regime-change alerts."""
        self._history.append((timestamp, vix))
        alerts = []

        # Classify current regime
        new_regime = self._classify(vix)

        # Calculate percentile
        percentile = self._calculate_percentile(vix)

        # Check for regime transition
        if self._current_regime is not None and new_regime != self._current_regime:
            duration = 0
            if self._regime_start:
                duration = int((timestamp - self._regime_start).total_seconds() / 60)
            severity = self._transition_severity(self._current_regime, new_regime)
            term_slope = None
            if vix3m is not None:
                term_slope = vix - vix3m  # Positive = backwardation (fear)

            state = VIXRegimeState(
                current_regime=new_regime,
                previous_regime=self._current_regime,
                vix_value=vix,
                vix_percentile=percentile,
                regime_duration_minutes=duration,
                vvix=vvix,
                term_structure_slope=term_slope,
                timestamp=timestamp,
            )
            alerts.append({
                "type": "vix_regime_change",
                "severity": severity,
                "message": self._format_regime_message(state),
                "data": {
                    "vix": vix,
                    "old_regime": self._current_regime.value,
                    "new_regime": new_regime.value,
                    "percentile": percentile,
                    "vvix": vvix,
                    "term_structure": term_slope,
                    "prev_regime_duration_min": duration,
                },
            })
            self._regime_start = timestamp
        elif self._current_regime is None:
            self._regime_start = timestamp

        self._current_regime = new_regime

        # Check for VIX spike (rapid move)
        spike_alert = self._check_spike(vix, timestamp)
        if spike_alert:
            alerts.append(spike_alert)

        return alerts

    def _classify(self, vix: float) -> VIXRegime:
        """
        Classify VIX with hysteresis: because the bands overlap by one point,
        a reading near a boundary stays in the current regime until it clears
        the overlap.
        """
        if self._current_regime is not None:
            current_lo, current_hi = self.THRESHOLDS[self._current_regime]
            if current_lo <= vix < current_hi:
                return self._current_regime  # Stay in current regime

        # First reading, or VIX has moved outside the current regime's band
        for regime in VIXRegime:
            lo, hi = self.THRESHOLDS[regime]
            if lo <= vix < hi:
                return regime
        return VIXRegime.EXTREME

    def _check_spike(self, current_vix: float, now: datetime) -> Optional[dict]:
        """Detect rapid VIX spikes within the spike window."""
        if len(self._history) < 2:
            return None
        cutoff = now - timedelta(minutes=self.spike_window)
        baseline_values = [v for t, v in self._history if t <= cutoff]
        if not baseline_values:
            return None
        baseline = baseline_values[-1]  # Most recent value before the window
        change_pct = ((current_vix - baseline) / baseline) * 100

        if abs(change_pct) > self.spike_threshold_pct:
            direction = "spike" if change_pct > 0 else "crash"
            return {
                "type": f"vix_{direction}",
                "severity": "critical" if abs(change_pct) > 25 else "high",
                "message": (
                    f"VIX {direction}: {change_pct:+.1f}% in "
                    f"{self.spike_window} minutes "
                    f"({baseline:.2f} -> {current_vix:.2f})"
                ),
                "data": {
                    "vix": current_vix,
                    "baseline": baseline,
                    "change_pct": change_pct,
                    "window_minutes": self.spike_window,
                },
            }
        return None

    def _calculate_percentile(self, vix: float) -> float:
        """Percentile rank of the current reading vs. stored history."""
        if len(self._history) < 10:
            return 50.0
        values = np.sort([v for _, v in self._history])
        return float(np.searchsorted(values, vix) / len(values) * 100)

    def _transition_severity(self, old: VIXRegime, new: VIXRegime) -> str:
        regimes_ordered = list(VIXRegime)
        old_idx = regimes_ordered.index(old)
        new_idx = regimes_ordered.index(new)
        jump = abs(new_idx - old_idx)
        if jump >= 2:
            return "critical"
        if new in (VIXRegime.HIGH_VOL, VIXRegime.EXTREME):
            return "high"
        return "medium"

    def _format_regime_message(self, state: VIXRegimeState) -> str:
        msg = (
            f"VIX regime change: {state.previous_regime.value} -> "
            f"{state.current_regime.value} (VIX={state.vix_value:.2f}, "
            f"{state.vix_percentile:.0f}th percentile)"
        )
        if state.term_structure_slope is not None:
            structure = (
                "backwardation" if state.term_structure_slope > 0 else "contango"
            )
            msg += f" | Term structure: {structure} ({state.term_structure_slope:+.2f})"
        if state.vvix is not None:
            msg += f" | VVIX={state.vvix:.1f}"
        return msg
```
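The hysteresis behavior is easiest to see in a stripped-down standalone version of the threshold table (same numbers as `THRESHOLDS`, with plain strings in place of the enum): a reading of 14.5 sits in the overlap between the low and normal bands, so the classification depends on where you came from.

```python
from typing import Optional

# Overlapping bands give a one-point hysteresis at each boundary
THRESHOLDS = {
    "low_volatility": (0, 15),
    "normal": (14, 20),
    "elevated": (19, 25),
    "high_volatility": (24, 30),
    "extreme_fear": (29, float("inf")),
}

def classify(vix: float, current: Optional[str]) -> str:
    """Stay in the current regime while VIX remains inside its widened band."""
    if current is not None:
        lo, hi = THRESHOLDS[current]
        if lo <= vix < hi:
            return current
    for regime, (lo, hi) in THRESHOLDS.items():
        if lo <= vix < hi:
            return regime
    return "extreme_fear"

r1 = classify(14.5, None)      # first reading: first matching band wins
r2 = classify(14.5, "normal")  # inside normal's widened band: no flap down
r3 = classify(13.0, "normal")  # clears the overlap: drops to low_volatility
```

Without the overlap, a VIX oscillating between 14.9 and 15.1 would emit a regime-change alert on nearly every tick.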
4. Alert Engine Layer
4.1 Condition Evaluator with Deduplication and Cooldowns
# engine/alert_engine.py import asyncio import hashlib import logging from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional from enum import Enum logger = logging.getLogger(__name__) class AlertSeverity(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class AlertState(Enum): PENDING = "pending" SENT = "sent" SUPPRESSED = "suppressed" FAILED = "failed" @dataclass class Alert: id: str type: str severity: AlertSeverity symbol: str message: str data: dict timestamp: datetime = field(default_factory=datetime.now) state: AlertState = AlertState.PENDING channels_sent: list[str] = field(default_factory=list) fingerprint: str = "" def __post_init__(self): if not self.fingerprint: # Fingerprint for deduplication: same type + symbol + rounded data raw = f"{self.type}:{self.symbol}:{self._round_data()}" self.fingerprint = hashlib.sha256(raw.encode()).hexdigest()[:16] def _round_data(self) -> str: """Round numerical data to prevent near-duplicate alerts.""" rounded = {} for k, v in self.data.items(): if isinstance(v, float): rounded[k] = round(v, 2) else: rounded[k] = v return str(sorted(rounded.items())) @dataclass class CooldownConfig: """Per-alert-type cooldown configuration.""" default: timedelta = field(default_factory=lambda: timedelta(minutes=5)) overrides: dict[str, timedelta] = field(default_factory=lambda: { "gex_level_approach": timedelta(minutes=15), "gex_level_breach": timedelta(minutes=5), "gamma_flip_breach": timedelta(minutes=2), "gamma_regime_change": timedelta(minutes=1), "volume_spike": timedelta(minutes=10), "oi_divergence": timedelta(minutes=30), "sweep": timedelta(minutes=5), "vix_regime_change": timedelta(minutes=1), "vix_spike": timedelta(minutes=2), }) @dataclass class RoutingRule: """Maps alert severity to notification channels.""" severity: AlertSeverity channels: list[str] # If True, send to all channels. If False, send to first available. 
broadcast: bool = False class AlertEngine: """ Central alert processing engine. Responsibilities: 1. Receive raw alert dicts from analysis modules 2. Deduplicate based on fingerprints 3. Apply cooldown periods per alert type 4. Route to appropriate notification channels by severity 5. Track alert state and delivery confirmation """ DEFAULT_ROUTING = [ RoutingRule(AlertSeverity.CRITICAL, ["telegram", "pushover"], broadcast=True), RoutingRule(AlertSeverity.HIGH, ["telegram"], broadcast=False), RoutingRule(AlertSeverity.MEDIUM, ["ntfy"], broadcast=False), RoutingRule(AlertSeverity.LOW, ["ntfy"], broadcast=False), ] def __init__( self, cooldown_config: Optional[CooldownConfig] = None, routing_rules: Optional[list[RoutingRule]] = None, max_alerts_per_minute: int = 20, ): self.cooldown = cooldown_config or CooldownConfig() self.routing = routing_rules or self.DEFAULT_ROUTING self.max_alerts_per_minute = max_alerts_per_minute self._sent_fingerprints: dict[str, datetime] = {} self._alert_history: list[Alert] = [] self._recent_alert_times: list[datetime] = [] self._dispatchers: dict[str, "NotificationDispatcher"] = {} # Output queue for notification dispatch self.outbound_queue: asyncio.Queue[Alert] = asyncio.Queue(maxsize=1000) def register_dispatcher(self, name: str, dispatcher: "NotificationDispatcher"): self._dispatchers[name] = dispatcher async def process(self, raw_alert: dict) -> Optional[Alert]: """ Process a raw alert dict from an analysis module. Returns the Alert if it was dispatched, None if suppressed. """ alert = Alert( id=f"{raw_alert['type']}_{datetime.now().timestamp():.0f}", type=raw_alert["type"], severity=AlertSeverity(raw_alert.get("severity", "medium")), symbol=raw_alert.get("symbol", ""), message=raw_alert["message"], data=raw_alert.get("data", {}), ) # Rate limit check if self._is_rate_limited(): logger.warning( f"Rate limited. 
```python
# engine/alert_engine.py (continued)
                f"Suppressing alert: {alert.type}"
            )
            alert.state = AlertState.SUPPRESSED
            return None

        # Deduplication check
        if self._is_duplicate(alert):
            logger.debug(f"Duplicate suppressed: {alert.fingerprint}")
            alert.state = AlertState.SUPPRESSED
            return None

        # Cooldown check
        if self._is_in_cooldown(alert):
            logger.debug(f"Cooldown active for {alert.type}, suppressing")
            alert.state = AlertState.SUPPRESSED
            return None

        # Route and dispatch
        channels = self._get_channels(alert.severity)
        for channel_name in channels:
            if channel_name in self._dispatchers:
                try:
                    await self._dispatchers[channel_name].send(alert)
                    alert.channels_sent.append(channel_name)
                except Exception as e:
                    logger.error(f"Failed to send via {channel_name}: {e}")

        if alert.channels_sent:
            alert.state = AlertState.SENT
        else:
            alert.state = AlertState.FAILED
            logger.error(f"Alert delivery failed on all channels: {alert.id}")

        # Record for dedup and cooldown
        self._sent_fingerprints[alert.fingerprint] = alert.timestamp
        self._alert_history.append(alert)
        self._recent_alert_times.append(alert.timestamp)
        self._cleanup_old_records()

        return alert

    def _is_duplicate(self, alert: Alert) -> bool:
        if alert.fingerprint in self._sent_fingerprints:
            last_sent = self._sent_fingerprints[alert.fingerprint]
            cooldown = self._get_cooldown(alert.type)
            if datetime.now() - last_sent < cooldown:
                return True
        return False

    def _is_in_cooldown(self, alert: Alert) -> bool:
        cooldown = self._get_cooldown(alert.type)
        for past_alert in reversed(self._alert_history):
            if past_alert.type == alert.type and past_alert.symbol == alert.symbol:
                if datetime.now() - past_alert.timestamp < cooldown:
                    return True
                break  # Only check the most recent matching alert
        return False

    def _is_rate_limited(self) -> bool:
        now = datetime.now()
        cutoff = now - timedelta(minutes=1)
        recent = [t for t in self._recent_alert_times if t > cutoff]
        return len(recent) >= self.max_alerts_per_minute

    def _get_cooldown(self, alert_type: str) -> timedelta:
        return self.cooldown.overrides.get(alert_type, self.cooldown.default)

    def _get_channels(self, severity: AlertSeverity) -> list[str]:
        for rule in self.routing:
            if rule.severity == severity:
                return rule.channels
        return ["ntfy"]  # Default fallback

    def _cleanup_old_records(self):
        cutoff = datetime.now() - timedelta(hours=1)
        self._recent_alert_times = [
            t for t in self._recent_alert_times if t > cutoff
        ]
        # Prune old fingerprints
        expired = [
            fp for fp, ts in self._sent_fingerprints.items()
            if datetime.now() - ts > timedelta(hours=1)
        ]
        for fp in expired:
            del self._sent_fingerprints[fp]
```
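The engine above keys deduplication on `alert.fingerprint`, which is not defined in this excerpt. A minimal content-hash sketch is shown below; the `make_fingerprint` name and the 5-minute time bucket are assumptions for illustration, not the engine's actual implementation:

```python
import hashlib
from datetime import datetime


def make_fingerprint(alert_type: str, symbol: str, data: dict,
                     bucket_minutes: int = 5) -> str:
    """Hash the fields that define 'the same alert' (illustrative sketch).

    Floats are rounded so tick-level jitter still collides, and the
    timestamp is coarsened into a bucket so near-simultaneous duplicates
    dedupe while alerts many minutes apart stay distinct.
    """
    bucket = int(datetime.now().timestamp() // (bucket_minutes * 60))
    key_fields = sorted(
        (k, round(v, 2) if isinstance(v, float) else v)
        for k, v in data.items()
    )
    raw = f"{alert_type}|{symbol}|{key_fields}|{bucket}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
```

A fingerprint built this way is stable across retries of the same event, which is what lets `_is_duplicate` suppress notification storms during volatile periods.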
5. Notification Dispatch Layer
5.1 Dispatcher Interface and Implementations
```python
# notifications/dispatchers.py
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Optional

import aiohttp

logger = logging.getLogger(__name__)


class NotificationDispatcher(ABC):
    """Base class for all notification dispatchers."""

    @abstractmethod
    async def send(self, alert: "Alert") -> bool:
        """Send alert. Returns True on success."""
        ...

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the notification service is reachable."""
        ...


class TelegramDispatcher(NotificationDispatcher):
    """
    Telegram Bot API dispatcher.

    Setup:
    1. Create a bot via @BotFather -> get bot token
    2. Get chat_id: send a message to the bot, then
       GET https://api.telegram.org/bot<token>/getUpdates
    3. Use the chat_id from the response

    Rate limits: ~30 messages/sec overall, ~1 message/sec
    to the same chat (20/min to the same group).
    """

    def __init__(self, bot_token: str, chat_id: str, parse_mode: str = "HTML"):
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.parse_mode = parse_mode
        self.base_url = f"https://api.telegram.org/bot{bot_token}"
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=10)
            )
        return self._session

    async def send(self, alert: "Alert") -> bool:
        session = await self._get_session()
        text = self._format_message(alert)
        payload = {
            "chat_id": self.chat_id,
            "text": text,
            "parse_mode": self.parse_mode,
            "disable_web_page_preview": True,
        }
        try:
            async with session.post(
                f"{self.base_url}/sendMessage", json=payload
            ) as resp:
                if resp.status == 200:
                    logger.info(f"Telegram alert sent: {alert.id}")
                    return True
                elif resp.status == 429:
                    # Rate limited -- extract retry_after
                    data = await resp.json()
                    retry_after = data.get("parameters", {}).get("retry_after", 5)
                    logger.warning(
                        f"Telegram rate limited, retry in {retry_after}s"
                    )
                    await asyncio.sleep(retry_after)
                    return await self.send(alert)  # Retry after waiting
                else:
                    body = await resp.text()
                    logger.error(f"Telegram error {resp.status}: {body}")
                    return False
        except aiohttp.ClientError as e:
            logger.error(f"Telegram connection error: {e}")
            return False

    def _format_message(self, alert: "Alert") -> str:
        severity_icons = {
            "critical": "!!!",
            "high": "!!",
            "medium": "!",
            "low": "~",
        }
        icon = severity_icons.get(alert.severity.value, "")
        lines = [
            f"<b>[{icon} {alert.severity.value.upper()}]</b>",
            f"<b>{alert.type.replace('_', ' ').title()}</b>",
            "",
            alert.message,
            "",
            f"<code>Time: {alert.timestamp.strftime('%H:%M:%S')}</code>",
        ]
        # Add key data points
        for key, value in alert.data.items():
            if isinstance(value, float):
                lines.append(f"<code>{key}: {value:.4f}</code>")
            else:
                lines.append(f"<code>{key}: {value}</code>")
        return "\n".join(lines)

    async def health_check(self) -> bool:
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/getMe") as resp:
                return resp.status == 200
        except aiohttp.ClientError:
            return False

    async def close(self):
        if self._session:
            await self._session.close()


class PushoverDispatcher(NotificationDispatcher):
    """
    Pushover notification dispatcher.

    Good for: Mobile push with priority levels, quiet hours,
    and delivery receipts for emergency priority.

    Rate limit: 10,000 messages/month per application (free tier).
    Emergency priority: re-notifies at the configured retry
    interval (minimum 30s) until acknowledged.
    """

    API_URL = "https://api.pushover.net/1/messages.json"

    # Pushover priority levels
    PRIORITY_MAP = {
        "low": -1,       # No sound/vibration
        "medium": 0,     # Normal
        "high": 1,       # High priority, bypasses quiet hours
        "critical": 2,   # Emergency: repeats until acknowledged
    }

    def __init__(self, user_key: str, api_token: str,
                 device: Optional[str] = None):
        self.user_key = user_key
        self.api_token = api_token
        self.device = device

    async def send(self, alert: "Alert") -> bool:
        priority = self.PRIORITY_MAP.get(alert.severity.value, 0)
        payload = {
            "token": self.api_token,
            "user": self.user_key,
            "message": alert.message,
            "title": f"[{alert.severity.value.upper()}] {alert.type}",
            "priority": priority,
            "timestamp": int(alert.timestamp.timestamp()),
            "sound": "siren" if priority >= 1 else "pushover",
        }
        if self.device:
            payload["device"] = self.device

        # Emergency priority requires retry/expire params
        if priority == 2:
            payload["retry"] = 60    # Re-notify every 60 seconds
            payload["expire"] = 600  # Stop after 10 minutes

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(self.API_URL, data=payload) as resp:
                    if resp.status == 200:
                        logger.info(f"Pushover alert sent: {alert.id}")
                        return True
                    body = await resp.text()
                    logger.error(f"Pushover error {resp.status}: {body}")
                    return False
            except aiohttp.ClientError as e:
                logger.error(f"Pushover connection error: {e}")
                return False

    async def health_check(self) -> bool:
        url = "https://api.pushover.net/1/users/validate.json"
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, data={
                    "token": self.api_token,
                    "user": self.user_key,
                }) as resp:
                    return resp.status == 200
            except aiohttp.ClientError:
                return False


class NtfyDispatcher(NotificationDispatcher):
    """
    ntfy.sh dispatcher -- self-hostable, no account required.

    Advantages:
    - Free and open source
    - Self-hostable (docker: binwiederhier/ntfy)
    - No API key needed for public topics
    - Supports Android/iOS/Web
    - UnifiedPush compatible

    Best for: Low/medium severity alerts, development/testing.
    """

    def __init__(self, topic: str, server: str = "https://ntfy.sh",
                 token: Optional[str] = None):
        self.topic = topic
        self.server = server.rstrip("/")
        self.token = token  # For self-hosted servers with auth

    async def send(self, alert: "Alert") -> bool:
        url = f"{self.server}/{self.topic}"
        priority_map = {
            "low": "2",
            "medium": "3",
            "high": "4",
            "critical": "5",
        }
        headers = {
            "Title": f"[{alert.severity.value.upper()}] {alert.type}",
            "Priority": priority_map.get(alert.severity.value, "3"),
            "Tags": f"{alert.type},{alert.symbol}" if alert.symbol else alert.type,
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    url, data=alert.message, headers=headers
                ) as resp:
                    if resp.status == 200:
                        logger.info(f"ntfy alert sent: {alert.id}")
                        return True
                    body = await resp.text()
                    logger.error(f"ntfy error {resp.status}: {body}")
                    return False
            except aiohttp.ClientError as e:
                logger.error(f"ntfy connection error: {e}")
                return False

    async def health_check(self) -> bool:
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(f"{self.server}/v1/health") as resp:
                    return resp.status == 200
            except aiohttp.ClientError:
                return False


class SNSDispatcher(NotificationDispatcher):
    """
    AWS SNS dispatcher for SMS, email, or fanout to SQS/Lambda.

    Best for:
    - SMS delivery (reliable path for critical alerts)
    - Fanout to multiple downstream systems
    - Enterprise environments already on AWS

    Cost: SMS is billed per message (fractions of a cent in the US);
    email/HTTP deliveries are effectively free at this volume.

    Requires: boto3, configured AWS credentials.
    """

    def __init__(self, topic_arn: str, region: str = "us-east-1"):
        self.topic_arn = topic_arn
        self.region = region
        self._client = None

    def _get_client(self):
        if self._client is None:
            import boto3
            self._client = boto3.client("sns", region_name=self.region)
        return self._client

    async def send(self, alert: "Alert") -> bool:
        """
        SNS publish is not natively async. Run it in an executor
        to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        try:
            response = await loop.run_in_executor(None, self._publish, alert)
            logger.info(
                f"SNS alert sent: {alert.id}, "
                f"MessageId={response['MessageId']}"
            )
            return True
        except Exception as e:
            logger.error(f"SNS error: {e}")
            return False

    def _publish(self, alert: "Alert") -> dict:
        client = self._get_client()
        return client.publish(
            TopicArn=self.topic_arn,
            Subject=f"[{alert.severity.value.upper()}] {alert.type}",
            Message=alert.message,
            MessageAttributes={
                "severity": {
                    "DataType": "String",
                    "StringValue": alert.severity.value,
                },
                "alert_type": {
                    "DataType": "String",
                    "StringValue": alert.type,
                },
            },
        )

    async def health_check(self) -> bool:
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None,
                lambda: self._get_client().get_topic_attributes(
                    TopicArn=self.topic_arn
                ),
            )
            return True
        except Exception:
            return False
```
5.2 Notification Service Comparison
| Service | Latency | Reliability | Cost | Mobile Push | Self-Host | Best For |
|---|---|---|---|---|---|---|
| Telegram Bot | 100-500ms | High | Free | Yes | No | Primary channel, rich formatting |
| Pushover | 200-800ms | Very High | $5 one-time | Yes | No | Critical alerts, ack required |
| ntfy.sh | 50-200ms | High (self-host) | Free | Yes | Yes | Dev/testing, privacy |
| AWS SNS | 100-300ms | Very High | <$0.01/SMS (US) | Via SMS | No | Enterprise, SMS fallback |
| Discord | 200-500ms | Medium | Free | Yes | No | Team dashboards |
| Slack | 300-800ms | High | Free tier | Yes | No | Team workflows |
Recommended production setup: Telegram as the primary channel, Pushover as a critical-only backup with acknowledgment tracking, and ntfy.sh as a self-hosted low-priority feed.
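That recommendation can be expressed as a severity-to-channel routing table consumed by the alert engine's router. This is a sketch; the `ROUTING` dict and `resolve_channels` helper are illustrative names, not part of the guide's code:

```python
# Hypothetical severity -> channel routing for the setup described above.
ROUTING = {
    "critical": ["telegram", "pushover"],  # Pushover emergency for ack tracking
    "high":     ["telegram", "pushover"],
    "medium":   ["telegram"],
    "low":      ["ntfy"],                  # Self-hosted low-priority feed
}


def resolve_channels(severity: str) -> list[str]:
    # Unknown severities fall back to the low-priority feed,
    # mirroring the engine's ["ntfy"] default
    return ROUTING.get(severity, ["ntfy"])
```

Keeping this mapping in config rather than code makes it easy to mute a channel (e.g., drop Pushover overnight) without a redeploy.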
6. Putting It All Together: The Main Orchestrator
```python
# main.py
import asyncio
import logging
import os
import signal

import aiohttp

from ingestion.adapters import PolygonAdapter, TradierAdapter
from analysis.gex_monitor import GEXCalculator, GEXBreachDetector
from analysis.volume_detector import UnusualVolumeDetector
from analysis.vix_regime import VIXRegimeClassifier
from engine.alert_engine import AlertEngine, CooldownConfig
from notifications.dispatchers import (
    TelegramDispatcher,
    PushoverDispatcher,
    NtfyDispatcher,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


class AlertSystem:
    """
    Main orchestrator that wires together all layers.

    Runs three concurrent analysis loops:
    1. GEX monitoring (WebSocket-driven, fastest cadence)
    2. Unusual volume scanning (WebSocket or polling, medium cadence)
    3. VIX regime monitoring (polling, slowest cadence)
    """

    def __init__(self):
        # Configuration from environment
        self.symbols = os.getenv("WATCH_SYMBOLS", "SPY,QQQ,IWM").split(",")
        self.gex_interval = int(os.getenv("GEX_INTERVAL_SEC", "30"))
        self.vix_interval = int(os.getenv("VIX_INTERVAL_SEC", "15"))

        # Analysis modules
        self.gex_calculator = GEXCalculator()
        self.gex_detector = GEXBreachDetector()
        self.volume_detector = UnusualVolumeDetector()
        self.vix_classifier = VIXRegimeClassifier()

        # Alert engine
        self.alert_engine = AlertEngine(
            cooldown_config=CooldownConfig(),
            max_alerts_per_minute=20,
        )

        # Data adapters
        polygon_key = os.getenv("POLYGON_API_KEY", "")
        tradier_token = os.getenv("TRADIER_TOKEN", "")
        self.polygon = PolygonAdapter(polygon_key) if polygon_key else None
        self.tradier = TradierAdapter(tradier_token) if tradier_token else None

        # Notification dispatchers
        self._setup_dispatchers()

        # Graceful shutdown
        self._shutdown_event = asyncio.Event()

    def _setup_dispatchers(self):
        tg_token = os.getenv("TELEGRAM_BOT_TOKEN")
        tg_chat = os.getenv("TELEGRAM_CHAT_ID")
        if tg_token and tg_chat:
            self.alert_engine.register_dispatcher(
                "telegram", TelegramDispatcher(tg_token, tg_chat)
            )

        po_user = os.getenv("PUSHOVER_USER_KEY")
        po_token = os.getenv("PUSHOVER_API_TOKEN")
        if po_user and po_token:
            self.alert_engine.register_dispatcher(
                "pushover", PushoverDispatcher(po_user, po_token)
            )

        ntfy_topic = os.getenv("NTFY_TOPIC")
        if ntfy_topic:
            self.alert_engine.register_dispatcher(
                "ntfy",
                NtfyDispatcher(
                    ntfy_topic,
                    server=os.getenv("NTFY_SERVER", "https://ntfy.sh"),
                ),
            )

    async def run(self):
        logger.info(f"Starting alert system for symbols: {self.symbols}")
        tasks = [
            asyncio.create_task(self._gex_loop(), name="gex_loop"),
            asyncio.create_task(self._vix_loop(), name="vix_loop"),
            asyncio.create_task(self._health_loop(), name="health_loop"),
        ]
        # Add volume monitoring for each symbol
        for symbol in self.symbols:
            tasks.append(
                asyncio.create_task(
                    self._volume_loop(symbol),
                    name=f"volume_{symbol}",
                )
            )

        # Wait for shutdown signal or task failure
        try:
            done, pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_EXCEPTION
            )
            for task in done:
                if task.exception():
                    logger.error(
                        f"Task {task.get_name()} failed: {task.exception()}"
                    )
        finally:
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            logger.info("Alert system shut down")

    async def _gex_loop(self):
        """Periodically fetch the options chain, calculate GEX, check breaches."""
        if not self.tradier:
            logger.warning("No Tradier adapter configured, GEX loop disabled")
            return

        while not self._shutdown_event.is_set():
            for symbol in self.symbols:
                try:
                    # Get nearest expiries (0DTE + next weekly)
                    expirations = await self._get_near_expirations(symbol)

                    all_options = []
                    for exp in expirations[:3]:  # Top 3 nearest
                        chain = await self.tradier.get_options_chain(symbol, exp)
                        all_options.extend(chain)

                    if not all_options:
                        continue

                    # Get current spot price
                    spot = await self._get_spot_price(symbol)

                    # Calculate GEX
                    gex_state = self.gex_calculator.calculate(
                        all_options, spot, symbol
                    )

                    # Check for breaches
                    alerts = self.gex_detector.check(gex_state)
                    for alert_data in alerts:
                        await self.alert_engine.process(alert_data)

                except Exception as e:
                    logger.error(f"GEX loop error for {symbol}: {e}")

            await asyncio.sleep(self.gex_interval)

    async def _volume_loop(self, symbol: str):
        """Monitor unusual options volume for a single symbol."""
        if self.polygon:
            # Streaming mode
            async for snapshot in self.polygon.stream_options(symbol):
                anomalies = self.volume_detector.check(snapshot)
                for anomaly in anomalies:
                    await self.alert_engine.process({
                        "type": anomaly.anomaly_type,
                        "severity": self._volume_severity(anomaly),
                        "symbol": symbol,
                        "message": self._format_volume_alert(anomaly),
                        "data": {
                            "strike": anomaly.strike,
                            "expiry": anomaly.expiry,
                            "volume": anomaly.current_volume,
                            "z_score": anomaly.z_score,
                            "vol_oi_ratio": anomaly.volume_oi_ratio,
                        },
                    })
        else:
            # Polling fallback
            while not self._shutdown_event.is_set():
                # Poll implementation here
                await asyncio.sleep(60)

    async def _vix_loop(self):
        """Monitor VIX for regime changes."""
        adapter = self.polygon or self.tradier
        if not adapter:
            logger.warning("No data adapter for VIX, loop disabled")
            return

        while not self._shutdown_event.is_set():
            try:
                vix_quote = await adapter.get_vix()
                alerts = self.vix_classifier.update(
                    vix=vix_quote.value,
                    timestamp=vix_quote.timestamp,
                )
                for alert_data in alerts:
                    await self.alert_engine.process(alert_data)
            except Exception as e:
                logger.error(f"VIX loop error: {e}")

            await asyncio.sleep(self.vix_interval)

    async def _health_loop(self):
        """Periodic health checks for all dispatchers."""
        while not self._shutdown_event.is_set():
            for name, dispatcher in self.alert_engine._dispatchers.items():
                try:
                    healthy = await dispatcher.health_check()
                    if not healthy:
                        logger.warning(f"Dispatcher {name} health check failed")
                except Exception as e:
                    logger.error(f"Dispatcher {name} health check error: {e}")
            await asyncio.sleep(300)  # Every 5 minutes

    async def _get_near_expirations(self, symbol: str) -> list[str]:
        """Get upcoming option expiration dates."""
        if self.tradier:
            async with aiohttp.ClientSession(
                headers=self.tradier.headers
            ) as session:
                url = f"{self.tradier.BASE_URL}/markets/options/expirations"
                async with session.get(url, params={"symbol": symbol}) as resp:
                    data = await resp.json()
                    dates = data.get("expirations", {}).get("date", [])
                    return sorted(dates)[:5]
        return []

    async def _get_spot_price(self, symbol: str) -> float:
        """Get the current spot price for a symbol."""
        if self.tradier:
            async with aiohttp.ClientSession(
                headers=self.tradier.headers
            ) as session:
                url = f"{self.tradier.BASE_URL}/markets/quotes"
                async with session.get(url, params={"symbols": symbol}) as resp:
                    data = await resp.json()
                    return data["quotes"]["quote"]["last"]
        return 0.0

    @staticmethod
    def _volume_severity(anomaly) -> str:
        if anomaly.anomaly_type == "sweep" and anomaly.current_volume > 2000:
            return "high"
        if anomaly.z_score > 4.0:
            return "high"
        if anomaly.z_score > 3.0 or anomaly.anomaly_type == "sweep":
            return "medium"
        return "low"

    @staticmethod
    def _format_volume_alert(anomaly) -> str:
        if anomaly.anomaly_type == "sweep":
            return (
                f"Options sweep detected: {anomaly.symbol} "
                f"${anomaly.strike} {anomaly.option_type} "
                f"exp {anomaly.expiry} -- "
                f"{anomaly.current_volume:,} contracts in 60s"
            )
        return (
            f"Unusual volume: {anomaly.symbol} "
            f"${anomaly.strike} {anomaly.option_type} "
            f"exp {anomaly.expiry} -- "
            f"Vol={anomaly.current_volume:,} "
            f"(z={anomaly.z_score:.1f}, V/OI={anomaly.volume_oi_ratio:.1f}x)"
        )


def main():
    system = AlertSystem()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Handle shutdown signals
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, system._shutdown_event.set)

    try:
        loop.run_until_complete(system.run())
    finally:
        loop.close()


if __name__ == "__main__":
    main()
```
7. Latency Considerations
7.1 End-to-End Latency Budget
```
Market event occurs                      0 ms
  |
  v
WebSocket receives data            10-100 ms   (network + provider processing)
  |
  v
Deserialize + normalize               1-5 ms
  |
  v
Analysis (GEX calc / vol check)      5-50 ms   (depends on chain size)
  |
  v
Alert engine processing               <1 ms    (dedup, cooldown, routing)
  |
  v
HTTP POST to notification API     100-500 ms   (Telegram/Pushover/ntfy)
  |
  v
Push to mobile device           200-2000 ms    (APNS/FCM delivery)
  ===================================
  TOTAL:                       300 ms - 2.5 s typical
```
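To verify the budget above against real traffic, each stage can record its own latency samples. A minimal instrumentation sketch follows; the `StageTimer` class is an assumption for illustration, not part of the guide's codebase:

```python
import time
from collections import defaultdict


class StageTimer:
    """Accumulate per-stage latencies (ms) so the budget can be
    checked with rough percentiles per stage."""

    def __init__(self):
        self.samples: dict[str, list[float]] = defaultdict(list)

    def record(self, stage: str, start: float) -> None:
        # start is a time.perf_counter() value captured before the stage ran
        self.samples[stage].append((time.perf_counter() - start) * 1000.0)

    def percentile(self, stage: str, pct: float) -> float:
        data = sorted(self.samples[stage])
        if not data:
            return 0.0
        idx = min(len(data) - 1, int(len(data) * pct / 100))
        return data[idx]


# Usage inside a processing loop (illustrative):
# t0 = time.perf_counter()
# gex_state = calculator.calculate(chain, spot, symbol)
# timer.record("gex_calc", t0)
```

Logging p50/p95 per stage once a minute quickly shows which hop (analysis vs. notification API vs. push delivery) dominates end-to-end latency.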
7.2 Optimization Techniques
Data ingestion:
- Use orjson instead of json for 3-5x faster parsing
- Pre-allocate message buffers
- Use uvloop as the event loop (pip install uvloop, then asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()))
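Both speedups above can be wired in as optional dependencies that degrade gracefully when not installed. The `json_loads` and `install_uvloop` wrapper names below are assumptions for illustration:

```python
# Optional speedups -- both fall back cleanly if the package is absent.
try:
    import orjson

    def json_loads(data):
        return orjson.loads(data)   # Rust-backed, typically 3-5x faster
except ImportError:
    import json

    def json_loads(data):
        return json.loads(data)     # Stdlib fallback


def install_uvloop() -> bool:
    """Swap in uvloop's event loop policy when available."""
    try:
        import asyncio
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        return True
    except ImportError:
        return False
```

Calling `install_uvloop()` once at startup (before creating the event loop) is enough; the rest of the code is unchanged either way.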
Analysis:
- Cache intermediate GEX calculations; only recompute when OI changes
- Use numpy vectorized operations (not Python loops) for GEX across strikes
- Pre-filter options chain: ignore strikes far OTM (>15% from spot)
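The pre-filtering bullet above can be sketched as a one-pass strike filter. This is illustrative only; the `prefilter_chain` name and the dict-shaped option records are assumptions, not the guide's actual chain format:

```python
def prefilter_chain(options: list[dict], spot: float,
                    max_otm_pct: float = 0.15) -> list[dict]:
    """Drop strikes more than 15% from spot before the GEX pass.

    Far-OTM strikes carry near-zero gamma, so discarding them
    shrinks the vectorized calculation with negligible error.
    """
    lo = spot * (1.0 - max_otm_pct)
    hi = spot * (1.0 + max_otm_pct)
    return [opt for opt in options if lo <= opt["strike"] <= hi]
```

On a full SPX chain this typically removes most of the strikes while leaving the gamma profile around spot intact.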
Notification dispatch:
- Keep HTTP sessions alive (connection pooling via aiohttp.ClientSession)
- Fire-and-forget for low-priority alerts (do not await delivery confirmation)
- For critical alerts, await confirmation but with timeout
```python
_background_tasks: set = set()


# Example: fire-and-forget for low priority, await for high
async def dispatch_alert(alert, dispatcher):
    if alert.severity in (AlertSeverity.CRITICAL, AlertSeverity.HIGH):
        # Await confirmation with timeout
        try:
            success = await asyncio.wait_for(
                dispatcher.send(alert), timeout=5.0
            )
            if not success:
                logger.error(f"Critical alert delivery failed: {alert.id}")
        except asyncio.TimeoutError:
            logger.error(f"Critical alert timed out: {alert.id}")
    else:
        # Fire and forget -- keep a reference so the task
        # is not garbage-collected mid-flight
        task = asyncio.create_task(dispatcher.send(alert))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
```
8. Production Reliability
8.1 Supervision and Process Management
Use systemd for process management on Linux.
```ini
# /etc/systemd/system/options-alerter.service
[Unit]
Description=Options Market Alert System
After=network-online.target
Wants=network-online.target
StartLimitBurst=5
StartLimitIntervalSec=300

[Service]
Type=simple
User=alerter
Group=alerter
WorkingDirectory=/opt/options-alerter
ExecStart=/opt/options-alerter/.venv/bin/python main.py
Restart=always
RestartSec=10

# Environment file with secrets
EnvironmentFile=/opt/options-alerter/.env

# Resource limits
MemoryMax=512M
CPUQuota=50%

# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=options-alerter

[Install]
WantedBy=multi-user.target
```
8.2 Monitoring the Monitor
The alerting system itself needs monitoring. A dead alerter is worse than no alerter because it creates a false sense of coverage.
```python
# monitoring/watchdog.py
import asyncio
from datetime import datetime, timedelta

import aiohttp


class SystemWatchdog:
    """
    Self-monitoring for the alert system.

    Sends a heartbeat to an external service every N minutes.
    If the heartbeat stops, the external service alerts you
    through an independent channel.

    Options:
    - Healthchecks.io (free tier: 20 checks)
    - UptimeRobot (free tier: 50 monitors)
    - Cronitor
    - Simple cron job that checks the process + sends curl
    """

    def __init__(
        self,
        healthcheck_url: str,  # e.g., https://hc-ping.com/your-uuid
        interval_seconds: int = 300,
        stale_data_threshold: timedelta = timedelta(minutes=5),
    ):
        self.healthcheck_url = healthcheck_url
        self.interval = interval_seconds
        self.stale_threshold = stale_data_threshold
        self._last_data_received: dict[str, datetime] = {}

    def record_data_received(self, source: str):
        self._last_data_received[source] = datetime.now()

    async def run(self):
        """Heartbeat loop -- runs alongside the main system."""
        while True:
            all_healthy = True

            # Check data freshness
            now = datetime.now()
            for source, last_time in self._last_data_received.items():
                if now - last_time > self.stale_threshold:
                    all_healthy = False
                    # Log, but also try to alert through a backup
                    # channel (not the system being monitored)

            if all_healthy:
                await self._ping_healthy()
            else:
                await self._ping_failure()

            await asyncio.sleep(self.interval)

    async def _ping_healthy(self):
        async with aiohttp.ClientSession() as session:
            try:
                await session.get(
                    self.healthcheck_url,
                    timeout=aiohttp.ClientTimeout(total=10),
                )
            except Exception:
                pass  # Best effort

    async def _ping_failure(self):
        async with aiohttp.ClientSession() as session:
            try:
                await session.get(
                    f"{self.healthcheck_url}/fail",
                    timeout=aiohttp.ClientTimeout(total=10),
                )
            except Exception:
                pass
```
8.3 State Persistence and Recovery
```python
# persistence/state_store.py
import json
import sqlite3
from datetime import datetime
from typing import Optional


class AlertStateStore:
    """
    SQLite-backed state persistence for crash recovery.

    On restart, the system:
    1. Loads last-known GEX state (avoids false breach on first tick)
    2. Loads cooldown timers (avoids duplicate alerts after restart)
    3. Loads VIX regime state (avoids false regime change)
    """

    def __init__(self, db_path: str = "alert_state.db"):
        self.db_path = db_path
        self._conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS alert_log (
                id TEXT PRIMARY KEY,
                type TEXT NOT NULL,
                severity TEXT NOT NULL,
                symbol TEXT,
                message TEXT,
                data_json TEXT,
                timestamp TEXT NOT NULL,
                state TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS system_state (
                key TEXT PRIMARY KEY,
                value_json TEXT NOT NULL,
                updated_at TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_alert_type_ts
                ON alert_log(type, timestamp);
            CREATE INDEX IF NOT EXISTS idx_alert_symbol
                ON alert_log(symbol, timestamp);
        """)
        self._conn.commit()

    def save_alert(self, alert: "Alert"):
        self._conn.execute(
            """INSERT OR REPLACE INTO alert_log
               (id, type, severity, symbol, message,
                data_json, timestamp, state)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                alert.id,
                alert.type,
                alert.severity.value,
                alert.symbol,
                alert.message,
                json.dumps(alert.data),
                alert.timestamp.isoformat(),
                alert.state.value,
            ),
        )
        self._conn.commit()

    def save_state(self, key: str, value: dict):
        self._conn.execute(
            """INSERT OR REPLACE INTO system_state
               (key, value_json, updated_at)
               VALUES (?, ?, ?)""",
            (key, json.dumps(value), datetime.now().isoformat()),
        )
        self._conn.commit()

    def load_state(self, key: str) -> Optional[dict]:
        row = self._conn.execute(
            "SELECT value_json FROM system_state WHERE key = ?", (key,)
        ).fetchone()
        if row:
            return json.loads(row[0])
        return None

    def get_recent_alerts(
        self, alert_type: str, minutes: int = 60
    ) -> list[dict]:
        now_iso = datetime.now().isoformat()
        rows = self._conn.execute(
            """SELECT * FROM alert_log
               WHERE type = ?
                 AND timestamp > datetime(?, '-' || ? || ' minutes')
               ORDER BY timestamp DESC""",
            (alert_type, now_iso, minutes),
        ).fetchall()
        return [
            {
                "id": r[0],
                "type": r[1],
                "severity": r[2],
                "symbol": r[3],
                "message": r[4],
                "data": json.loads(r[5]),
                "timestamp": r[6],
            }
            for r in rows
        ]
```
8.4 Docker Deployment
```dockerfile
# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Non-root user
RUN useradd -m alerter
USER alerter

# Health check (assumes the app exposes a /health endpoint on :8080)
HEALTHCHECK --interval=60s --timeout=10s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"

CMD ["python", "main.py"]
```
```yaml
# docker-compose.yml
version: "3.8"

services:
  alerter:
    build: .
    restart: unless-stopped
    env_file: .env
    volumes:
      - alert-data:/app/data   # For SQLite persistence
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: "0.5"
    logging:
      driver: json-file
      options:
        max-size: "10m"
        max-file: "3"

volumes:
  alert-data:
```
9. Configuration Reference
```python
# config.py -- example .env file documentation
"""
Required environment variables:

# Data Sources (at least one required)
POLYGON_API_KEY=your_polygon_key        # Polygon.io API key
TRADIER_TOKEN=your_tradier_token        # Tradier API access token

# Symbols
WATCH_SYMBOLS=SPY,QQQ,IWM               # Comma-separated symbols

# Notification Channels (at least one required)
TELEGRAM_BOT_TOKEN=123456:ABC-DEF...    # Telegram bot token from @BotFather
TELEGRAM_CHAT_ID=-1001234567890         # Telegram chat/channel ID
PUSHOVER_USER_KEY=uQiRzpo4DXg...        # Pushover user key
PUSHOVER_API_TOKEN=azGDORePK8g...       # Pushover app API token
NTFY_TOPIC=my-options-alerts            # ntfy.sh topic name
NTFY_SERVER=https://ntfy.sh             # ntfy server (default: ntfy.sh)

# Optional: AWS SNS
AWS_SNS_TOPIC_ARN=arn:aws:sns:...       # SNS topic ARN
AWS_DEFAULT_REGION=us-east-1

# Monitoring
HEALTHCHECK_URL=https://hc-ping.com/your-uuid

# Tuning
GEX_INTERVAL_SEC=30                     # GEX recalculation interval
VIX_INTERVAL_SEC=15                     # VIX polling interval
MAX_ALERTS_PER_MINUTE=20                # Rate limit
"""
```
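Since both "at least one data source" and "at least one channel" are hard requirements, it is worth validating them at startup rather than discovering a misconfiguration at alert time. A sketch follows; the `validate_config` helper is an assumption, not part of the guide's code:

```python
import os


def validate_config(env=os.environ) -> list[str]:
    """Return a list of configuration errors (empty = valid).

    Failing fast at startup beats silently running with no data
    source or no delivery channel.
    """
    errors = []
    if not (env.get("POLYGON_API_KEY") or env.get("TRADIER_TOKEN")):
        errors.append("No data source: set POLYGON_API_KEY or TRADIER_TOKEN")

    channel_keys = (
        "TELEGRAM_BOT_TOKEN", "PUSHOVER_API_TOKEN",
        "NTFY_TOPIC", "AWS_SNS_TOPIC_ARN",
    )
    if not any(env.get(k) for k in channel_keys):
        errors.append("No notification channel configured")

    if env.get("TELEGRAM_BOT_TOKEN") and not env.get("TELEGRAM_CHAT_ID"):
        errors.append("TELEGRAM_BOT_TOKEN set but TELEGRAM_CHAT_ID missing")
    return errors
```

Calling this at the top of `main()` and exiting with the error list keeps systemd's `Restart=always` from looping on a config that can never work.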
10. Summary of Key Architectural Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Async framework | asyncio + aiohttp | Native Python, no heavy framework needed |
| WebSocket library | websockets | Mature, well-maintained, async-native |
| Event loop | uvloop (production) | 2-4x faster than default CPython loop |
| State storage | SQLite | Zero-config, single-file, sufficient for this scale |
| Primary notification | Telegram Bot API | Free, rich formatting, fast delivery |
| Critical backup | Pushover | Delivery receipts, acknowledgment tracking |
| Self-monitoring | Healthchecks.io | Independent failure detection |
| Deployment | Docker + systemd | Simple, reliable, well-understood |
| Deduplication | Content fingerprint + cooldown | Prevents notification storms during volatility |
The system is designed so each layer can be tested independently. The analysis modules are pure functions that take data in and produce alert dicts. The alert engine applies policy (dedup, cooldown, routing) without knowing about markets. The dispatchers only care about delivering messages. This separation makes it straightforward to add new alert types (e.g., dark pool prints, skew alerts) or new notification channels without touching existing code.
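Because the analysis modules are pure functions over plain data, each layer can be exercised with hand-built inputs and bare assertions. As an illustration only (this `find_flip_strike` helper and its crossing rule are assumptions, not the guide's actual GEX API), a zero-gamma flip finder is trivially testable:

```python
from typing import Optional


def find_flip_strike(gex_by_strike: dict) -> Optional[float]:
    """Illustrative pure function: return the first strike where
    cumulative GEX crosses from negative to non-negative, i.e. a
    toy version of the zero-gamma 'flip' level."""
    total = 0.0
    prev_strike = None
    for strike in sorted(gex_by_strike):
        prev_total = total
        total += gex_by_strike[strike]
        if prev_strike is not None and prev_total < 0 <= total:
            return strike
        prev_strike = strike
    return None
```

No market data, mocks, or event loop are needed to test it, which is exactly the property that makes the real analysis layer cheap to cover.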