Scrapy learning notes (using a pipeline for data persistence)

#items.py
#Define the item data structure
import scrapy

class ItestItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    title = scrapy.Field()
    href = scrapy.Field()
    content = scrapy.Field()
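
As a quick aside (my illustration, not part of the original post): a scrapy.Item behaves like a dict, except that only the declared fields may be assigned.

#Illustrative usage of the item above
item = ItestItem(title='hello', href='https://example.com/post')
item['content'] = '<p>body</p>'
#item['author'] = 'x'  # raises KeyError: ItestItem does not support field: author
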
#pipelines.py
#Persistence: use the pymysql library to insert rows into a MySQL database
import pymysql

class ItestPipeline:
    #Initialize the database connection
    def __init__(self):
        self.db = pymysql.connect(host='localhost', user='root',
                                  passwd='123456',
                                  db='spider', port=3306, charset='utf8mb4')
        self.cur = self.db.cursor()

    #Build the SQL statement and insert the item
    def process_item(self, item, spider):
        #id is 0 so MySQL assigns the next AUTO_INCREMENT value
        sql = 'INSERT INTO mtime_movie_list (`id`, `title`, `href`, `content`) VALUES (0, %s, %s, %s);'
        try:
            self.cur.execute(sql, [item['title'], item['href'], item['content']])
            # Commit the transaction
            self.db.commit()
        except Exception:
            # Roll back on error, then re-raise
            self.db.rollback()
            raise
        return item

    def close_spider(self, spider):
        # Spider finished; close the connection
        self.db.close()
        print('Finished inserting data...')
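
For reference, the INSERT above implies a table roughly like the one created below; the exact schema is my assumption, since the post does not show the DDL. Writing 0 into an AUTO_INCREMENT id column makes MySQL assign the next value (as long as the NO_AUTO_VALUE_ON_ZERO SQL mode is off).

#Illustrative one-off script to create the target table (schema is assumed)
import pymysql

ddl = """
CREATE TABLE IF NOT EXISTS mtime_movie_list (
    `id`      INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `title`   VARCHAR(255) NOT NULL,
    `href`    VARCHAR(512) NOT NULL,
    `content` LONGTEXT,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
db = pymysql.connect(host='localhost', user='root', passwd='123456',
                     db='spider', port=3306, charset='utf8mb4')
with db.cursor() as cur:
    cur.execute(ddl)
db.commit()
db.close()
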
#settings.py
ITEM_PIPELINES = {
    #Enable the custom pipeline (the class is ItestPipeline, as defined in pipelines.py)
    'itest.pipelines.ItestPipeline': 300,
}
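
The value 300 is the pipeline's order within ITEM_PIPELINES (conventionally 0-1000; lower numbers run earlier), so several pipelines can be chained. For example, a deduplication step could run before the MySQL insert (DedupePipeline here is hypothetical):

#settings.py — chaining pipelines (DedupePipeline is hypothetical)
ITEM_PIPELINES = {
    'itest.pipelines.DedupePipeline': 200,   # hypothetical: would run first
    'itest.pipelines.ItestPipeline': 300,
}
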
#itest.py
import scrapy
from selenium import webdriver
from itest.items import ItestItem

class ItestSpider(scrapy.Spider):
    name = "itest"
    start_urls = ['https://againriver.com/#blog']

    def __init__(self):
        #Launch a headless Chrome; pages are rendered through it by a
        #downloader middleware (see the sketch after this file)
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        #Note: with Selenium 4+, pass options= and a Service object instead
        self.browser = webdriver.Chrome(chrome_options=chrome_options,
                                        executable_path='/usr/bin/chromedriver')
        super(ItestSpider, self).__init__()

    def closed(self, reason):
        #Shut down the browser when the spider finishes
        self.browser.quit()

    def parse(self, response):
        for post in response.css("ol.post-list>li"):
            #Create a custom item object for each post in the list
            item = ItestItem()
            item["title"] = post.css("h2.post-list__post-title>a::attr(title)").extract_first()
            item["href"] = post.css("h2.post-list__post-title>a::attr(href)").extract_first()
            yield scrapy.Request(url=str(item["href"]), meta={"item": item},
                                 callback=self.parse_content)
        #Follow the pagination link to the next page of the list, if any
        next_page = response.css('a.pagination__older::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

    def parse_content(self, response):
        item = response.meta["item"]
        item["content"] = response.css("article.post-container>section.post").extract_first()
        self.log("Scraped item: %s" % item)
        #Also save the article body to a local file named after the title
        filename = str(item["title"])
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(str(item["content"]))
        self.log("Saved file: %s" % filename)
        #Hand the item to the pipeline for persistence
        yield item
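
One thing worth noting: the spider above starts a headless Chrome, but none of the code shown actually routes requests through it; in a typical Scrapy + Selenium setup that is done by a downloader middleware. A minimal sketch of such a middleware (my assumption — the original post does not include one):

#middlewares.py (hypothetical sketch, not part of the original post)
from scrapy.http import HtmlResponse

class SeleniumMiddleware:
    def process_request(self, request, spider):
        #Render the page in the spider's headless Chrome, then return
        #the HTML as a normal response so Scrapy skips its own download
        spider.browser.get(request.url)
        return HtmlResponse(url=request.url,
                            body=spider.browser.page_source,
                            encoding='utf-8',
                            request=request)

It would be enabled in settings.py alongside the pipeline:

#settings.py (addition, assuming the middleware above)
DOWNLOADER_MIDDLEWARES = {
    'itest.middlewares.SeleniumMiddleware': 543,
}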