Skip to content

高级设置

本页面介绍AdminCloud代理的高级配置选项,帮助您优化性能、提升稳定性和实现更复杂的功能。

会话管理

会话保持

通过会话ID保持相同的IP地址:

bash
# Add a session ID: the "sid-12345" segment of the proxy username identifies the session.
# Requests sent with the same session ID keep the same exit IP.
curl -x gtx.paopaous.net:38082 \
  --proxy-user "myuser:ip:res-c:US:sid-12345:your_password_here" \
  -L https://httpbin.org/ip

会话参数

| 参数 | 说明 | 示例 |
| --- | --- | --- |
| sid | 会话标识符 | sid- |
| 12345 | 自定义字符串 | 任意字母数字组合 |

编程实现

Python会话管理

python
import requests
import uuid

class ProxySession:
    """Sticky proxy session: every request made through it carries one session ID.

    The session ID travels in the proxy username as a ``sid-<id>`` segment.
    """

    def __init__(self, username, password, country="US"):
        """Store the credentials and generate a random 8-character session ID."""
        self.username = username
        self.password = password
        self.country = country
        # The first 8 characters of a UUID4 string are hex digits (the first
        # hyphen sits at index 8).
        self.session_id = str(uuid.uuid4())[:8]

    def get_proxies(self):
        """Return the ``proxies`` mapping for requests, with the session ID embedded."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}:sid-{self.session_id}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        # The same HTTP proxy endpoint serves both plain and TLS targets.
        return {"http": proxy_url, "https": proxy_url}

    def test_session(self, timeout=30):
        """Return the exit IP reported by httpbin for this session.

        Args:
            timeout: Seconds to wait before giving up. requests applies no
                timeout unless one is passed, so a stalled proxy would
                otherwise block forever.
        """
        response = requests.get(
            "https://httpbin.org/ip",
            proxies=self.get_proxies(),
            timeout=timeout,
        )
        return response.json()['origin']

# Usage example: a single session object, so every request carries the same sid
session = ProxySession("myuser", "your_password_here", "US")

# Session persistence check: print the exit IP reported for each request
for attempt in range(1, 4):
    print(f"请求 {attempt}: {session.test_session()}")

Node.js会话管理

javascript
const axios = require('axios');
const { v4: uuidv4 } = require('uuid');

/**
 * Sticky proxy session: every request made through it carries one session ID,
 * sent as a `sid-<id>` segment of the proxy username.
 */
class ProxySession {
    /**
     * @param {string} username - Account user name.
     * @param {string} password - Account password.
     * @param {string} [country='US'] - Country code placed in the proxy username.
     */
    constructor(username, password, country = 'US') {
        Object.assign(this, { username, password, country });
        // Random session ID: the first 8 characters of a UUID v4 string.
        this.sessionId = uuidv4().substring(0, 8);
    }

    /** Build the axios `proxy` option with the session ID embedded in the username. */
    getProxyConfig() {
        const sessionTag = `sid-${this.sessionId}`;
        const auth = {
            username: `${this.username}:ip:res-c:${this.country}:${sessionTag}`,
            password: this.password
        };
        return { protocol: 'http', host: 'gtx.paopaous.net', port: 38082, auth };
    }

    /** Resolve to the exit IP that httpbin reports for this session. */
    async testSession() {
        const { data } = await axios.get('https://httpbin.org/ip', {
            proxy: this.getProxyConfig()
        });
        return data.origin;
    }
}

// Usage example
const session = new ProxySession('myuser', 'your_password_here', 'US');

// Await each request before starting the next so the lines print in order,
// and handle failures explicitly: a rejected promise with no handler becomes
// an unhandled rejection.
(async () => {
    for (let i = 0; i < 3; i++) {
        try {
            const ip = await session.testSession();
            console.log(`请求 ${i + 1}: ${ip}`);
        } catch (error) {
            console.error('请求失败:', error.message);
        }
    }
})();

连接池优化

Python连接池

python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedProxyClient:
    """Proxy client that sends every request through one pooled, retrying Session."""

    def __init__(self, username, password, country="US"):
        self.username, self.password, self.country = username, password, country
        self.session = self._create_session()

    def _create_session(self):
        """Return a Session whose http:// and https:// prefixes share one adapter."""
        # Retry policy and pool sizes are carried by a single adapter instance.
        pooled_adapter = HTTPAdapter(
            max_retries=Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            ),
            pool_connections=20,
            pool_maxsize=50,
        )

        http_session = requests.Session()
        for prefix in ("http://", "https://"):
            http_session.mount(prefix, pooled_adapter)
        return http_session

    def get_proxies(self):
        """Return the ``proxies`` mapping; both schemes use the same proxy URL."""
        endpoint = (
            f"http://{self.username}:ip:res-c:{self.country}"
            f":{self.password}@gtx.paopaous.net:38082"
        )
        return dict.fromkeys(("http", "https"), endpoint)

    def request(self, method, url, **kwargs):
        """Send a request via the pooled session.

        ``proxies`` and ``timeout`` get defaults; values supplied by the
        caller in ``kwargs`` take precedence.
        """
        options = {'proxies': self.get_proxies(), 'timeout': (10, 30), **kwargs}
        return self.session.request(method, url, **options)

# Usage example: ten requests go through the one session held by the client
client = OptimizedProxyClient("myuser", "your_password_here", "US")

for request_no in range(1, 11):
    origin = client.request("GET", "https://httpbin.org/ip").json()['origin']
    print(f"请求 {request_no}: {origin}")

Node.js连接池

javascript
const axios = require('axios');
const { Pool } = require('agentkeepalive');

/**
 * Proxy client that reuses one axios instance with a keep-alive HTTPS agent.
 */
class OptimizedProxyClient {
    /**
     * @param {string} username - Account user name.
     * @param {string} password - Account password.
     * @param {string} [country='US'] - Country code placed in the proxy username.
     */
    constructor(username, password, country = 'US') {
        this.username = username;
        this.password = password;
        this.country = country;
        this.axiosInstance = this._createAxiosInstance();
    }

    /** Create the shared axios instance (proxy settings, keep-alive agent, timeout). */
    _createAxiosInstance() {
        // agentkeepalive exposes its HTTPS agent as `HttpsAgent`; it has no
        // `Pool` export, so `new Pool(...)` throws "Pool is not a constructor".
        // Required here so this class works without touching the file's
        // top-level require lines.
        const { HttpsAgent } = require('agentkeepalive');

        const httpsAgent = new HttpsAgent({
            maxSockets: 50,
            maxFreeSockets: 10,
            timeout: 30000,
            freeSocketTimeout: 30000
        });

        const proxyConfig = {
            protocol: 'http',
            host: 'gtx.paopaous.net',
            port: 38082,
            auth: {
                username: `${this.username}:ip:res-c:${this.country}`,
                password: this.password
            }
        };

        return axios.create({
            proxy: proxyConfig,
            httpsAgent,
            timeout: 30000
        });
    }

    /**
     * Send a request through the shared instance.
     * Failures are logged and then rethrown to the caller.
     *
     * @param {string} method - HTTP method.
     * @param {string} url - Target URL.
     * @param {object} [options={}] - Extra axios request options; they override `method`/`url` on key clashes.
     */
    async request(method, url, options = {}) {
        try {
            return await this.axiosInstance.request({ method, url, ...options });
        } catch (error) {
            console.error('请求失败:', error.message);
            throw error;
        }
    }
}

// Usage example: start ten requests at once; they may finish in any order
const client = new OptimizedProxyClient('myuser', 'your_password_here', 'US');

Array.from({ length: 10 }, (_, index) => index + 1).forEach(requestNo => {
    client.request('GET', 'https://httpbin.org/ip')
        .then(response => console.log(`请求 ${requestNo}: ${response.data.origin}`))
        .catch(error => console.error(error));
});

轮换策略

自动轮换

默认情况下,每次请求使用不同的IP:

python
import requests

# No sid segment in the proxy username, so no session is pinned.
proxies = {
    "http": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082",
    "https": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082"
}

# Each request goes out through a different IP
for i in range(5):
    # requests applies no timeout unless one is passed; bound the wait so a
    # stalled proxy cannot block the loop forever.
    response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=30)
    print(f"请求 {i+1}: {response.json()['origin']}")

时间控制轮换

python
import time
import requests

def timed_rotation(rotation_interval=60, total_duration=300, check_interval=10):
    """Hold one exit IP for ``rotation_interval`` seconds, then switch to a new one.

    Rotation is driven by the session ID: requests whose proxy username
    carries the same ``sid-<id>`` segment keep the same IP, so a fresh ID is
    generated whenever the current one is older than the interval.

    Args:
        rotation_interval: Seconds to keep each session ID.
        total_duration: Seconds to run before returning.
        check_interval: Seconds to sleep between IP checks.
    """
    # Local import: this snippet's import block only provides time and requests.
    import uuid

    def build_proxies(session_id):
        proxy_url = (
            f"http://myuser:ip:res-c:US:sid-{session_id}:your_password_here"
            "@gtx.paopaous.net:38082"
        )
        return {"http": proxy_url, "https": proxy_url}

    # Monotonic clock: elapsed-time checks must not be affected by system
    # clock adjustments.
    started_at = time.monotonic()
    session_started_at = started_at
    proxies = build_proxies(uuid.uuid4().hex[:8])

    while time.monotonic() - started_at < total_duration:
        if time.monotonic() - session_started_at >= rotation_interval:
            # Interval elapsed: a new session ID requests a new exit IP.
            proxies = build_proxies(uuid.uuid4().hex[:8])
            session_started_at = time.monotonic()

        response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=30)
        current_ip = response.json()['origin']
        print(f"当前IP: {current_ip}")

        time.sleep(check_interval)


timed_rotation()

条件轮换

python
import requests
import random

def conditional_rotation():
    """Send 20 requests through the proxy and report every change of exit IP."""
    # Local import: this snippet's import block provides requests and random
    # only, so the sleep call would otherwise fail with NameError.
    import time

    proxies = {
        "http": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082",
        "https": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082"
    }

    current_ip = None

    for _ in range(20):
        response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=30)
        new_ip = response.json()['origin']

        if new_ip != current_ip:
            # Separate the old and new address; without a separator the two
            # values run together in the output.
            print(f"IP轮换: {current_ip} -> {new_ip}")
            current_ip = new_ip

        # Random delay to imitate real user behaviour
        time.sleep(random.uniform(1, 5))


conditional_rotation()

错误处理与重试

指数退避重试

python
import time
import requests
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Return a decorator that retries the wrapped callable with exponential backoff.

    The wrapped callable is attempted up to ``max_retries`` times. After a
    failed attempt the wrapper sleeps ``base_delay * 2 ** attempt`` seconds
    (capped at ``max_delay``) before trying again. The exception from the
    final attempt propagates to the caller.

    Args:
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds after the first failure.
        max_delay: Upper bound for any single delay, in seconds.

    Raises:
        ValueError: If ``max_retries`` is less than 1. With zero attempts the
            wrapped callable would never run and the wrapper would silently
            return ``None``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        # Out of attempts: re-raise the active exception with
                        # its original traceback.
                        raise

                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"请求失败,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(delay)
        return wrapper
    return decorator

class RobustProxyClient:
    """Proxy client whose ``get`` is wrapped by ``retry_with_backoff``."""

    def __init__(self, username, password, country="US"):
        self.username, self.password, self.country = username, password, country

    def get_proxies(self):
        """Return the ``proxies`` mapping; both schemes use the same proxy URL."""
        proxy_url = (
            f"http://{self.username}:ip:res-c:{self.country}"
            f":{self.password}@gtx.paopaous.net:38082"
        )
        return {scheme: proxy_url for scheme in ("http", "https")}

    @retry_with_backoff(max_retries=3, base_delay=2)
    def get(self, url, **kwargs):
        """GET ``url`` through the proxy; caller-supplied kwargs override the defaults."""
        request_options = {'proxies': self.get_proxies(), 'timeout': 30}
        request_options.update(kwargs)
        return requests.get(url, **request_options)

# Usage example: the retries happen inside client.get; only the final failure reaches here
client = RobustProxyClient(username="myuser", password="your_password_here", country="US")

try:
    origin_ip = client.get("https://httpbin.org/ip").json()['origin']
    print(f"请求成功: {origin_ip}")
except Exception as exc:
    print(f"请求最终失败: {exc}")

智能错误分类

python
import requests
from enum import Enum

class ErrorType(Enum):
    """Failure categories that select a retry strategy."""

    NETWORK_ERROR = "network"
    AUTH_ERROR = "auth"
    PROXY_ERROR = "proxy"
    RATE_LIMIT = "rate_limit"
    SERVER_ERROR = "server"


class SmartProxyClient:
    """Proxy client that picks its retry delay from the kind of failure."""

    def __init__(self, username, password, country="US"):
        self.username = username
        self.password = password
        self.country = country

    def classify_error(self, error):
        """Map an exception to an ``ErrorType``.

        Anything that matches no specific rule is treated as a network error.
        """
        # ProxyError is a subclass of ConnectionError in requests, so it has
        # to be tested first or this branch can never match.
        if isinstance(error, requests.exceptions.ProxyError):
            return ErrorType.AUTH_ERROR
        if isinstance(error, requests.exceptions.ConnectionError):
            return ErrorType.NETWORK_ERROR
        if isinstance(error, requests.exceptions.Timeout):
            return ErrorType.PROXY_ERROR

        # A requests exception carries ``response``, but it is None when no
        # response was received; other exceptions have no such attribute.
        response = getattr(error, 'response', None)
        status_code = getattr(response, 'status_code', None)
        if status_code == 429:
            return ErrorType.RATE_LIMIT
        if status_code is not None and status_code >= 500:
            return ErrorType.SERVER_ERROR
        return ErrorType.NETWORK_ERROR

    def handle_error(self, error_type):
        """Run the handler for ``error_type`` and return its retry delay in seconds.

        A delay of 0 means "do not retry". Unknown types use the generic handler.
        """
        handlers = {
            ErrorType.NETWORK_ERROR: self._handle_network_error,
            ErrorType.AUTH_ERROR: self._handle_auth_error,
            ErrorType.PROXY_ERROR: self._handle_proxy_error,
            ErrorType.RATE_LIMIT: self._handle_rate_limit,
            ErrorType.SERVER_ERROR: self._handle_server_error
        }

        # Call the handler: the caller compares the result with 0 and sleeps
        # on it, so it needs the number, not the bound method.
        return handlers.get(error_type, self._handle_generic_error)()

    def _handle_network_error(self):
        print("网络错误,检查网络连接")
        return 2  # retry after 2 seconds

    def _handle_auth_error(self):
        print("认证错误,检查用户名密码")
        return 0  # do not retry

    def _handle_proxy_error(self):
        print("代理错误,稍后重试")
        return 5  # retry after 5 seconds

    def _handle_rate_limit(self):
        print("请求频率过高,延长重试间隔")
        return 60  # retry after 60 seconds

    def _handle_server_error(self):
        print("服务器错误,稍后重试")
        return 10  # retry after 10 seconds

    def _handle_generic_error(self):
        print("未知错误,使用默认重试策略")
        return 3  # retry after 3 seconds

    def request_with_smart_retry(self, url, max_retries=3):
        """GET ``url`` through the proxy, retrying according to the failure type.

        Args:
            url: Target URL.
            max_retries: Maximum number of attempts.

        Returns:
            The response of the first attempt that does not raise.

        Raises:
            Exception: The last error, once attempts are exhausted or the
                handler returned a delay of 0.
        """
        # Local import: this snippet's import block provides requests and
        # Enum only, so the sleep call would otherwise fail with NameError.
        import time

        proxy_url = (
            f"http://{self.username}:ip:res-c:{self.country}"
            f":{self.password}@gtx.paopaous.net:38082"
        )
        proxies = {"http": proxy_url, "https": proxy_url}

        for attempt in range(max_retries):
            try:
                return requests.get(url, proxies=proxies, timeout=30)
            except Exception as e:
                retry_delay = self.handle_error(self.classify_error(e))

                if attempt == max_retries - 1 or retry_delay == 0:
                    raise

                print(f"等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)

# Usage example: an error that survives the smart retries is reported here
client = SmartProxyClient(username="myuser", password="your_password_here", country="US")

try:
    origin_ip = client.request_with_smart_retry("https://httpbin.org/ip").json()['origin']
    print(f"请求成功: {origin_ip}")
except Exception as exc:
    print(f"请求失败: {exc}")

性能监控

响应时间监控

python
import time
import requests
from statistics import mean, median

class PerformanceMonitor:
    """Measure response times of sequential requests sent through the proxy."""

    def __init__(self, username, password, country="US"):
        self.username = username
        self.password = password
        self.country = country
        # Response times (seconds) of successful requests, in request order.
        self.response_times = []

    def get_proxies(self):
        """Return the ``proxies`` mapping; both schemes use the same proxy URL."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def timed_request(self, url):
        """Send one GET and return a result dict.

        The dict always has ``success`` and ``response_time``. Successful
        results add ``status_code`` and ``ip``; failed ones add ``error``.
        """
        # perf_counter is meant for measuring durations; the wall clock used
        # by time.time() can jump when the system time is adjusted.
        start_time = time.perf_counter()

        try:
            response = requests.get(url, proxies=self.get_proxies(), timeout=30)
            response_time = time.perf_counter() - start_time
            ip = response.json()['origin']
        except Exception as e:
            return {
                'success': False,
                'response_time': time.perf_counter() - start_time,
                'error': str(e)
            }

        # Record the time only once the body has parsed, so the list holds
        # successful requests only.
        self.response_times.append(response_time)
        return {
            'success': True,
            'response_time': response_time,
            'status_code': response.status_code,
            'ip': ip
        }

    def run_performance_test(self, url, requests_count=50):
        """Send ``requests_count`` requests one by one, print progress and a summary.

        Returns:
            The list of result dicts produced by ``timed_request``.
        """
        print(f"开始性能测试,共 {requests_count} 个请求...")

        results = []
        for i in range(requests_count):
            result = self.timed_request(url)
            results.append(result)

            status = "✅" if result['success'] else "❌"
            print(f"请求 {i+1}: {status} - {result['response_time']:.2f}s")

            time.sleep(0.1)  # avoid sending requests too frequently

        self.print_statistics(results)
        return results

    def print_statistics(self, results):
        """Print success counts, timing statistics and failure reasons.

        Prints nothing for an empty result list. Counts are printed even when
        every request failed; timing lines need at least one success.
        """
        if not results:
            return

        successful_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]

        print("\n=== 性能统计 ===")
        print(f"成功请求: {len(successful_results)}/{len(results)}")
        print(f"失败请求: {len(failed_results)}")
        print(f"成功率: {len(successful_results)/len(results)*100:.1f}%")

        if successful_results:
            response_times = [r['response_time'] for r in successful_results]
            print(f"平均响应时间: {mean(response_times):.2f}s")
            print(f"中位数响应时间: {median(response_times):.2f}s")
            print(f"最快响应时间: {min(response_times):.2f}s")
            print(f"最慢响应时间: {max(response_times):.2f}s")

        if failed_results:
            print("\n=== 失败原因 ===")
            # Tally identical error messages.
            error_counts = {}
            for result in failed_results:
                error = result.get('error', 'Unknown')
                error_counts[error] = error_counts.get(error, 0) + 1

            for error, count in error_counts.items():
                print(f"{error}: {count}次")

# Usage example: run 20 timed requests and keep the per-request results
monitor = PerformanceMonitor(username="myuser", password="your_password_here", country="US")
results = monitor.run_performance_test(url="https://httpbin.org/ip", requests_count=20)

并发性能测试

python
import concurrent.futures
import time
import requests

class ConcurrencyTester:
    """Send requests through the proxy from a thread pool and report throughput."""

    def __init__(self, username, password, country="US"):
        self.username = username
        self.password = password
        self.country = country

    def get_proxies(self):
        """Return the ``proxies`` mapping; both schemes use the same proxy URL."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def single_request(self, url, request_id):
        """Send one GET and return a result dict tagged with ``request_id``.

        Successful results carry ``ip``; failed ones carry ``error``.
        """
        # perf_counter is meant for measuring durations; the wall clock used
        # by time.time() can jump when the system time is adjusted.
        start_time = time.perf_counter()

        try:
            response = requests.get(url, proxies=self.get_proxies(), timeout=30)
            elapsed = time.perf_counter() - start_time

            return {
                'request_id': request_id,
                'success': True,
                'response_time': elapsed,
                'ip': response.json()['origin']
            }
        except Exception as e:
            return {
                'request_id': request_id,
                'success': False,
                'response_time': time.perf_counter() - start_time,
                'error': str(e)
            }

    def test_concurrency(self, url, concurrent_users=5, requests_per_user=10):
        """Run ``concurrent_users * requests_per_user`` requests on a thread pool.

        The pool has ``concurrent_users`` workers. Progress is printed as
        requests finish, followed by a summary.

        Returns:
            The list of result dicts, in completion order.
        """
        print(f"并发测试: {concurrent_users} 个并发用户,每个用户 {requests_per_user} 个请求")

        all_results = []
        start_time = time.perf_counter()

        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            # One task per (user, request) pair
            futures = [
                executor.submit(self.single_request, url, f"user{user_id}-req{req_id}")
                for user_id in range(concurrent_users)
                for req_id in range(requests_per_user)
            ]

            # Collect results as they complete
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                all_results.append(result)

                status = "✅" if result['success'] else "❌"
                print(f"{status} {result['request_id']}: {result['response_time']:.2f}s")

        total_time = time.perf_counter() - start_time

        self.print_concurrency_stats(all_results, total_time)
        return all_results

    def print_concurrency_stats(self, results, total_time):
        """Print totals, success rate, QPS and response-time statistics.

        The rate and QPS are reported as 0 when there are no results or no
        measurable elapsed time, instead of dividing by zero.
        """
        # Local import: this snippet's import block does not provide mean,
        # so the average line would otherwise fail with NameError.
        from statistics import mean

        successful_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]

        success_rate = len(successful_results) / len(results) * 100 if results else 0.0
        qps = len(results) / total_time if total_time > 0 else 0.0

        print("\n=== 并发性能统计 ===")
        print(f"总请求数: {len(results)}")
        print(f"总耗时: {total_time:.2f}s")
        print(f"成功请求: {len(successful_results)}")
        print(f"失败请求: {len(failed_results)}")
        print(f"成功率: {success_rate:.1f}%")
        print(f"QPS (每秒请求数): {qps:.2f}")

        if successful_results:
            response_times = [r['response_time'] for r in successful_results]
            print(f"平均响应时间: {mean(response_times):.2f}s")
            print(f"最快响应时间: {min(response_times):.2f}s")
            print(f"最慢响应时间: {max(response_times):.2f}s")

# Usage example: 3 workers, 5 requests each (15 requests in total)
tester = ConcurrencyTester(username="myuser", password="your_password_here", country="US")
results = tester.test_concurrency(
    url="https://httpbin.org/ip",
    concurrent_users=3,
    requests_per_user=5,
)

配置完成!查看更多代理配置信息。

基于 MIT 许可发布