feat: Implement proxy health check functionality #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 1 commit · Jul 25, 2025
91 changes: 91 additions & 0 deletions app/router/config_routes.py
@@ -11,6 +11,7 @@
from app.core.security import verify_auth_token
from app.log.logger import Logger, get_config_routes_logger
from app.service.config.config_service import ConfigService
from app.service.proxy.proxy_check_service import get_proxy_check_service, ProxyCheckResult
from app.utils.helpers import redact_key_for_logging

router = APIRouter(prefix="/api/config", tags=["config"])
@@ -132,3 +133,93 @@ async def get_ui_models(request: Request):
status_code=500,
detail=f"An unexpected error occurred while fetching UI models: {str(e)}",
)


class ProxyCheckRequest(BaseModel):
"""Proxy check request"""
proxy: str = Field(..., description="Proxy address to check")
use_cache: bool = Field(True, description="Whether to use cached results")


class ProxyBatchCheckRequest(BaseModel):
"""Batch proxy check request"""
proxies: List[str] = Field(..., description="List of proxy addresses to check")
use_cache: bool = Field(True, description="Whether to use cached results")
max_concurrent: int = Field(5, description="Maximum concurrent check count", ge=1, le=10)


@router.post("/proxy/check", response_model=ProxyCheckResult)
async def check_single_proxy(proxy_request: ProxyCheckRequest, request: Request):
"""Check if a single proxy is available"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to proxy check")
return RedirectResponse(url="/", status_code=302)

try:
logger.info(f"Checking single proxy: {proxy_request.proxy}")
proxy_service = get_proxy_check_service()
result = await proxy_service.check_single_proxy(
proxy_request.proxy,
proxy_request.use_cache
)
return result
except Exception as e:
logger.error(f"Proxy check failed: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Proxy check failed: {str(e)}")


@router.post("/proxy/check-all", response_model=List[ProxyCheckResult])
async def check_all_proxies(batch_request: ProxyBatchCheckRequest, request: Request):
"""Check multiple proxies availability"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to batch proxy check")
return RedirectResponse(url="/", status_code=302)

try:
logger.info(f"Batch checking {len(batch_request.proxies)} proxies")
proxy_service = get_proxy_check_service()
results = await proxy_service.check_multiple_proxies(
batch_request.proxies,
batch_request.use_cache,
batch_request.max_concurrent
)
return results
except Exception as e:
logger.error(f"Batch proxy check failed: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Batch proxy check failed: {str(e)}")


@router.get("/proxy/cache-stats")
async def get_proxy_cache_stats(request: Request):
"""Get proxy check cache statistics"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to proxy cache stats")
return RedirectResponse(url="/", status_code=302)

try:
proxy_service = get_proxy_check_service()
stats = proxy_service.get_cache_stats()
return stats
except Exception as e:
logger.error(f"Get proxy cache stats failed: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Get cache stats failed: {str(e)}")


@router.post("/proxy/clear-cache")
async def clear_proxy_cache(request: Request):
"""Clear proxy check cache"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to clear proxy cache")
return RedirectResponse(url="/", status_code=302)

try:
proxy_service = get_proxy_check_service()
proxy_service.clear_cache()
return {"success": True, "message": "Proxy check cache cleared"}
except Exception as e:
logger.error(f"Clear proxy cache failed: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Clear cache failed: {str(e)}")
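
For reference, below is a minimal client-side sketch of how these endpoints could be exercised with httpx. The base URL, auth_token value, and proxy addresses are placeholders for illustration; the routes redirect to / with a 302 when the cookie is missing or invalid.

import asyncio

import httpx

BASE_URL = "http://localhost:8000"  # placeholder; point at your deployment
AUTH_COOKIE = {"auth_token": "<your-auth-token>"}  # the routes expect a valid auth_token cookie

async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL, cookies=AUTH_COOKIE) as client:
        # Check a single proxy; cached results are reused within the cache window
        single = await client.post(
            "/api/config/proxy/check",
            json={"proxy": "http://127.0.0.1:7890", "use_cache": True},
        )
        print(single.json())

        # Check several proxies with bounded concurrency
        batch = await client.post(
            "/api/config/proxy/check-all",
            json={
                "proxies": ["http://127.0.0.1:7890", "socks5://127.0.0.1:1080"],
                "use_cache": True,
                "max_concurrent": 5,
            },
        )
        print(batch.json())

        # Inspect and clear the check cache
        print((await client.get("/api/config/proxy/cache-stats")).json())
        print((await client.post("/api/config/proxy/clear-cache")).json())

if __name__ == "__main__":
    asyncio.run(main())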
7 changes: 7 additions & 0 deletions app/service/proxy/__init__.py
@@ -0,0 +1,7 @@
"""
Proxy service module
"""

from .proxy_check_service import ProxyCheckService

__all__ = ["ProxyCheckService"]
219 changes: 219 additions & 0 deletions app/service/proxy/proxy_check_service.py
@@ -0,0 +1,219 @@
"""
Proxy detection service module
"""
import asyncio
import time
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

import httpx
from pydantic import BaseModel

from app.log.logger import get_config_routes_logger

logger = get_config_routes_logger()


class ProxyCheckResult(BaseModel):
"""Proxy check result model"""
proxy: str
is_available: bool
response_time: Optional[float] = None
error_message: Optional[str] = None
checked_at: float


class ProxyCheckService:
"""Proxy detection service class"""

# Target URL for checking
CHECK_URL = "https://www.google.com"
# Timeout in seconds
TIMEOUT_SECONDS = 10
# Cache duration in seconds
CACHE_DURATION = 10 # 10s

def __init__(self):
self._cache: Dict[str, ProxyCheckResult] = {}

def _is_valid_proxy_format(self, proxy: str) -> bool:
"""Validate proxy format"""
try:
parsed = urlparse(proxy)
return parsed.scheme in ['http', 'https', 'socks5'] and parsed.hostname is not None
except Exception:
return False

def _get_cached_result(self, proxy: str) -> Optional[ProxyCheckResult]:
"""Get cached check result"""
if proxy in self._cache:
result = self._cache[proxy]
# Check if cache is expired
if time.time() - result.checked_at < self.CACHE_DURATION:
logger.debug(f"Using cached proxy check result: {proxy}")
return result
else:
# Remove expired cache
del self._cache[proxy]
return None

def _cache_result(self, result: ProxyCheckResult) -> None:
"""Cache check result"""
self._cache[result.proxy] = result

async def check_single_proxy(self, proxy: str, use_cache: bool = True) -> ProxyCheckResult:
"""
Check if a single proxy is available

Args:
proxy: Proxy address in format like http://host:port or socks5://host:port
use_cache: Whether to use cached results

Returns:
ProxyCheckResult: Check result
"""
# Check cache first
if use_cache:
cached = self._get_cached_result(proxy)
if cached:
return cached

# Validate proxy format
if not self._is_valid_proxy_format(proxy):
result = ProxyCheckResult(
proxy=proxy,
is_available=False,
error_message="Invalid proxy format",
checked_at=time.time()
)
self._cache_result(result)
return result

# Perform check
start_time = time.time()
try:
logger.info(f"Starting proxy check: {proxy}")

timeout = httpx.Timeout(self.TIMEOUT_SECONDS, read=self.TIMEOUT_SECONDS)
async with httpx.AsyncClient(timeout=timeout, proxy=proxy) as client:
response = await client.head(self.CHECK_URL)

response_time = time.time() - start_time

# Check response status
is_available = response.status_code in [200, 204, 301, 302, 307, 308]

result = ProxyCheckResult(
proxy=proxy,
is_available=is_available,
response_time=round(response_time, 3),
error_message=None if is_available else f"HTTP {response.status_code}",
checked_at=time.time()
)

logger.info(f"Proxy check completed: {proxy}, available: {is_available}, response_time: {response_time:.3f}s")

except httpx.TimeoutException:  # httpx raises its own timeout exceptions, not asyncio.TimeoutError
result = ProxyCheckResult(
proxy=proxy,
is_available=False,
error_message="Connection timeout",
checked_at=time.time()
)
logger.warning(f"Proxy check timeout: {proxy}")

except Exception as e:
result = ProxyCheckResult(
proxy=proxy,
is_available=False,
error_message=str(e),
checked_at=time.time()
)
logger.error(f"Proxy check failed: {proxy}, error: {str(e)}")

# Cache result
self._cache_result(result)
return result

async def check_multiple_proxies(
self,
proxies: List[str],
use_cache: bool = True,
max_concurrent: int = 5
) -> List[ProxyCheckResult]:
"""
Check multiple proxies concurrently

Args:
proxies: List of proxy addresses
use_cache: Whether to use cached results
max_concurrent: Maximum concurrent check count

Returns:
List[ProxyCheckResult]: List of check results
"""
if not proxies:
return []

logger.info(f"Starting batch proxy check for {len(proxies)} proxies")

# Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)

async def check_with_semaphore(proxy: str) -> ProxyCheckResult:
async with semaphore:
return await self.check_single_proxy(proxy, use_cache)

# Execute checks concurrently
tasks = [check_with_semaphore(proxy) for proxy in proxies]
results = await asyncio.gather(*tasks, return_exceptions=True)

# Handle exception results
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Proxy check task exception: {proxies[i]}, error: {str(result)}")
final_results.append(ProxyCheckResult(
proxy=proxies[i],
is_available=False,
error_message=f"Check task exception: {str(result)}",
checked_at=time.time()
))
else:
final_results.append(result)

available_count = sum(1 for r in final_results if r.is_available)
logger.info(f"Batch proxy check completed: {available_count}/{len(proxies)} proxies available")

return final_results

def get_cache_stats(self) -> Dict[str, int]:
"""Get cache statistics"""
current_time = time.time()
valid_cache_count = sum(
1 for result in self._cache.values()
if current_time - result.checked_at < self.CACHE_DURATION
)

return {
"total_cached": len(self._cache),
"valid_cached": valid_cache_count,
"expired_cached": len(self._cache) - valid_cache_count
}

def clear_cache(self) -> None:
"""Clear all cache"""
self._cache.clear()
logger.info("Proxy check cache cleared")


# Global instance
_proxy_check_service: Optional[ProxyCheckService] = None


def get_proxy_check_service() -> ProxyCheckService:
"""Get proxy check service instance"""
global _proxy_check_service
if _proxy_check_service is None:
_proxy_check_service = ProxyCheckService()
return _proxy_check_service
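
The service can also be used directly, outside the router, via the module-level singleton. A minimal sketch follows; the proxy addresses are placeholders, and checking socks5:// proxies additionally requires httpx to be installed with SOCKS support (the httpx[socks] extra).

import asyncio

from app.service.proxy.proxy_check_service import get_proxy_check_service

async def main() -> None:
    service = get_proxy_check_service()

    # Single check; a repeat call within CACHE_DURATION (10 s) is served from cache
    result = await service.check_single_proxy("http://127.0.0.1:7890")
    print(result.proxy, result.is_available, result.response_time, result.error_message)

    # Batch check with at most 3 checks in flight at a time
    results = await service.check_multiple_proxies(
        ["http://127.0.0.1:7890", "socks5://127.0.0.1:1080"],
        use_cache=True,
        max_concurrent=3,
    )
    for r in results:
        print(r.proxy, "OK" if r.is_available else r.error_message)

    print(service.get_cache_stats())
    service.clear_cache()

if __name__ == "__main__":
    asyncio.run(main())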