1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# -*- coding:utf-8 -*-
# @Time : 2019/9/12 10:19
# @Author : litao
"""
私密代理使用示例
接口鉴权说明:
目前支持的鉴权方式有 "simple" 和 "hmacsha1" 两种,默认使用 "simple"鉴权。
所有方法均可添加关键字参数sign_type修改鉴权方式。
"""
import redis, random
import kdl, requests
import sys
# from redis.sentinel import Sentinel
# sentinel = Sentinel([('192.168.17.65', 26379),
# ('192.168.17.66', 26379),
# ('192.168.17.67', 26379)
# ], socket_timeout=0.5)
# # 查看master节点
# master = sentinel.discover_master('ida_redis_master')
# # 查看slave 节点
# slave = sentinel.discover_slaves('ida_redis_master')
# # 连接数据库
# rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=7, decode_responses=True)
def func_get_redis(stats="on_line"):
sys_path = sys.path
for p in sys_path:
if "C:\\" in p:
stats = "test"
if stats == "on_line":
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=18, decode_responses=True, password='ReDis!GmTx*0aN12')
else:
rds = redis.StrictRedis(host='172.18.51.10', port=6379, db=18, decode_responses=True)
return rds
rds = func_get_redis()
def get_proxy_from_redis():
try:
one_proxy = rds.randomkey()
username = "hanye"
password = "i9mmu0a3"
proxies = {
"http": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy},
"https": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy}
}
return proxies
except Exception as e:
print(e)
return None
def func_get_proxy_to_redis():
# chance = random.random()
auth = kdl.Auth("990866563045611", "quxguz4hwm9cxnx6wpjhkokx04klpr8v")
client = kdl.Client(auth)
# 获取订单到期时间, 返回时间字符串
# expire_time = client.get_order_expire_time()
# print("expire time", expire_time)
# 获取ip白名单, 返回ip列表
# ip_whitelist = client.get_ip_whitelist()
# print("ip whitelist", ip_whitelist)
# 设置ip白名单,参数类型为字符串或列表或元组
# 成功则返回True, 否则抛出异常
# client.set_ip_whitelist([])
# client.set_ip_whitelist("127.0.0.1, 192.168.0.139")
# print(client.get_ip_whitelist())
# client.set_ip_whitelist(tuple())
# 提取私密代理ip, 第一个参数为提取的数量, 其他参数以关键字参数的形式传入(不需要传入signature和timestamp)
# 具体有哪些参数请参考帮助中心: "https://help.kuaidaili.com/api/getdps/"
# 返回ip列表
# 注意:若您使用的是python2, 且在终端调用,或在文件中调用且没有加 "# -*- coding: utf-8 -*-" 的话
# 传入area参数时,请传入unicode类型,如 area=u'北京,上海'
# ips = client.get_dps(1, sign_type='simple', format='json', pt=2, area='北京,上海,广东')
# print("dps proxy: ", ips)
# 检测私密代理有效性: 返回 ip: true/false 组成的dict
# ips = client.get_dps(1, sign_type='simple', format='json')
# valids = client.check_dps_valid(ips)
# print("valids: ", valids)
# 获取私密代理剩余时间: 返回 ip: seconds(剩余秒数) 组成的dict
ips = client.get_dps(1, format='json', dedup=1)
seconds = client.get_dps_valid_time(ips)
# print("seconds: ", seconds)
for key in seconds:
rds.set(key, key, ex=int(seconds[key]) - 3)
# 获取计数版ip余额(仅私密代理计数版)
# balance = client.get_ip_balance(sign_type='hmacsha1')
# print("balance: ", balance)
def proxy_test(proxies):
page_url = "http://dev.kdlapi.com/testproxy/"
headers = {
"Accept-Encoding": "Gzip", # 使用gzip压缩传输数据让访问更快
}
res = requests.get(url=page_url, proxies=proxies, headers=headers)
# print(res.status_code) # 获取Reponse的返回码
if res.status_code == 200:
print(res.content.decode('utf-8')) # 获取页面内容
def get_proxy_dic(max_proxies=None):
if not max_proxies:
max_proxies = 8
try:
res = rds.dbsize()
except Exception as e:
print("redis error")
return None
if res is None:
return None
if res < max_proxies:
func_get_proxy_to_redis()
return get_proxy_from_redis()
else:
return get_proxy_from_redis()
def get_proxy(proxies_num=None):
if proxies_num:
proxies = get_proxy_dic(max_proxies=proxies_num)
# print("get a IP %s" % str(proxies))
return proxies
else:
return None
if __name__ == "__main__":
proxy_pool_dic = get_proxy(11)
print(proxy_pool_dic)
proxy_test(proxy_pool_dic)
print(get_proxy_from_redis())