高级设置
本页面介绍AdminCloud代理的高级配置选项,帮助您优化性能、提升稳定性和实现更复杂的功能。
会话管理
会话保持
通过会话ID保持相同的IP地址:
bash
# 添加会话ID
curl -x gtx.paopaous.net:38082 \
--proxy-user "myuser:ip:res-c:US:sid-12345:your_password_here" \
-L https://httpbin.org/ip
会话参数
| 参数 | 说明 | 示例 |
|---|---|---|
| sid | 会话标识符 | sid- |
| 12345 | 自定义字符串 | 任意字母数字组合 |
编程实现
Python会话管理
python
import requests
import uuid
class ProxySession:
    """Proxy credentials bound to one randomly generated session ID.

    The session ID is embedded in the proxy username as ``sid-<id>``. It is
    generated once in ``__init__`` and reused by every request made through
    this object.
    """

    def __init__(self, username, password, country="US"):
        """Store the credentials and generate the session ID.

        Args:
            username: Proxy account name.
            password: Proxy account password.
            country: Country code inserted into the proxy username.
        """
        self.username = username
        self.password = password
        self.country = country
        # First 8 characters of a random UUID4 string.
        self.session_id = str(uuid.uuid4())[:8]

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``.

        Both schemes are routed through the same HTTP proxy URL.
        """
        proxy_user = f"{self.username}:ip:res-c:{self.country}:sid-{self.session_id}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def test_session(self, timeout=30):
        """Request https://httpbin.org/ip through the proxy.

        Args:
            timeout: Seconds passed to ``requests.get``. Without a timeout a
                stalled proxy connection would block forever.

        Returns:
            The ``origin`` field of the JSON response.
        """
        response = requests.get(
            "https://httpbin.org/ip",
            proxies=self.get_proxies(),
            timeout=timeout,
        )
        return response.json()['origin']
# 使用示例
session = ProxySession("myuser", "your_password_here", "US")
# 测试会话保持
for i in range(3):
    ip = session.test_session()
    print(f"请求 {i+1}: {ip}")
Node.js会话管理
javascript
const axios = require('axios');
const { v4: uuidv4 } = require('uuid');
/**
 * Proxy credentials tied to one randomly generated session ID.
 * The ID is embedded in the proxy username as `sid-<id>`.
 */
class ProxySession {
  /**
   * @param {string} username Proxy account name.
   * @param {string} password Proxy account password.
   * @param {string} [country='US'] Country code placed in the proxy username.
   */
  constructor(username, password, country = 'US') {
    Object.assign(this, { username, password, country });
    // First 8 characters of a v4 UUID.
    this.sessionId = uuidv4().slice(0, 8);
  }

  /**
   * Build the axios `proxy` option for this session.
   * @returns {{protocol: string, host: string, port: number, auth: {username: string, password: string}}}
   */
  getProxyConfig() {
    const { username, password, country, sessionId } = this;
    const auth = {
      username: `${username}:ip:res-c:${country}:sid-${sessionId}`,
      password,
    };
    return { protocol: 'http', host: 'gtx.paopaous.net', port: 38082, auth };
  }

  /**
   * Request https://httpbin.org/ip through the proxy.
   * @returns {Promise<string>} The `origin` field of the response body.
   */
  async testSession() {
    const { data } = await axios.get('https://httpbin.org/ip', {
      proxy: this.getProxyConfig(),
    });
    return data.origin;
  }
}
// 使用示例
const session = new ProxySession('myuser', 'your_password_here', 'US');
for (let i = 0; i < 3; i++) {
session.testSession().then(ip => {
console.log(`请求 ${i + 1}: ${ip}`);
});
}
连接池优化
Python连接池
python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OptimizedProxyClient:
    """Proxy client that reuses one pooled ``requests.Session`` with retries.

    The session holds pooled connections, so call ``close()`` when done or
    use the client as a context manager.
    """

    def __init__(self, username, password, country="US"):
        """Store the credentials and create the shared session.

        Args:
            username: Proxy account name.
            password: Proxy account password.
            country: Country code inserted into the proxy username.
        """
        self.username = username
        self.password = password
        self.country = country
        self.session = self._create_session()

    def _create_session(self):
        """Build a session whose adapters retry and pool connections."""
        session = requests.Session()

        # Retry policy: up to 3 retries with backoff on these status codes.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        # One adapter, mounted for both schemes, carries the pool settings.
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=20,
            pool_maxsize=50,
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def request(self, method, url, **kwargs):
        """Send a request through the shared session.

        ``proxies`` and ``timeout`` are filled in only when the caller did
        not pass them; all other keyword arguments go to
        ``Session.request`` unchanged.
        """
        kwargs.setdefault('proxies', self.get_proxies())
        kwargs.setdefault('timeout', (10, 30))
        return self.session.request(method, url, **kwargs)

    def close(self):
        """Close the underlying session and release its pooled connections."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
# 使用示例
client = OptimizedProxyClient("myuser", "your_password_here", "US")
for i in range(10):
    response = client.request("GET", "https://httpbin.org/ip")
    print(f"请求 {i+1}: {response.json()['origin']}")
Node.js连接池
javascript
const axios = require('axios');
const { Pool } = require('agentkeepalive');
/**
 * Axios client that sends requests through the proxy and reuses
 * keep-alive sockets for HTTPS targets.
 */
class OptimizedProxyClient {
  /**
   * @param {string} username Proxy account name.
   * @param {string} password Proxy account password.
   * @param {string} [country='US'] Country code placed in the proxy username.
   */
  constructor(username, password, country = 'US') {
    this.username = username;
    this.password = password;
    this.country = country;
    this.axiosInstance = this._createAxiosInstance();
  }

  /**
   * Create the axios instance with proxy settings and a keep-alive agent.
   * @returns {object} Configured axios instance.
   */
  _createAxiosInstance() {
    // agentkeepalive exports its agent classes (the default HTTP agent and
    // `HttpsAgent`). It has no `Pool` export, so `new Pool(...)` failed
    // with "Pool is not a constructor".
    const { HttpsAgent } = require('agentkeepalive');

    const httpsAgent = new HttpsAgent({
      maxSockets: 50,
      maxFreeSockets: 10,
      timeout: 30000,
      freeSocketTimeout: 30000
    });

    const proxyConfig = {
      protocol: 'http',
      host: 'gtx.paopaous.net',
      port: 38082,
      auth: {
        username: `${this.username}:ip:res-c:${this.country}`,
        password: this.password
      }
    };

    return axios.create({
      proxy: proxyConfig,
      httpsAgent,
      timeout: 30000
    });
  }

  /**
   * Send a request through the shared axios instance.
   * Failures are logged and then rethrown to the caller.
   * @param {string} method HTTP method.
   * @param {string} url Target URL.
   * @param {object} [options={}] Extra axios request options.
   * @returns {Promise<object>} The axios response.
   */
  async request(method, url, options = {}) {
    try {
      return await this.axiosInstance.request({
        method,
        url,
        ...options
      });
    } catch (error) {
      console.error('请求失败:', error.message);
      throw error;
    }
  }
}
// 使用示例
const client = new OptimizedProxyClient('myuser', 'your_password_here', 'US');
for (let i = 0; i < 10; i++) {
client.request('GET', 'https://httpbin.org/ip')
.then(response => console.log(`请求 ${i + 1}: ${response.data.origin}`))
.catch(error => console.error(error));
}
轮换策略
自动轮换
默认情况下,每次请求使用不同的IP:
python
import requests
proxies = {
"http": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082",
"https": "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082"
}
# 每次请求使用不同IP
for i in range(5):
    response = requests.get("https://httpbin.org/ip", proxies=proxies)
    print(f"请求 {i+1}: {response.json()['origin']}")
时间控制轮换
python
import time
import requests
def timed_rotation(rotation_interval=60, duration=300, check_interval=10):
    """Keep one proxy session per time window and renew it on a schedule.

    The original assigned ``rotation_interval`` but never used it, so no
    timed rotation took place. A ``sid-<id>`` segment is now placed in the
    proxy username and replaced every ``rotation_interval`` seconds.

    NOTE(review): this assumes the gateway keeps the same IP for a given
    ``sid-`` value, as the session-keeping example on this page shows.
    Confirm against the provider documentation.

    Args:
        rotation_interval: Seconds to keep one session ID before renewing it.
        duration: Total running time in seconds.
        check_interval: Seconds to sleep between IP checks.
    """
    # Local import: this sample's import block only provides time and requests.
    import uuid

    def build_proxies(session_id):
        """Return the proxies mapping for one session ID."""
        proxy_url = (
            f"http://myuser:ip:res-c:US:sid-{session_id}:your_password_here"
            "@gtx.paopaous.net:38082"
        )
        return {"http": proxy_url, "https": proxy_url}

    # Monotonic clock, so system clock adjustments cannot distort the windows.
    start_time = time.monotonic()
    window_start = start_time
    proxies = build_proxies(uuid.uuid4().hex[:8])

    while time.monotonic() - start_time < duration:
        if time.monotonic() - window_start >= rotation_interval:
            # Window elapsed: switch to a fresh session ID.
            proxies = build_proxies(uuid.uuid4().hex[:8])
            window_start = time.monotonic()

        response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=30)
        current_ip = response.json()['origin']
        print(f"当前IP: {current_ip}")
        time.sleep(check_interval)
timed_rotation()
条件轮换
python
import requests
import random
def conditional_rotation():
    """Send 20 requests through the proxy and report each change of exit IP.

    After every request the function sleeps for a random 1-5 seconds.
    """
    # Local import: this sample imports only requests and random, so the
    # original ``time.sleep`` call raised NameError.
    import time

    proxy_url = "http://myuser:ip:res-c:US:your_password_here@gtx.paopaous.net:38082"
    proxies = {"http": proxy_url, "https": proxy_url}

    current_ip = None
    for _ in range(20):
        # Timeout keeps a stalled proxy connection from blocking the loop.
        response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=30)
        new_ip = response.json()['origin']

        if new_ip != current_ip:
            print(f"IP轮换: {current_ip} → {new_ip}")
            current_ip = new_ip

        # Random delay between requests.
        time.sleep(random.uniform(1, 5))
conditional_rotation()
错误处理与重试
指数退避重试
python
import time
import requests
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Return a decorator that retries a callable with exponential backoff.

    The wrapped callable is attempted up to ``max_retries`` times. After a
    failed attempt the wrapper sleeps ``base_delay * 2 ** attempt`` seconds,
    capped at ``max_delay``. The exception from the last attempt propagates
    to the caller.

    Args:
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds after the first failure.
        max_delay: Upper bound in seconds for any single delay.

    Raises:
        ValueError: If ``max_retries`` is below 1. Previously the wrapper
            returned ``None`` in that case without calling the function.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        # Bare raise keeps the original traceback intact.
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"请求失败,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(delay)
        return wrapper
    return decorator
class RobustProxyClient:
    """Proxy client whose GET requests go through ``retry_with_backoff``."""

    def __init__(self, username, password, country="US"):
        """Store the proxy credentials and the country code."""
        self.username, self.password, self.country = username, password, country

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``."""
        credentials = f"{self.username}:ip:res-c:{self.country}:{self.password}"
        endpoint = f"http://{credentials}@gtx.paopaous.net:38082"
        return dict.fromkeys(("http", "https"), endpoint)

    @retry_with_backoff(max_retries=3, base_delay=2)
    def get(self, url, **kwargs):
        """GET ``url`` through the proxy.

        ``proxies`` and ``timeout`` (30 seconds) are defaults that the
        caller's keyword arguments override.
        """
        options = {'proxies': self.get_proxies(), 'timeout': 30, **kwargs}
        return requests.get(url, **options)
# 使用示例
client = RobustProxyClient("myuser", "your_password_here", "US")
try:
    response = client.get("https://httpbin.org/ip")
    print(f"请求成功: {response.json()['origin']}")
except Exception as e:
    print(f"请求最终失败: {e}")
智能错误分类
python
import requests
from enum import Enum
class ErrorType(Enum):
    """Failure categories used to choose a retry delay."""

    NETWORK_ERROR = "network"
    AUTH_ERROR = "auth"
    PROXY_ERROR = "proxy"
    RATE_LIMIT = "rate_limit"
    SERVER_ERROR = "server"


class SmartProxyClient:
    """Proxy client that chooses its retry delay from the kind of failure."""

    def __init__(self, username, password, country="US"):
        """Store the proxy credentials and the country code."""
        self.username = username
        self.password = password
        self.country = country

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``."""
        proxy_url = (
            f"http://{self.username}:ip:res-c:{self.country}:{self.password}"
            "@gtx.paopaous.net:38082"
        )
        return {"http": proxy_url, "https": proxy_url}

    def classify_error(self, error):
        """Map an exception to an ``ErrorType``.

        Exceptions that match no rule are classified as ``NETWORK_ERROR``.
        The status-code rules only apply to exceptions that carry a
        response, for example an ``HTTPError`` from ``raise_for_status()``.
        """
        # ProxyError subclasses ConnectionError in requests, so it has to be
        # tested first; in the original order this branch was unreachable.
        if isinstance(error, requests.exceptions.ProxyError):
            return ErrorType.AUTH_ERROR
        if isinstance(error, requests.exceptions.ConnectionError):
            return ErrorType.NETWORK_ERROR
        if isinstance(error, requests.exceptions.Timeout):
            return ErrorType.PROXY_ERROR

        # ``response`` can be missing or None on an exception.
        response = getattr(error, 'response', None)
        status_code = getattr(response, 'status_code', None)
        if status_code == 429:
            return ErrorType.RATE_LIMIT
        if status_code is not None and status_code >= 500:
            return ErrorType.SERVER_ERROR
        return ErrorType.NETWORK_ERROR

    def handle_error(self, error_type):
        """Run the handler for ``error_type`` and return its retry delay.

        Returns:
            Seconds to wait before retrying; ``0`` means do not retry.
            Unknown types fall back to the generic handler.
        """
        handlers = {
            ErrorType.NETWORK_ERROR: self._handle_network_error,
            ErrorType.AUTH_ERROR: self._handle_auth_error,
            ErrorType.PROXY_ERROR: self._handle_proxy_error,
            ErrorType.RATE_LIMIT: self._handle_rate_limit,
            ErrorType.SERVER_ERROR: self._handle_server_error,
        }
        handler = handlers.get(error_type, self._handle_generic_error)
        # Call the handler. The original returned the bound method itself, so
        # the caller compared a function with 0 and passed it to time.sleep.
        return handler()

    def _handle_network_error(self):
        print("网络错误,检查网络连接")
        return 2  # retry after 2 seconds

    def _handle_auth_error(self):
        print("认证错误,检查用户名密码")
        return 0  # do not retry

    def _handle_proxy_error(self):
        print("代理错误,稍后重试")
        return 5  # retry after 5 seconds

    def _handle_rate_limit(self):
        print("请求频率过高,延长重试间隔")
        return 60  # retry after 60 seconds

    def _handle_server_error(self):
        print("服务器错误,稍后重试")
        return 10  # retry after 10 seconds

    def _handle_generic_error(self):
        print("未知错误,使用默认重试策略")
        return 3  # retry after 3 seconds

    def request_with_smart_retry(self, url, max_retries=3):
        """GET ``url`` through the proxy, retrying according to the failure type.

        Args:
            url: Target URL.
            max_retries: Maximum number of attempts.

        Returns:
            The ``requests`` response of the first successful attempt.

        Raises:
            Exception: The last error, once attempts are exhausted or the
                handler returns a delay of ``0``.
        """
        # Local import: this sample's import block does not include time.
        import time

        proxies = self.get_proxies()
        for attempt in range(max_retries):
            try:
                return requests.get(url, proxies=proxies, timeout=30)
            except Exception as e:
                retry_delay = self.handle_error(self.classify_error(e))
                if attempt == max_retries - 1 or retry_delay == 0:
                    raise
                print(f"等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
# 使用示例
client = SmartProxyClient("myuser", "your_password_here", "US")
try:
    response = client.request_with_smart_retry("https://httpbin.org/ip")
    print(f"请求成功: {response.json()['origin']}")
except Exception as e:
    print(f"请求失败: {e}")
性能监控
响应时间监控
python
import time
import requests
from statistics import mean, median
class PerformanceMonitor:
    """Measure response times of sequential requests sent through the proxy."""

    def __init__(self, username, password, country="US"):
        """Store the proxy credentials and start with no recorded timings."""
        self.username = username
        self.password = password
        self.country = country
        # Durations in seconds of requests that returned a response.
        self.response_times = []

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def timed_request(self, url):
        """GET ``url`` through the proxy and time it.

        Returns:
            A dict with ``success`` and ``response_time``. On success it also
            has ``status_code`` and ``ip``; on failure it has ``error``.
        """
        # perf_counter is monotonic, so a system clock adjustment during the
        # request cannot distort the measured duration.
        start_time = time.perf_counter()
        try:
            response = requests.get(url, proxies=self.get_proxies(), timeout=30)
            response_time = time.perf_counter() - start_time
            self.response_times.append(response_time)
            return {
                'success': True,
                'response_time': response_time,
                'status_code': response.status_code,
                'ip': response.json()['origin'],
            }
        except Exception as e:
            return {
                'success': False,
                'response_time': time.perf_counter() - start_time,
                'error': str(e),
            }

    def run_performance_test(self, url, requests_count=50):
        """Send ``requests_count`` sequential requests and print a summary.

        Returns:
            The list of per-request result dicts from ``timed_request``.
        """
        print(f"开始性能测试,共 {requests_count} 个请求...")
        results = []
        for i in range(requests_count):
            result = self.timed_request(url)
            results.append(result)
            status = "✅" if result['success'] else "❌"
            print(f"请求 {i+1}: {status} - {result['response_time']:.2f}s")
            time.sleep(0.1)  # avoid sending requests too frequently
        self.print_statistics(results)
        return results

    def print_statistics(self, results):
        """Print success counts, timing statistics and grouped failures.

        Nothing is printed for an empty ``results`` list. The counts and
        success rate are printed even when every request failed; the timing
        lines need at least one success.
        """
        if not results:
            return

        successful_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]

        print("\n=== 性能统计 ===")
        print(f"成功请求: {len(successful_results)}/{len(results)}")
        print(f"失败请求: {len(failed_results)}")
        print(f"成功率: {len(successful_results)/len(results)*100:.1f}%")

        if successful_results:
            response_times = [r['response_time'] for r in successful_results]
            print(f"平均响应时间: {mean(response_times):.2f}s")
            print(f"中位数响应时间: {median(response_times):.2f}s")
            print(f"最快响应时间: {min(response_times):.2f}s")
            print(f"最慢响应时间: {max(response_times):.2f}s")

        if failed_results:
            print("\n=== 失败原因 ===")
            error_counts = {}
            for result in failed_results:
                error = result.get('error', 'Unknown')
                error_counts[error] = error_counts.get(error, 0) + 1
            for error, count in error_counts.items():
                print(f"{error}: {count}次")
# 使用示例
monitor = PerformanceMonitor("myuser", "your_password_here", "US")
results = monitor.run_performance_test("https://httpbin.org/ip", 20)
并发性能测试
python
import concurrent.futures
import time
import requests
class ConcurrencyTester:
    """Send requests through the proxy from a thread pool and summarize them."""

    def __init__(self, username, password, country="US"):
        """Store the proxy credentials and the country code."""
        self.username = username
        self.password = password
        self.country = country

    def get_proxies(self):
        """Return the ``proxies`` mapping expected by ``requests``."""
        proxy_user = f"{self.username}:ip:res-c:{self.country}"
        proxy_url = f"http://{proxy_user}:{self.password}@gtx.paopaous.net:38082"
        return {"http": proxy_url, "https": proxy_url}

    def single_request(self, url, request_id):
        """GET ``url`` once and time it.

        Returns:
            A dict with ``request_id``, ``success`` and ``response_time``,
            plus ``ip`` on success or ``error`` on failure.
        """
        # perf_counter is monotonic, so clock adjustments cannot skew timings.
        start_time = time.perf_counter()
        try:
            response = requests.get(url, proxies=self.get_proxies(), timeout=30)
            return {
                'request_id': request_id,
                'success': True,
                'response_time': time.perf_counter() - start_time,
                'ip': response.json()['origin'],
            }
        except Exception as e:
            return {
                'request_id': request_id,
                'success': False,
                'response_time': time.perf_counter() - start_time,
                'error': str(e),
            }

    def test_concurrency(self, url, concurrent_users=5, requests_per_user=10):
        """Run ``concurrent_users * requests_per_user`` requests on a thread pool.

        Args:
            url: Target URL.
            concurrent_users: Number of worker threads.
            requests_per_user: Requests submitted per simulated user.

        Returns:
            The list of result dicts in completion order.
        """
        print(f"并发测试: {concurrent_users} 个并发用户,每个用户 {requests_per_user} 个请求")
        all_results = []
        start_time = time.perf_counter()

        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            # Submit one task per (user, request) pair.
            futures = [
                executor.submit(self.single_request, url, f"user{user_id}-req{req_id}")
                for user_id in range(concurrent_users)
                for req_id in range(requests_per_user)
            ]

            # Collect the results as the tasks finish.
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                all_results.append(result)
                status = "✅" if result['success'] else "❌"
                print(f"{status} {result['request_id']}: {result['response_time']:.2f}s")

        total_time = time.perf_counter() - start_time
        self.print_concurrency_stats(all_results, total_time)
        return all_results

    def print_concurrency_stats(self, results, total_time):
        """Print counts, success rate, QPS and response-time statistics.

        The success rate and QPS are reported as 0 when ``results`` is empty
        or ``total_time`` is not positive, instead of dividing by zero.
        """
        # Local import: this sample's import block does not provide ``mean``,
        # so the original call raised NameError.
        from statistics import mean

        successful_results = [r for r in results if r['success']]
        failed_results = [r for r in results if not r['success']]
        total = len(results)
        success_rate = len(successful_results) / total * 100 if total else 0.0
        qps = total / total_time if total_time > 0 else 0.0

        print("\n=== 并发性能统计 ===")
        print(f"总请求数: {total}")
        print(f"总耗时: {total_time:.2f}s")
        print(f"成功请求: {len(successful_results)}")
        print(f"失败请求: {len(failed_results)}")
        print(f"成功率: {success_rate:.1f}%")
        print(f"QPS (每秒请求数): {qps:.2f}")

        if successful_results:
            response_times = [r['response_time'] for r in successful_results]
            print(f"平均响应时间: {mean(response_times):.2f}s")
            print(f"最快响应时间: {min(response_times):.2f}s")
            print(f"最慢响应时间: {max(response_times):.2f}s")
# 使用示例
tester = ConcurrencyTester("myuser", "your_password_here", "US")
results = tester.test_concurrency("https://httpbin.org/ip", concurrent_users=3, requests_per_user=5)
配置完成!查看更多代理配置信息。