Commit 06356c0d authored by yuhao's avatar yuhao

first commit

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*~
# C extensions
*.so
# Distribution / packaging
.Python
env/
bin/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
.idea/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
docs/_build/
# config
fabfile.py
settings.online.py
/gaia/settings.py
media/
log/
crawldata/
conf/
/static
.vagrant/
Vagrantfile
*.DS_Store
dump.rdb
# .gitignore for yangchuncheng
settings_override*
.script/
.tmp.sql
.env
*.pem
coverage_html/
# -*- coding: utf-8 -*-
# Define here the models for your scraped items
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/items.html
import datetime
import re
import redis
import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst, Join
from utils.common import extract_num
from settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT
from w3lib.html import remove_tags
from models.es_types import ArticleType
from elasticsearch_dsl.connections import connections
es = connections.create_connection(ArticleType._doc_type.using)
redis_cli = redis.StrictRedis()
class ArticlespiderItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
pass
def add_jobbole(value):
return value+"-bobby"
def date_convert(value):
try:
create_date = datetime.datetime.strptime(value, "%Y/%m/%d").date()
except Exception as e:
create_date = datetime.datetime.now().date()
return create_date
def get_nums(value):
match_re = re.match(".*?(\d+).*", value)
if match_re:
nums = int(match_re.group(1))
else:
nums = 0
return nums
def remove_comment_tags(value):
#去掉tag中提取的评论
if "评论" in value:
return ""
else:
return value
def return_value(value):
return value
def gen_suggests(index, info_tuple):
#根据字符串生成搜索建议数组
used_words = set()
suggests = []
for text, weight in info_tuple:
if text:
#调用es的analyze接口分析字符串
words = es.indices.analyze(index=index, analyzer="ik_max_word", params={'filter':["lowercase"]}, body=text)
anylyzed_words = set([r["token"] for r in words["tokens"] if len(r["token"])>1])
new_words = anylyzed_words - used_words
else:
new_words = set()
if new_words:
suggests.append({"input":list(new_words), "weight":weight})
return suggests
class ArticleItemLoader(ItemLoader):
#自定义itemloader
default_output_processor = TakeFirst()
class JobBoleArticleItem(scrapy.Item):
# 定义文章实体
title = scrapy.Field()
url = scrapy.Field()
content = scrapy.Field()
def get_insert_sql(self):
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(fav_nums)
"""
params = (self["title"], self["url"], self["create_date"], self["fav_nums"])
return insert_sql, params
def save_to_es(self):
article = ArticleType()
article.title = self['title']
article.create_date = self["create_date"]
article.content = remove_tags(self["content"])
article.front_image_url = self["front_image_url"]
if "front_image_path" in self:
article.front_image_path = self["front_image_path"]
article.praise_nums = self["praise_nums"]
article.fav_nums = self["fav_nums"]
article.comment_nums = self["comment_nums"]
article.url = self["url"]
article.tags = self["tags"]
article.meta.id = self["url_object_id"]
article.suggest = gen_suggests(ArticleType._doc_type.index, ((article.title,10),(article.tags, 7)))
article.save()
redis_cli.incr("jobbole_count")
return
class ZhihuQuestionItem(scrapy.Item):
#知乎的问题 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
#插入知乎question表的sql语句
insert_sql = """
insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
"""
zhihu_id = self["zhihu_id"][0]
topics = ",".join(self["topics"])
url = self["url"][0]
title = "".join(self["title"])
content = "".join(self["content"])
answer_num = extract_num("".join(self["answer_num"]))
comments_num = extract_num("".join(self["comments_num"]))
if len(self["watch_user_num"]) == 2:
watch_user_num = int(self["watch_user_num"][0])
click_num = int(self["watch_user_num"][1])
else:
watch_user_num = int(self["watch_user_num"][0])
click_num = 0
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time)
return insert_sql, params
class ZhihuAnswerItem(scrapy.Item):
#知乎的问题回答item
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
#插入知乎question表的sql语句
insert_sql = """
insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
create_time, update_time, crawl_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
update_time=VALUES(update_time)
"""
create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
params = (
self["zhihu_id"], self["url"], self["question_id"],
self["author_id"], self["content"], self["parise_num"],
self["comments_num"], create_time, update_time,
self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
def remove_splash(value):
#去掉工作城市的斜线
return value.replace("/","")
def handle_jobaddr(value):
addr_list = value.split("\n")
addr_list = [item.strip() for item in addr_list if item.strip()!="查看地图"]
return "".join(addr_list)
class LagouJobItemLoader(ItemLoader):
#自定义itemloader
default_output_processor = TakeFirst()
class LagouJobItem(scrapy.Item):
#拉勾网职位信息
title = scrapy.Field()
url = scrapy.Field()
url_object_id = scrapy.Field()
salary = scrapy.Field()
job_city = scrapy.Field(
input_processor=MapCompose(remove_splash),
)
work_years = scrapy.Field(
input_processor = MapCompose(remove_splash),
)
degree_need = scrapy.Field(
input_processor = MapCompose(remove_splash),
)
job_type = scrapy.Field()
publish_time = scrapy.Field()
job_advantage = scrapy.Field()
job_desc = scrapy.Field()
job_addr = scrapy.Field(
input_processor=MapCompose(remove_tags, handle_jobaddr),
)
company_name = scrapy.Field()
company_url = scrapy.Field()
tags = scrapy.Field(
input_processor = Join(",")
)
crawl_time = scrapy.Field()
def get_insert_sql(self):
insert_sql = """
insert into lagou_job(title, url, url_object_id, salary, job_city, work_years, degree_need,
job_type, publish_time, job_advantage, job_desc, job_addr, company_name, company_url,
tags, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE salary=VALUES(salary), job_desc=VALUES(job_desc)
"""
params = (
self["title"], self["url"], self["url_object_id"], self["salary"], self["job_city"],
self["work_years"], self["degree_need"], self["job_type"],
self["publish_time"], self["job_advantage"], self["job_desc"],
self["job_addr"], self["company_name"], self["company_url"],
self["job_addr"], self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
\ No newline at end of file
# -*- coding: utf-8 -*-
# Define here the models for your spider middleware
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals
from fake_useragent import UserAgent
from tools.crawl_xici_ip import GetIP
class ArticlespiderSpiderMiddleware(object):
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, dict or Item objects.
for i in result:
yield i
def process_spider_exception(response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Response, dict
# or Item objects.
pass
def process_start_requests(start_requests, spider):
# Called with the start requests of the spider, and works
# similarly to the process_spider_output() method, except
# that it doesn’t have a response associated.
# Must return only requests (not items).
for r in start_requests:
yield r
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
class RandomUserAgentMiddlware(object):
#随机更换user-agent
def __init__(self, crawler):
super(RandomUserAgentMiddlware, self).__init__()
self.ua = UserAgent()
self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def process_request(self, request, spider):
def get_ua():
return getattr(self.ua, self.ua_type)
request.headers.setdefault('User-Agent', get_ua())
class RandomProxyMiddleware(object):
#动态设置ip代理
def process_request(self, request, spider):
get_ip = GetIP()
request.meta["proxy"] = get_ip.get_random_ip()
from selenium import webdriver
from scrapy.http import HtmlResponse
class JSPageMiddleware(object):
#通过chrome请求动态网页
def process_request(self, request, spider):
if spider.name == "jobbole":
# browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe")
spider.browser.get(request.url)
import time
time.sleep(3)
print ("访问:{0}".format(request.url))
return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source, encoding="utf-8", request=request)
# from pyvirtualdisplay import Display
# display = Display(visible=0, size=(800, 600))
# display.start()
#
# browser = webdriver.Chrome()
# browser.get()
# -*- coding: utf-8 -*-
__author__ = 'bobby'
# -*- coding: utf-8 -*-
__author__ = 'bobby'
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer
from elasticsearch_dsl.connections import connections
connections.create_connection(hosts=["localhost"])
class CustomAnalyzer(_CustomAnalyzer):
def get_analysis_definition(self):
return {}
ik_analyzer = CustomAnalyzer("ik_max_word", filter=["lowercase"])
class ArticleType(DocType):
#伯乐在线文章类型
suggest = Completion(analyzer=ik_analyzer)
title = Text(analyzer="ik_max_word")
create_date = Date()
url = Keyword()
url_object_id = Keyword()
front_image_url = Keyword()
front_image_path = Keyword()
praise_nums = Integer()
comment_nums = Integer()
fav_nums = Integer()
tags = Text(analyzer="ik_max_word")
content = Text(analyzer="ik_max_word")
class Meta:
index = "jobbole"
doc_type = "article"
if __name__ == "__main__":
ArticleType.init()
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
import codecs
import json
from scrapy.pipelines.images import ImagesPipeline
from scrapy.exporters import JsonItemExporter
from twisted.enterprise import adbapi
from models.es_types import ArticleType
from w3lib.html import remove_tags
import MySQLdb
import MySQLdb.cursors
class ArticlespiderPipeline(object):
def process_item(self, item, spider):
return item
class JsonWithEncodingPipeline(object):
# 自定义json文件的导出
def __init__(self):
self.file = codecs.open('article.json', 'w', encoding="utf-8") # 用codecs避免文件编码
def process_item(self, item, spider):
# 将item转换成字符串, ensure_ascii=False以便中文显示正常
lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
# 将字符串写入文件中
self.file.write(lines)
return item
def spider_closed(self, spider):
self.file.close()
class MysqlPipeline(object):
# 采用同步的机制写入mysql
def __init__(self):
self.conn = MySQLdb.connect('127.0.0.1', 'root', '', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
insert_sql = """
insert into jobbole_article(title, url, content)
VALUES (%s, %s, %s)
"""
self.cursor.execute(insert_sql, (item["title"], item["url"], item["content"]))
self.conn.commit()
class MysqlTwistedPipline(object):
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings["MYSQL_USER"],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider):
#使用twisted将mysql插入变成异步执行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error, item, spider) #处理异常
def handle_error(self, failure, item, spider):
#处理异步插入的异常
print (failure)
def do_insert(self, cursor, item):
#执行具体的插入
#根据不同的item 构建不同的sql语句并插入到mysql中
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
class JsonExporterPipleline(object):
#调用scrapy提供的json export导出json文件
def __init__(self):
self.file = open('articleexport.json', 'wb')
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
self.exporter.start_exporting()
def close_spider(self, spider):
self.exporter.finish_exporting()
self.file.close()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
class ArticleImagePipeline(ImagesPipeline):
def item_completed(self, results, item, info):
if "front_image_url" in item:
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
class ElasticsearchPipeline(object):
#将数据写入到es中
def process_item(self, item, spider):
#将item转换为es的数据
item.save_to_es()
return item
\ No newline at end of file
# -*- coding: utf-8 -*-
import os
# Scrapy settings for ArticleSpider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# http://doc.scrapy.org/en/latest/topics/settings.html
# http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
# http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
BOT_NAME = 'ArticleSpider'
SPIDER_MODULES = ['ArticleSpider.spiders']
NEWSPIDER_MODULE = 'ArticleSpider.spiders'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'ArticleSpider (+http://www.yourdomain.com)'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32
# Configure a delay for requests for the same website (default: 0)
# See http://scrapy.readthedocs.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 10
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
# 'Accept-Language': 'en',
#}
# Enable or disable spider middlewares
# See http://scrapy.readthedocs.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
# 'ArticleSpider.middlewares.ArticlespiderSpiderMiddleware': 543,
# }
# Enable or disable downloader middlewares
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
# 'ArticleSpider.middlewares.JSPageMiddleware': 1,
# # 'ArticleSpider.middlewares.RandomUserAgentMiddlware': 543,
# # 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
# }
# Enable or disable extensions
# See http://scrapy.readthedocs.org/en/latest/topics/extensions.html
#EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
#}
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'ArticleSpider.pipelines.JsonWithEncodingPipeline': 2,
# # 'scrapy.pipelines.images.ImagesPipeline': 1,
# 'ArticleSpider.pipelines.ArticleImagePipeline': 1,
# 'ArticleSpider.pipelines.MysqlTwistedPipline': 1,
# 'ArticleSpider.pipelines.MysqlPipeline': 1,
# 'ArticleSpider.pipelines.ElasticsearchPipeline': 1
}
IMAGES_URLS_FIELD = "front_image_url"
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')
import sys
BASE_DIR = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR, 'ArticleSpider'))
USER_AGENT = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
RANDOM_UA_TYPE = "random"
#
# IMAGES_MIN_HEIGHT = 100
# IMAGES_MIN_WIDTH = 100
# Enable and configure the AutoThrottle extension (disabled by default)
# See http://doc.scrapy.org/en/latest/topics/autothrottle.html
AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See http://scrapy.readthedocs.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"
SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.
# -*- coding: utf-8 -*-
import re
import scrapy
import datetime
from scrapy.http import Request
from urllib import parse
from scrapy.loader import ItemLoader
from ArticleSpider.items import JobBoleArticleItem, ArticleItemLoader
from selenium import webdriver
from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/all-posts/']
# def __init__(self):
# self.browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe")
# super(JobboleSpider, self).__init__()
# dispatcher.connect(self.spider_closed, signals.spider_closed)
#
# def spider_closed(self, spider):
# #当爬虫退出的时候关闭chrome
# print ("spider closed")
# self.browser.quit()
#收集伯乐在线所有404的url以及404页面数
handle_httpstatus_list = [404]
def __init__(self, **kwargs):
self.fail_urls = []
dispatcher.connect(self.handle_spider_closed, signals.spider_closed)
def handle_spider_closed(self, spider, reason):
self.crawler.stats.set_value("failed_urls", ",".join(self.fail_urls))
def parse(self, response):
"""
1. 获取文章列表页中的文章url并交给scrapy下载后并进行解析
2. 获取下一页的url并交给scrapy进行下载, 下载完成后交给parse
"""
#解析列表页中的所有文章url并交给scrapy下载后并进行解析
if response.status == 404:
self.fail_urls.append(response.url)
self.crawler.stats.inc_value("failed_url")
# 获取所有文章链接
post_nodes = response.css("#archive .floated-thumb .post-thumb a")
for post_node in post_nodes:
image_url = post_node.css("img::attr(src)").extract_first("") # 如果list为空,返回""
post_url = post_node.css("::attr(href)").extract_first("")
# yield语句直接将Request交给scrapy去下载, 实际上yield利用了twisted的异步机制
yield Request(url=parse.urljoin(response.url, post_url), meta={"front_image_url": image_url},
callback=self.parse_detail)
#提取下一页并交给scrapy进行下载
next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
# 如果有下一页
if next_url:
yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse)
def parse_detail(self, response):
"""
从每篇文章中提取结构化数据
:param response:
:return: article_item
"""
article_item = JobBoleArticleItem()
# 通过xpath提取文章的具体字段
title = response.xpath('//div[@class="entry-header"]/h1/text()').extract_first("")
content = response.xpath("//div[@class='entry']").extract()[0]
# 通过css选择器提取字段
front_image_url = response.meta.get("front_image_url", "") #文章封面图
# title = response.css(".entry-header h1::text").extract()[0]
#
# content = response.css("div.entry").extract()[0]
article_item["title"] = title
article_item["url"] = response.url
article_item["content"] = content
# 通过item loader加载item
# item_loader = ArticleItemLoader(item=JobBoleArticleItem(), response=response)
# item_loader.add_css("title", ".entry-header h1::text")
# item_loader.add_value("url", response.url)
# item_loader.add_css("content", "div.entry")
# article_item = item_loader.load_item()
yield article_item
# -*- coding: utf-8 -*-
from datetime import datetime
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from items import LagouJobItemLoader, LagouJobItem
from ArticleSpider.utils.common import get_md5
class LagouSpider(CrawlSpider):
name = 'lagou'
allowed_domains = ['www.lagou.com']
start_urls = ['https://www.lagou.com']
rules = (
Rule(LinkExtractor(allow=("zhaopin/.*",)), follow=True),
Rule(LinkExtractor(allow=("gongsi/j\d+.html",)), follow=True),
Rule(LinkExtractor(allow=r'jobs/\d+.html'), callback='parse_job', follow=True),
)
#
# def parse_start_url(self, response):
# return []
#
# def process_results(self, response, results):
# return results
def parse_job(self, response):
#解析拉勾网的职位
item_loader = LagouJobItemLoader(item=LagouJobItem(), response=response)
item_loader.add_css("title", ".job-name::attr(title)")
item_loader.add_value("url", response.url)
item_loader.add_value("url_object_id", get_md5(response.url))
item_loader.add_css("salary", ".job_request .salary::text")
item_loader.add_xpath("job_city", "//*[@class='job_request']/p/span[2]/text()")
item_loader.add_xpath("work_years", "//*[@class='job_request']/p/span[3]/text()")
item_loader.add_xpath("degree_need", "//*[@class='job_request']/p/span[4]/text()")
item_loader.add_xpath("job_type", "//*[@class='job_request']/p/span[5]/text()")
item_loader.add_css("tags", '.position-label li::text')
item_loader.add_css("publish_time", ".publish_time::text")
item_loader.add_css("job_advantage", ".job-advantage p::text")
item_loader.add_css("job_desc", ".job_bt div")
item_loader.add_css("job_addr", ".work_addr")
item_loader.add_css("company_name", "#job_company dt a img::attr(alt)")
item_loader.add_css("company_url", "#job_company dt a::attr(href)")
item_loader.add_value("crawl_time", datetime.now())
job_item = item_loader.load_item()
return job_item
# -*- coding: utf-8 -*-
import re
import json
import datetime
try:
import urlparse as parse
except:
from urllib import parse
import scrapy
from scrapy.loader import ItemLoader
from items import ZhihuQuestionItem, ZhihuAnswerItem
class ZhihuSpider(scrapy.Spider):
name = "zhihu"
allowed_domains = ["www.zhihu.com"]
start_urls = ['https://www.zhihu.com/']
#question的第一页answer的请求url
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"
headers = {
"HOST": "www.zhihu.com",
"Referer": "https://www.zhizhu.com",
'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
}
custom_settings = {
"COOKIES_ENABLED": True
}
def parse(self, response):
"""
提取出html页面中的所有url 并跟踪这些url进行一步爬取
如果提取的url中格式为 /question/xxx 就下载之后直接进入解析函数
"""
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls = filter(lambda x:True if x.startswith("https") else False, all_urls)
for url in all_urls:
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
if match_obj:
#如果提取到question相关的页面则下载后交由提取函数进行提取
request_url = match_obj.group(1)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
else:
#如果不是question页面则直接进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def parse_question(self, response):
#处理question页面, 从页面中提取出具体的question item
if "QuestionHeader-title" in response.text:
#处理新版本
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css("title", "h1.QuestionHeader-title::text")
item_loader.add_css("content", ".QuestionHeader-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", ".List-headerText span::text")
item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
question_item = item_loader.load_item()
else:
#处理老版本页面的item提取
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
# item_loader.add_css("title", ".zh-question-title h2 a::text")
item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
item_loader.add_css("content", "#zh-question-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", "#zh-question-answer-num::text")
item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
# item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item
def parse_answer(self, reponse):
#处理question的answer
ans_json = json.loads(reponse.text)
is_end = ans_json["paging"]["is_end"]
next_url = ans_json["paging"]["next"]
#提取answer的具体字段
for answer in ans_json["data"]:
answer_item = ZhihuAnswerItem()
answer_item["zhihu_id"] = answer["id"]
answer_item["url"] = answer["url"]
answer_item["question_id"] = answer["question"]["id"]
answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
answer_item["content"] = answer["content"] if "content" in answer else None
answer_item["parise_num"] = answer["voteup_count"]
answer_item["comments_num"] = answer["comment_count"]
answer_item["create_time"] = answer["created_time"]
answer_item["update_time"] = answer["updated_time"]
answer_item["crawl_time"] = datetime.datetime.now()
yield answer_item
if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def start_requests(self):
return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
def login(self, response):
response_text = response.text
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
xsrf = (match_obj.group(1))
if xsrf:
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": xsrf,
"phone_num": "",
"password": "",
"captcha": ""
}
import time
t = str(int(time.time() * 1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.login_after_captcha)
def login_after_captcha(self, response):
with open("captcha.jpg", "wb") as f:
f.write(response.body)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
captcha = input("输入验证码\n>")
post_data = response.meta.get("post_data", {})
post_url = "https://www.zhihu.com/login/phone_num"
post_data["captcha"] = captcha
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
callback=self.check_login
)]
def check_login(self, response):
#验证服务器的返回数据判断是否成功
text_json = json.loads(response.text)
if "msg" in text_json and text_json["msg"] == "登录成功":
for url in self.start_urls:
yield scrapy.Request(url, dont_filter=True, headers=self.headers)
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import mmh3
import BitVector
import redis
import math
import time
class BloomFilter():
#内置100个随机种子
SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]
#capacity是预先估计要去重的数量
#error_rate表示错误率
#conn表示redis的连接客户端
#key表示在redis中的键的名字前缀
def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate)) #需要的总bit位数
self.k = math.ceil(math.log1p(2)*self.m/capacity) #需要最少的hash次数
self.mem = math.ceil(self.m/8/1024/1024) #需要的多少M内存
self.blocknum = math.ceil(self.mem/512) #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
self.seeds = self.SEEDS[0:self.k]
self.key = key
self.N = 2**31-1
self.redis = conn
if not self.redis:
#默认如果没有redis连接,在内存中使用512M的内存块去重
self.bitset = BitVector.BitVector(size=1<<32)
print(self.mem)
print(self.k)
def add(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
for hash in hashs:
if self.redis:
self.redis.setbit(name, hash, 1)
else:
self.bitset[hash] = 1
def is_exist(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
exist = True
for hash in hashs:
if self.redis:
exist = exist & self.redis.getbit(name, hash)
else:
exist = exist & self.bitset[hash]
return exist
def get_hashs(self, value):
hashs = list()
for seed in self.seeds:
hash = mmh3.hash(value, seed)
if hash >= 0:
hashs.append(hash)
else:
hashs.append(self.N - hash)
return hashs
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
conn = redis.StrictRedis(connection_pool=pool)
start = time.time()
bf = BloomFilter(conn=conn)
bf.add('test')
bf.add('fsest1')
print(bf.is_exist('qest'))
print(bf.is_exist('testdsad'))
end = time.time()
print(end-start)
\ No newline at end of file
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import hashlib
import re
def get_md5(url):
if isinstance(url, str):
url = url.encode("utf-8")
m = hashlib.md5()
m.update(url)
return m.hexdigest()
def extract_num(text):
#从字符串中提取出数字
match_re = re.match(".*?(\d+).*", text)
if match_re:
nums = int(match_re.group(1))
else:
nums = 0
return nums
if __name__ == "__main__":
print (get_md5("http://jobbole.com".encode("utf-8")))
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import requests
try:
import cookielib
except:
import http.cookiejar as cookielib
import re
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")
try:
session.cookies.load(ignore_discard=True)
except:
print ("cookie未能加载")
agent = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
header = {
"HOST":"www.zhihu.com",
"Referer": "https://www.zhizhu.com",
'User-Agent': agent
}
def is_login():
#通过个人中心页面返回状态码来判断是否为登录状态
inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
response = session.get(inbox_url, headers=header, allow_redirects=False)
if response.status_code != 200:
return False
else:
return True
def get_xsrf():
#获取xsrf code
response = session.get("https://www.zhihu.com", headers=header)
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text)
if match_obj:
return (match_obj.group(1))
else:
return ""
def get_index():
response = session.get("https://www.zhihu.com", headers=header)
with open("index_page.html", "wb") as f:
f.write(response.text.encode("utf-8"))
print ("ok")
def get_captcha():
import time
t = str(int(time.time()*1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
t = session.get(captcha_url, headers=header)
with open("captcha.jpg","wb") as f:
f.write(t.content)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
captcha = input("输入验证码\n>")
return captcha
def zhihu_login(account, password):
#知乎登录
if re.match("^1\d{10}",account):
print ("手机号码登录")
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": get_xsrf(),
"phone_num": account,
"password": password,
"captcha":get_captcha()
}
else:
if "@" in account:
#判断用户名是否为邮箱
print("邮箱方式登录")
post_url = "https://www.zhihu.com/login/email"
post_data = {
"_xsrf": get_xsrf(),
"email": account,
"password": password
}
response_text = session.post(post_url, data=post_data, headers=header)
session.cookies.save()
zhihu_login("18782902568", "admin123")
# get_index()
is_login()
# get_captcha()
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
captcha.jpg

3.86 KB

[0]
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
# -*- coding: utf-8 -*-
__author__ = 'ian'
from scrapy.cmdline import execute
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
execute(["scrapy", "crawl", "jobbole"]) # 相当于执行scrapy crawl jobbole
# execute(["scrapy", "crawl", "zhihu"])
# execute(["scrapy", "crawl", "lagou"])
\ No newline at end of file
This diff is collapsed.
asn1crypto==0.23.0
attrs==17.3.0
Automat==0.6.0
certifi==2017.11.5
cffi==1.11.2
chardet==3.0.4
constantly==15.1.0
cryptography==2.1.3
cssselect==1.0.1
elasticsearch==6.0.0
enum34==1.1.6
hyperlink==17.3.1
idna==2.6
incremental==17.5.0
ipaddress==1.0.18
lxml==4.1.1
mysqlclient==1.3.12
olefile==0.44
parsel==1.2.0
Pillow==4.3.0
pyasn1==0.3.7
pyasn1-modules==0.1.5
pycparser==2.18
PyDispatcher==2.0.5
pyOpenSSL==17.3.0
queuelib==1.4.2
requests==2.18.4
Scrapy==1.4.0
service-identity==17.0.0
six==1.11.0
Twisted==17.9.0
urllib3==1.22
w3lib==1.18.0
zope.interface==4.4.3
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.org/en/latest/deploy.html
[settings]
default = ArticleSpider.settings
# shell = bpython
[deploy:bobby]
url = http://localhost:6800/
project = ArticleSpider
# Automatically created by: scrapyd-deploy
from setuptools import setup, find_packages
setup(
name = 'project',
version = '1.0',
packages = find_packages(),
entry_points = {'scrapy': ['settings = ArticleSpider.settings']},
)
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import redis
redis_cli = redis.StrictRedis()
redis_cli.incr("jobbole_count")
# -*- coding: utf-8 -*-
__author__ = 'bobby'
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import requests
from scrapy.selector import Selector
import MySQLdb
conn = MySQLdb.connect(host="127.0.0.1", user="root", passwd="root", db="article_spider", charset="utf8")
cursor = conn.cursor()
def crawl_ips():
#爬取西刺的免费ip代理
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0"}
for i in range(1568):
re = requests.get("http://www.xicidaili.com/nn/{0}".format(i), headers=headers)
selector = Selector(text=re.text)
all_trs = selector.css("#ip_list tr")
ip_list = []
for tr in all_trs[1:]:
speed_str = tr.css(".bar::attr(title)").extract()[0]
if speed_str:
speed = float(speed_str.split("秒")[0])
all_texts = tr.css("td::text").extract()
ip = all_texts[0]
port = all_texts[1]
proxy_type = all_texts[5]
ip_list.append((ip, port, proxy_type, speed))
for ip_info in ip_list:
cursor.execute(
"insert proxy_ip(ip, port, speed, proxy_type) VALUES('{0}', '{1}', {2}, 'HTTP')".format(
ip_info[0], ip_info[1], ip_info[3]
)
)
conn.commit()
class GetIP(object):
def delete_ip(self, ip):
#从数据库中删除无效的ip
delete_sql = """
delete from proxy_ip where ip='{0}'
""".format(ip)
cursor.execute(delete_sql)
conn.commit()
return True
def judge_ip(self, ip, port):
#判断ip是否可用
http_url = "http://www.baidu.com"
proxy_url = "http://{0}:{1}".format(ip, port)
try:
proxy_dict = {
"http":proxy_url,
}
response = requests.get(http_url, proxies=proxy_dict)
except Exception as e:
print ("invalid ip and port")
self.delete_ip(ip)
return False
else:
code = response.status_code
if code >= 200 and code < 300:
print ("effective ip")
return True
else:
print ("invalid ip and port")
self.delete_ip(ip)
return False
def get_random_ip(self):
#从数据库中随机获取一个可用的ip
random_sql = """
SELECT ip, port FROM proxy_ip
ORDER BY RAND()
LIMIT 1
"""
result = cursor.execute(random_sql)
for ip_info in cursor.fetchall():
ip = ip_info[0]
port = ip_info[1]
judge_re = self.judge_ip(ip, port)
if judge_re:
return "http://{0}:{1}".format(ip, port)
else:
return self.get_random_ip()
# print (crawl_ips())
if __name__ == "__main__":
get_ip = GetIP()
get_ip.get_random_ip()
\ No newline at end of file
# -*- coding: utf-8 -*-
__author__ = 'bobby'
from selenium import webdriver
from scrapy.selector import Selector
# browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe")
# browser.get("https://www.zhihu.com/#signin")
#
# browser.find_element_by_css_selector(".view-signin input[name='account']").send_keys("18782902568")
# browser.find_element_by_css_selector(".view-signin input[name='password']").send_keys("admin125")
#
# browser.find_element_by_css_selector(".view-signin button.sign-button").click()
#selenium 完成微博模拟登录
# browser.get("https://www.oschina.net/blog")
# import time
# time.sleep(5)
# browser.find_element_by_css_selector("#loginname").send_keys("liyao198705@sina.com")
# browser.find_element_by_css_selector(".info_list.password input[node-type='password']").send_keys("da_ge_da")
# browser.find_element_by_css_selector(".info_list.login_btn a[node-type='submitBtn']").click()
# for i in range(3):
# browser.execute_script("window.scrollTo(0, document.body.scrollHeight); var lenOfPage=document.body.scrollHeight; return lenOfPage;")
# time.sleep(3)
# t_selector = Selector(text=browser.page_source)
# print (t_selector.css(".tm-promo-price .tm-price::text").extract())
#设置chromedriver不加载图片
# chrome_opt = webdriver.ChromeOptions()
# prefs = {"profile.managed_default_content_settings.images":2}
# chrome_opt.add_experimental_option("prefs", prefs)
#phantomjs, 无界面的浏览器, 多进程情况下phantomjs性能会下降很严重
browser = webdriver.PhantomJS(executable_path="E:/home/phantomjs-2.1.1-windows/bin/phantomjs.exe")
browser.get("https://detail.tmall.com/item.htm?spm=a230r.1.14.3.yYBVG6&id=538286972599&cm_id=140105335569ed55e27b&abbucket=15&sku_properties=10004:709990523;5919063:6536025")
print (browser.page_source)
browser.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
__author__ = 'bobby'
import json
import requests
class YDMHttp(object):
apiurl = 'http://api.yundama.com/api.php'
username = ''
password = ''
appid = ''
appkey = ''
def __init__(self, username, password, appid, appkey):
self.username = username
self.password = password
self.appid = str(appid)
self.appkey = appkey
def balance(self):
data = {'method': 'balance', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("获取剩余积分", ret_data["balance"])
return ret_data["balance"]
else:
return None
def login(self):
data = {'method': 'login', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response_data = requests.post(self.apiurl, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("登录成功", ret_data["uid"])
return ret_data["uid"]
else:
return None
def decode(self, filename, codetype, timeout):
data = {'method': 'upload', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'codetype': str(codetype), 'timeout': str(timeout)}
files = {'file': open(filename, 'rb')}
response_data = requests.post(self.apiurl, files=files, data=data)
ret_data = json.loads(response_data.text)
if ret_data["ret"] == 0:
print ("识别成功", ret_data["text"])
return ret_data["text"]
else:
return None
def ydm(file_path):
username = 'da_ge_da1'
# 密码
password = 'da_ge_da'
# 软件ID,开发者分成必要参数。登录开发者后台【我的软件】获得!
appid = 3129
# 软件密钥,开发者分成必要参数。登录开发者后台【我的软件】获得!
appkey = '40d5ad41c047179fc797631e3b9c3025'
# 图片文件
filename = 'image/captcha.jpg'
# 验证码类型,# 例:1004表示4位字母数字,不同类型收费不同。请准确填写,否则影响识别率。在此查询所有类型 http://www.yundama.com/price.html
codetype = 5000
# 超时时间,秒
timeout = 60
# 检查
yundama = YDMHttp(username, password, appid, appkey)
if (username == 'username'):
print('请设置好相关参数再测试')
else:
# 开始识别,图片路径,验证码类型ID,超时时间(秒),识别结果
return yundama.decode(file_path, codetype, timeout);
if __name__ == "__main__":
# 用户名
username = 'da_ge_da1'
# 密码
password = 'da_ge_da'
# 软件ID,开发者分成必要参数。登录开发者后台【我的软件】获得!
appid = 3129
# 软件密钥,开发者分成必要参数。登录开发者后台【我的软件】获得!
appkey = '40d5ad41c047179fc797631e3b9c3025'
# 图片文件
filename = 'image/captcha.jpg'
# 验证码类型,# 例:1004表示4位字母数字,不同类型收费不同。请准确填写,否则影响识别率。在此查询所有类型 http://www.yundama.com/price.html
codetype = 5000
# 超时时间,秒
timeout = 60
# 检查
if (username == 'username'):
print ('请设置好相关参数再测试')
else:
# 初始化
yundama = YDMHttp(username, password, appid, appkey)
# 登陆云打码
uid = yundama.login();
print('uid: %s' % uid)
# 登陆云打码
uid = yundama.login();
print ('uid: %s' % uid)
# 查询余额
balance = yundama.balance();
print ('balance: %s' % balance)
# 开始识别,图片路径,验证码类型ID,超时时间(秒),识别结果
text = yundama.decode(filename, codetype, timeout);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment