Python: Python网络协议和编程

Socket 编程基础

主要内容:

网络概念

每一个开发工程师,对编程方面的每一个知识都应该做一定的了解。

什么是网络

网络是由节点和连线构成的图,表示诸多对象及其关系。

什么是计算机网络

计算机网络指的是将地理位置不同的具有独立功能的多台计算机及其外部设备, 通过通信线路物理连接(包括有线、无线连接),并在网络操作系统、网络管理 软件和网络通信协议的管理和协调下,实现 资源共享信息传递 的计算机系统。

带宽

在数字设备中,指的是单位时间数据的传输量。

网络传输习惯上使用比特率,即bps每秒传输的二进制位数。

常见的100M网络,实际上指的是理论上的下行速度为100Mbps,换算得12.5MBps。

拓扑

总线型

img_20250312_102532.png

所有设备都连接到公共总线上,结点间使用广播通信方式。一个结点发出的信息,总线上所有其他结点都可以接收到。一段时间只允许一个结点独占总线。

常见使用同轴电缆连接,总线两端需要终结器。

优点

  • 结构简单、易于实现
  • 易于扩充,增加或者移除结点比较灵活
  • 可靠性较高,个别结点发生故障时,不影响网络中其他结点的正常工作

缺点

  • 网络传输能力低,安全性低,总线发生故障时,会导致全网瘫痪
  • 所有数据都需要经过总线传输,总线是整个网络的瓶颈。结点数量的增多会影响网络性能

环形结构

img_20250312_104723.png

环形结构是将联网的计算机由通信线路连接成一个闭合的环,在环形结构网络中信息按照固定方向流动,或顺时针方向,或逆时针方向。

优点

  • 令牌控制,没有线路竞争,实时性强,传输控制容易

缺点

  • 维护困难,可靠性不高。一个结点发生故障时,可能导致全网瘫痪。可以使用双环拓扑结构,但是复杂性提升

星型拓扑

img_20250312_105213.png

每个结点都由一条单独的通信线路与中心结点连结。其他各结点都与该中心结点 有着物理链路的直接互连,其他结点直接不能直接通信,其他结点直接的通信需 要该中心结点进行转发。因此中心结点必须有着较强的功能和较高的可靠性。需 要中心设备,例如hub、switch、router

优点

  • 可靠性高,方便管理,易于扩展,传输效率高

缺点

  • 线路利用率低,中心节点需要很高的可靠性和冗余度

注意, hub工作在一层,这种星型实际上就是芯片化的总线网络。只是物理拓扑结构上感觉像是星型。

OSI参考模型

OSI是Open System Interconnection的缩写,意为开放式系统互联。国际标准化 组织(ISO)制定了OSI模型,该模型定义了不同计算机互联的标准,是设计和描 述计算机网络通信的基本框架。

OSI模型把网络通信的工作分为7层,分别是物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

img_20250312_114550.png
img_20250312_160212.png
  • 物理层: 定义了电气规范,设备规范、物理接口等,电信号的变化,或数字信号变化,比特。
  • 链路层: 二层。将比特组织成帧,即对字节进行定义,支持错误检查。使用物理地址、MAC地址。MAC有48位,前24位厂商编号由IEEE分配,后24位设备序号。
  • 网络层: 三层。将帧组织成包,包传递的路径选择(路由),将包传输到目标地址。使用逻辑地址、IP地址。
  • 传输层: 四层。解决传输的问题,确保数据传输的可靠性;建立、维护、终止虚拟电路;错误检测和恢复。
  • 会话层: 建立、管理、终止应用程序间的逻辑通路,即会话。
  • 表示层: 对应用数据格式化、加密解密等。将上层数据转换为适合网络传输的格式,或将下层数据转化为上。
  • 应用层: 七层。为应用程序提供网络服务接口,用户使用的时候并不关心会话如何建立保持,也不关心协议的协商是否加密等。

数据传输

img_20250313_004413.png

数据很大,在应用层切分,每一份数据都会在下一层被封装。

在数据链路层会增加tail即校验位,最后在物理层上都是电平信号0、1发送出去。

到了对端设备,由下至上逐层解包组合。直到合成并还原应用层的一份数据。

通讯的三种模式

  • 单播: 包在计算机网络传输中,目的地址为单一目标的传输方式。每次都是点对点的2个实体间相互通信
  • 广播: 数据在网络中传输,目标地址是网络中所有设备的传输方式。所有设备是有范围的,这个范围称为广播域。IP v6不支持广播,由组播替代
  • 多播、组播:把数据同时传递给一组目的地址。数据源只发出一份数据,会在尽可能远的的设备上复制和分发

冲突域、广播域: 参看https://baike.baidu.com/item/%E5%B9%BF%E6%92%AD%E5%9F%9F

冲突域 网络中设备A发送数据时,设备B也发送数据,数据碰撞,发生了冲突,这两个设备就属于同一个冲突域 交换机可以隔离冲突域 路由器可以隔离广播域

局域网LAN

局域网Local Area Network,指的是某一个区域内,多台计算机互联的计算机组。

常见组网设备:网线、有线网卡、无线网卡、集线器、交换机、路由器等

网络设备

网络线缆

  • 有线连接,需要使用网线,最早使用同轴电缆,后来使用双绞线,现在高速网络布线可以采用光纤
  • 常用的双绞线使用RJ45水晶头
  • 直通采用两端T568B,互连使用一端T568A一端T568B的交叉线,不过目前新型网卡可以自适应,都使用直通线连接即可
img_20250313_014002.png

集线器hub

  • 工作在一层。使用HUB连接的设备看似是星型,实际是总线型。
  • 它是物理层设备,只认识电信号,所以根本不认识什么MAC地址之类的信息。早期用来多机互连,信号中继的作用户。
  • 连入设备越多,广播信号,在一个冲突域,网络效率很低。
  • 使用HUB连接的所有设备,都在同一个冲突域。

交换机switch

  • 工作在二层。内部记录着MAC表,通过自学习,建立交换机端口和MAC地址对应表。内部有电路交换数据,如同信号立交桥。网桥也工作这一层

路由器Router

  • 工作在三层。内部记录路由表,记录着路由器的端口到 网络 对应关系。这个表可以静态配置,也可以动态更新
  • 功能:分割广播域;选择路由;维护和检查路由信息;连接广域网

广域网WAN

广域网,又称外网、公网。连接不同局域网或城域网的计算机通讯网络

互联网Internet

互联网Internet,也称因特网。前身是美国军用ARPA网,后来连入了很多的科研院校,并逐步商业化走向全球。

它连接了覆盖全球的网络,是众多的广域网互联的大型网络。

互联网使用了TCP/IP协议。

TCP/IP协议栈

TCP/IP,Transmission Control Protocol/Internet Protocol ,传输控制协议/因特网互联协议。

它是一个包含很多工作在不同层的协议的协议族,其中最著名的2个协议分别是TCP和IP协议。

它最早起源于美国国防部(缩写为DoD)的ARPA网项目,1982年应用于美国所有军事网络。 IBM、AT&T、DEC从1984年起就开始使用TCP/IP协议。TCP/IP更加广泛的传播是在1989年, 加州大学伯克利分校在BSD中加入了该协议。微软是在Win95中增加。

TCP/IP协议,共定义了四层:网络访问层、Internet层、传输层、应用层。

img_20250313_015205.png

TCP/IP协议是事实标准。目前局域网和广域网基本上也都用该协议。

传输层协议

  TCP UDP
连续类型 面向连接 无连接
可靠性 可靠 不可靠
有序 数据包有序号 没有包序
使用场景 大多数场合,数据不能出任何问题 视频、音频

连接

  • TCP需要通讯双方预先建立连接,需要三次握手、四次断开
  • UDP不需要预先建立连接

可靠性

  • TCP需要确定每一个包是否收到,丢包重发,效率低一些
  • UDP尽最大努力交付数据,不要要确认数据包,丢包无法知道,也不重复,效率高一些

有序

  • TCP包有序号,可以进行顺序控制。第一个包序号随机生成,之后的序号都和它有关
  • UDP包无序,无法纠正,只能在应用层进行验证

TCP协议三次握手/四次断开

img_20250313_115707.png

三次握手建立连接 Three-way Handshake

  • Client端首先发送一个SYN包告诉Server端我的初始序列号是X,发送 [SYN] seq=x
  • Server端收到SYN包后回复给Client一个ACK确认包,告诉Client说我收到了。 Server端也需要告诉Client端自己的初始序列号,于是Server也发送一个SYN 包告诉Client我的初始序列号是Y。 [SYN ACK] seq=y ack=x+1 。ack=x+1表示 服务器端期望客户端下一次序号从x+1开始。
  • Client收到后,回复Server一个ACK确认包说我知道了 [ACK] seq=x+1 ack=y+1

表述2

  • 请求端(通常称为客户)发送一个 SYN 报文段指明打算连接的服务器的端口,以及初始序号(ISN)。这是报文段 1。
  • 服务器发回包含服务器的初始序号的 SYN 报文段(报文段 2)作为应答。同时,将确认序号设置为客户的 ISN 加 1 以对客户的 SYN 报文段进行确认。一个 SYN 将消耗一个序号。
  • 客户必然将确认序号设置为服务器的 ISN 加 1 以对服务器的 SYN 报文段进行确认(报文段 3)。

表述3

  • 客户端发送 SYN 报文,请求建立连接 (客户端进入 SYN-SEND 状态)
  • 服务端收到 SYN 报文,回复 SYN + ACK 报文,表示同意建立连接 (服务端进入 SYN-RECEIVED 状态,如果服务端停留在这个状态一段时间会自动关闭)
  • 客户端接收到 SYN + ACK 报文,回复 ACK 报文,表示连接建立成功 (客户端和服务端同时进入 ESTABLISHED 状态)

数据传输

  • 客户端发送 [PSH] seq=x+1 ack=y+1 len=3 ... 。seq=x+1表示客户端当前序列号; ack=y+1是期望从服务器端返回的seq号;len=3表示当前发送的字节大小。
  • 服务器端返回的响应 [ACK] seq=y+1 ack=x+1+3 len=0 ... 。seq=y+1正是客户端期望 的服务器端的序号;ack=x+4说明服务器端已经接受了3个字节数据,希望从x+4开始 接收下一次数据;len=0表示服务器端只是响应无数据发送给客户端所以长度为0.
  • 客户端再发送数据 [PSH] seq=x+4 ack=y+1 len=10 ...
  • 服务端响应 [ACK] seq=y+1 ack=x+14 len=0 ...
  • 客户端发送重复上面的过程。
img_20250313_115835.png

四次断开

  • Client发送一个FIN包来告诉Server需要断开
  • Server收到后回复一个ACK确认FIN包收到
  • Server在自己也没数据发送给Client后,Server也发送一个FIN包给Client,表示也无数据发送
  • Client收到后,就会回复一个ACK确认Server的FIN包

表述2

  • 客户端向服务端发送 FIN 报文,请求断开连接 (客户端进入FIN-WAIT-1 状态)
  • 服务端收到 FIN 报文,回复 ACK 报文 (服务端进入 CLOSE-WAIT 状态, 客户端收到来自服务端的确认后,就进入 FIN-WAIT-2 状态,等待服务端发出的连接释放报文段。)
  • 服务端将未传输完的数据传输完后,向客户端发送 FIN 报文,请求断开连接 (服务端进入 LAST-ACK 状态)
  • 客户端收到 FIN 报文,回复 ACK 报文 (客户端进入 TIME-WAIT 状态,一段时间后 CLOSEED)此时服务端进入 CLOSEED 状态

主动发出Fin包就是主动关闭方,就会进入TIME_WAIT,原因是被动关闭方发来的 FIN包需要确认,万一此包丢失,被动关闭方未收到确认会超时重发FIN包,主动 关闭方还在,可以重发ACK。

IP地址

IP地址是IP协议提供的一种同一个地址格式,它为互联网上的网络设备分配一个用来通信的逻辑地址。

目前分为IP v4和IP v6。

IP v4

IP v4 是一个32位二进制数,不便记忆,为了使用方便,使用“点分十进制”表 示法,将这个二进制数每8位断开,每8位是一个字节,一个字节表示的十进制正 整数范围是0~255。

IP v4地址早期比较充足,随着全球连入互联网,在2011年IP v4地址分配完毕。

IP地址的分类

  • 公有地址:需向因特网信息中心申请,在互联网上可以直接使用的IP地址
  • 私有地址:不需要注册,可以组织内部网络使用的IP地址

IP地址这个数被分成2部分,即网络位 + 主机位

网络位表示设备同属一个网络;主机位表示网络中不同的设备的唯一ID

子网掩码

  • 子网掩码将IP地址划分为网络ID和主机ID
  • IP地址 位与 子网掩码就是网络ID

IP v4地址被分为A、B、C、D、E五类

类别 最大网络数 IP地址范围 单个网段最大主机数 私有IP地址范围
A类 126(2^7-1-1) 1.0.0.0 - 127.255.255.255 16777214 10.0.0.0 - 10.255.255.255
B类 16384(2^14) 128.0.0.0 - 191.15.255.255 65534 172.16.0.0 -172.31.255.255
C类 2097152(2^21) 192.0.0.0 - 223.167.255.255 254 192.168.0.0 - 192.168.255.255
A类

0 0000000 - 0 1111111.X.Y.Z : 0-127.X.Y.Z

网络 ID位是最高8位,最高位为 0,可变的只有 7 位,主机 ID 是 24 位低位

二进制表示为:00000001 00000000 00000000 00000000 至 01111111 11111111 11111111 11111111

网络数:126=2^7-2 即 2^ 可变是的网络ID位数 -(0 和 127 不能要)

每个网络中的主机数:2^24-2=16777214 即 2^主机 id 的位数 - (全为 0的 网络地址和 全为 1 的广播地址)

默认子网掩码:255.0.0.0

私网地址:10.0.0.0

范例:114.114.114.114,8.8.8.8,1.1.1.1,58.87.87.99,119.29.29.29

B类

10 000000 - 10 111111.X.Y.Z:128-191.X.Y.Z

最高位10

二进制表示为:10000000 00000000 00000000 00000000 至10111111 11111111 11111111 11111111

网络ID位是最高16位,主机ID是16位低位

网络数:2^14=16384

每个网络中的主机数:2^16-2=65534

默认子网掩码:255.255.0.0

私网地址:172.16.0.0-172.31.0.0

范例:180.76.76.76,172.16.0.1

C类

110 0 0000 - 110 1 1111.X.Y.Z: 192-223.X.Y.Z

最高位110

二进制表示为:11000000 00000000 00000000 00000000 至11011111 11111111 11111111 11111111

网络ID位是最高24位,主机ID是8位低位

网络数:2^21=2097152

每个网络中的主机数:2^8-2=254

默认子网掩码:255.255.255.0

私网地址:192.168.0.0-192.168.255.0

范例: 223.6.6.6

D类

组(多)播,1110 0000 - 1110 1111.X.Y.Z: 224-239.X.Y.Z

多播地址最高4位必须是1110,那么地址范围就是224.0.0.0到239.255.255.255

224.0.0.1特指所有主机

E类
保留未使用,240-255

特殊IP地址

  • 0.0.0.0表示当前主机
  • 255.255.255.255 限制广播地址。路由器不会转发这个受限广播地址的数据报文,此地址只能用于本网广播
  • IP地址中以127开头的地址称为Loopback回环地址
  • 169.254.x.x,windows主机使用了DHCP获取IP,但没有获得地址,windows会临时获得这样的地址

网关GATEWAY

  • 网关(Gateway)又称网间连接器、协议转换器。网关在网络层以上实现网络互连,用于网络间互联

举例

  • IP地址192.168.3.200,要配合子网掩码使用,假设子网掩码为255.255.255.0,说明它是C类地址
  • 网络ID为192.168.3.0,广播地址为192.168.3.255
  • 剩余192.168.3.1~192.168.3.254能够分配给网络中其他设备

网关地址配置一般习惯使用1、100、254等。本例使用192.168.3.1

其作用是连接不同的网络,也称为处在不同的网段

  • 又有一个IP地址为192.168.100.10/24,它也是C类地址,网络ID是192.168.100.0
  • 和上面的IP处在不同的网络,这两个地址的主机通信,就需要使用网关,由网关将数据包转发到另一个网络

IP v6

互联网上的公有地址在2011年分配完,而随着互联网的发展,接入设备越来越多,尤其是物联网的到来,此问题必须解决。由此,提出了IP v6。

IP v6采用128位二进制数表示,基本解决IP地址短缺现象,同时,该协议还解决原有协议的诸多问题。

冒号表示法:

  • fd36:1cbf:beba:0:a430:75cd:ad32:65b9,冒号切成8段,16位为一段,16位是2个字节。
  • 前导0可以省略

0位压缩表示法:

  • 如果冒号表示法中,中间部分是连续的0,可以直接将这部分压缩成为::,但是只能压缩一次。
  • fd36:0:0:0:0:0:0:65b9可以表示为fd36::65d9
  • 0:0:0:0:0:0:0:0可以表示为::
  • 0:0:0:0:0:0:0:1可以表示为::1

路由Routing

跨网络通信就需要使用路由,通过路由器将数据包从一个网络发往另一个网络。

路由器上维护着路由表,它知道如何将数据包发往另外的网络。

windows使用 route print ,Linux使用 route -n 或者 ip route 可以查看路由表。

路由器所有端口都有自己的IP地址,这些IP地址往往处在不同的网络,所以,路由器连接了不同了网络。

路由表中记录着路由设备所有端口对应的网络,分为静态、动态配置。

  • 静态路由:由管理员手动配置的固定的路由信息。
  • 动态路由:网络中的路由器,根据实时网络拓扑变化,相互通信传递路由信息,利用这些路由信息通过路由选择协议动态计算,并更新路由表。常见的协议有RIP、OSPF等等。
img_20250314_000126.png

网关:下一跳地址,就是到下一个网络,从哪个网关出去

  • 到192.168.0.0/24和10.0.0.0/8网络,R1本身就直接连接着这些网络,所以网关为空,不需要
  • 到172.16.0.0/16网络需要找到R2,所以写R2的接口1地址即可

DHCP

动态主机设置协议(Dynamic Host Configuration Protocol,DHCP)是一个局域网的网络协议,基于UDP协议工作。

主要用途就是用于内部网或网络服务供应商自动给网络中的主机分配IP地址。

网络编程

Socket介绍

img_20250314_001955.png

socket本意为插座。比如,有2面墙,进程就像墙上的管道,如果想让两面墙上的进程通信,那么两面墙上的进程都必须有一个座子。

Socket套接字

  • Python中提供socket.py标准库,非常底层的接口库。
  • Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。

协议族

  • AF表示Address Family,用于socket()第一个参数
名称 含义
AF_INET IPV4
AF_INET6 IPV6
AF_UNIX Unix Domain Socket, windows没有

Socket类型

名称 含义
SOCK_STREAM 面向连接的流套接字。默认值,TCP协议
SOCK_DGRAM 无连接的数据文套戒子。UDP协议

TCP协议是流协议,也就是一个大段数据看作字节流,一段段持续发送这些字节。

UDP协议是数据报协议,每一份数据封在一个单独的数据报中,一份一份发送数据。

TCP编程

Socket编程,是完成一端到另一端通信的,注意一般来说这两端分别处在不同的进程中, 也就是说网络通信是一个进程发送消息到另一个进程。

我们写代码的时候,每一个socket对象只表示了其中的一端。

从业务角度来说,这两端从角色上分为:

  • 主动发送请求的一端,称为客户端Client
  • 被动接受请求并回应的一端,称为服务端称为Server

这种编程模式也称为 C/S编程

TCP服务端编程

服务端编程步骤
  • 创建Socket对象
  • 绑定IP地址Address和端口Port。使用bind()方法

    IPv4地址为一个二元组('IP地址字符串‘, Port)

  • 开始监听,将再指定的IP端口上监听。使用listen()方法
  • 获取用于传送数据的新socket对象

    socket.accrpt() -> (socket object, address info)

    accept方法阻塞等待客户端创立连接,返回一个新的Socket对象和客户端地址的二元组

    地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port)

    • 接受数据

      recv(bufsize[, flags]) 使用缓冲区接受数据

    • 发送数据

      send(bytes) 发送数据

Server端开发
socket对象 --> bind((IP, PORT)) --> listem --> accept --> close
                                                  | --> recv or send --> close
img_20250314_140643.png

问题

  • 两次绑定同一个监听端口会怎么样?
import socket
import time

#socket对象占用文件描述符,fd是有限制的
server = socket.socket() #默认TCP协议 ipv4
addr = ('127.0.0.1', 9999)
# bind
server.bind(addr)
# server.bind(addr) #只能绑定一次
server.listen() # 监听, 等待3次握手建立稳定可靠连接
print(server)

# 等待连接
newsocket, raddr = server.accept() # 阻塞,直到有一个建立的连接可以接进来
print('-' * 30)
print(newsocket) #用来和客户端进程的socket通信
print(raddr)     #二元组,套接字,对端(客户端)ip和端口

data = newsocket.recv(1024) # 阻塞,直到收到数据 bytes类型
print(type(data), data)
msg = b'hello world'
newsocket.send(msg) # 向对端发送数据

print('=' * 30)
newsocket.close()
server.close()

#返回结果
# <socket.socket fd=356, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
# ------------------------------
# <socket.socket fd=368, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 55782)>
# ('127.0.0.1', 55782)
# <class 'bytes'> b'1111'
# ==============================
img_20250314_180921.png
import socket
import time

#socket对象占用文件描述符,fd是有限制的
server = socket.socket() #默认TCP协议 ipv4
addr = ('127.0.0.1', 9999)
# bind
server.bind(addr)
# server.bind(addr) #只能绑定一次
server.listen() # 监听, 等待3次握手建立稳定可靠连接
print(server)

# 等待连接
newsocket, raddr = server.accept() # 阻塞,直到有一个建立的连接可以接进来
print('-' * 30)
print(newsocket) #用来和客户端进程的socket通信
print(raddr)     #二元组,套接字,对端(客户端)ip和端口
# print(newsocket.getpeername()) # 对端的ip和端口
# print(newsocket.getsockname()) # 自己的ip和端口

data = newsocket.recv(1024) # 阻塞,直到收到数据 bytes类型
print(type(data), data)
msg = b'hello world'
newsocket.send(msg) # 向对端发送数据

newsocket2, info2 = server.accept() #请入第二个连接到新的socket上通信
print(newsocket2)
print(info2)
msg = b'2 connection~~~~\r\n'
newsocket2.send(msg) # 向对端发送数据
data = newsocket2.recv(1024) # 阻塞,直到收到数据 bytes类型
print(data, '++++')

print('=' * 30)
newsocket.close()
newsocket2.close()
server.close()

#返回结果
# <socket.socket fd=308, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
# ------------------------------
# <socket.socket fd=312, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 63873)>
# ('127.0.0.1', 63873)
# <class 'bytes'> b'1111\n'
# <socket.socket fd=352, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 63912)>
# ('127.0.0.1', 63912)
# b'3333\n' ++++
# ==============================

客户端操

#nc 连接
PS D:\tmp> nc 127.0.0.1 9999                                                                                            
1111                                                                                                                    
hello world                                                                                                             
4444                                                                                                                    
PS D:\tmp>

#再nc连接
PS C:\Users\Administrator> nc 127.0.0.1 9999
2 connection~~~~
3333
PS C:\Users\Administrator>
img_20250314_183434.png

上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作。怎么办?

查看监听端口

#windoes 命令
netstat -anp tcp | findstr 9999

#linux 命令
netstat -tanl | grep 9999
ss -tanl | grep 9999
实战 – 写一个群聊程序
  • 需求分析

    聊天工具是CS程序,C是每一个客户端client,S是服务器端server。

    服务器应该具有的功能:

    • 启动服务,包括绑定地址和端口,并监听
    • 建立连接,能和多个客户端建立连接
    • 接收不同用户的信息
    • 分发,将接收的某个用户的信息转发到已连接的所有客户端
    • 停止服务
    • 记录连接的客户端
  • 代码实现

    服务端应该对应一个类

    class ChatServer:
        def __init__(self, ip, port): # 启动服务
            self.sock = socket.socket()
            self.addr = (ip, port)
    
        def start(self): # 启动监听
            pass
    
        def accept(self): # 多人连接
            pass
    
        def recv(self): # 接受客户端数据
            pass
    
        def stop(self): # 停止服务
            pass
    

    在此基础上,扩展完成

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class Server: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            #下面2行与服务启动有关,根据业务逻辑放合适的位置
            # self.sock.bind(self.addr)
            # self.sock.listen() #监听了
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
        def stop(self):
            self.sock.close()
    
    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            #下面2行与服务启动有关,根据业务逻辑放合适的位置
            # self.sock.bind(self.addr)
            # self.sock.listen() #监听了
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            client, raddr = self.sock.accept() #阻塞
            print(client)
            print(raddr) #对端地址
    
            while True:
                data = client.recv(1024) #阻塞
                print(data)
                msg = "msg={}".format(data.decode())
                logging.info(msg)
                msg = msg.encode()
                client.send(msg)
    
    
        def stop(self):
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    #返回结果
    # <socket.socket fd=312, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 64770)>
    # ('127.0.0.1', 64770)
    # b'1111\n'
    # 2025-03-16 14:09:20,695 MainThread 1776 msg=1111
    
    # b'2222\n'
    # 2025-03-16 14:09:22,693 MainThread 1776 msg=2222
    
    # b'hello world\n'
    # 2025-03-16 14:09:29,704 MainThread 1776 msg=hello world
    

    客户端连接

    > nc 127.0.0.1 9999
    1111
    msg=1111
    2222
    msg=2222
    hello world
    msg=hello world
    

    使用多线程

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            #下面2行与服务启动有关,根据业务逻辑放合适的位置
            # self.sock.bind(self.addr)
            # self.sock.listen() #监听了
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            while True:
                client, raddr = self.sock.accept() #阻塞
                print(client)
                print(raddr) #对端地址
    
                #启动线程
                threading.Thread(target=self.recv, name='recv', args=(client, raddr)).start()
    
        def recv(self, client, raddr):
            while True:
                data = client.recv(1024) #阻塞
                print(data)
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data.decode())
                logging.info(msg)
                msg = msg.encode()
                client.send(msg)
    
        def stop(self):
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    

    基本功能完成,但是有问题。使用Event改进

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            self.event = threading.Event()
    
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=()).start()
    
        def accept(self):
            count = 1
            while not self.event.is_set():
                client, raddr = self.sock.accept() #阻塞
                print(client)
                print(raddr) #对端地址
    
                #启动线程
                threading.Thread(
                    target=self.recv, 
                    name='recv-{}'.format(count), 
                    args=(client, raddr)).start()
                count += 1
    
        def recv(self, client, raddr):
            while not self.event.is_set():
                data = client.recv(1024) #阻塞
                print(data)
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data.decode())
                logging.info(msg)
                msg = msg.encode()
                client.send(msg)
    
        def stop(self):
            self.event.set()
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    print('=' * 30)
    while True: #主线程监管
        time.sleep(2)
        print(*threading.enumerate(), sep='\n', end='\n\n')
    

    客户端连接

    #2个客户端连接
    nc 127.0.0.1 9999
    

    服务端输出

    ==============================
    <_MainThread(MainThread, started 2620)>
    <Thread(accept, started 27452)>
    
    
    <_MainThread(MainThread, started 2620)>
    <Thread(accept, started 27452)>
    
    <socket.socket fd=320, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 6196)>
    ('127.0.0.1', 6196)
    
    <_MainThread(MainThread, started 2620)>
    <Thread(accept, started 27452)>
    <Thread(recv-1, started 14080)>
    
    
    <socket.socket fd=344, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 6240)>
    ('127.0.0.1', 6240)
    <_MainThread(MainThread, started 2620)>
    <Thread(accept, started 27452)>
    <Thread(recv-1, started 14080)>
    <Thread(recv-2, started 22988)>
    
    <_MainThread(MainThread, started 2620)>
    <Thread(accept, started 27452)>
    <Thread(recv-1, started 14080)>
    <Thread(recv-2, started 22988)>
    

    主线程交互式管理线程

    #交互式,主线程增加退出命令
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()
    

    主线程管理线程

    #方法1,start函数中accept线程属性改为daemon,随主线程退出
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=(), daemon=True).start()
    

    方法2-记录线程信息

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            self.clients = {} #客户端
            self.event = threading.Event()
    
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=(), daemon=False).start()
    
        def accept(self):
            count = 1
            while not self.event.is_set():
                client, raddr = self.sock.accept() #阻塞
                print(client)
                print(raddr) #对端地址
                self.clients[raddr] = client #维护客户端字典
    
                #启动线程
                threading.Thread(
                    target=self.recv, 
                    name='recv-{}'.format(count), 
                    args=(client, raddr)).start()
                count += 1
    
        def recv(self, client, raddr):
            while not self.event.is_set():
                data = client.recv(1024) #阻塞
                print(data)
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data.decode())
                logging.info(msg)
                msg = msg.encode()
                client.send(msg)
    
        def stop(self):
            self.event.set()
            for c in self.clients.values():
                print('close ~~~~', c)
                c.close()
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    print('=' * 30)
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()
    

    主线程管理,主线程正常退出,其他线程崩溃退出,这里的报错先不处理

    ==============================
    >>><socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11461)>
    ('127.0.0.1', 11461)
    <socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11464)>
    ('127.0.0.1', 11464)
    
    <_MainThread(MainThread, started 19408)>
    <Thread(accept, started 27252)>
    <Thread(recv-1, started 13552)>
    <Thread(recv-2, started 23608)>
    
    
    >>>quit
    close ~~~~ <socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11461)>
    Exception in thread recv-1:
    close ~~~~ <socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11464)>
    Exception in thread recv-2:
    2025-03-16 15:37:05,178 MainThread 19408 Server quits~~~~
    Exception in thread accept:
    Traceback (most recent call last):
    Traceback (most recent call last):
    Traceback (most recent call last):
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 1041, in _bootstrap_inner
        self.run()
        ~~~~~~~~^^
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 1041, in _bootstrap_inner
        self.run()
        ~~~~~~~~^^
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 992, in run
        self._target(*self._args, **self._kwargs)
        ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 1041, in _bootstrap_inner
        self.run()
        ~~~~~~~~^^
      File "d:\project\pyprojs\trae\t.py", line 30, in accept
        client, raddr = self.sock.accept() #阻塞
                        ~~~~~~~~~~~~~~~~^^
      File "D:\Program Files\Python\python3130\Lib\socket.py", line 295, in accept
        fd, addr = self._accept()
                   ~~~~~~~~~~~~^^
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 992, in run
        self._target(*self._args, **self._kwargs)
        ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    OSError: [WinError 10038] 在一个非套接字上尝试了一个操作。
      File "d:\project\pyprojs\trae\t.py", line 44, in recv
        data = client.recv(1024) #阻塞
    ConnectionAbortedError: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
      File "D:\Program Files\Python\python3130\Lib\threading.py", line 992, in run
        self._target(*self._args, **self._kwargs)
        ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "d:\project\pyprojs\trae\t.py", line 44, in recv
        data = client.recv(1024) #阻塞
    ConnectionAbortedError: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
    

    群聊

    def recv(self, client, raddr):
        ....
            # client.send(msg) #单聊
            #群聊
            # print(self.clients)
            for c in self.clients.values():
                c.send(msg)
    

    这一版基本能用了,测试通过。但是还有要完善的地方

    例如各种异常的判断,客户端断开连接后移除客户端数据等

  • 客户端主动断开带来的问题

    服务端知道自己何时断开,如果客户端断开,服务器不知道。(客户端主动断开,服务端recv会得到一个空串) 所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发送一个空消息,超时返回异常,捕获异常并清理连接 即使为客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能

    增加客户端退出命令

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            self.event = threading.Event()
            self.clients = {} #server端和每一个客户端通信的socket
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=(), daemon=False).start()
    
        def accept(self):
            count = 1
            while not self.event.is_set():
                newsock, raddr = self.sock.accept() #阻塞
                print(newsock)
                print(raddr) #对端地址
                self.clients[raddr] =newsock #维护客户端字典
    
                #启动线程
                threading.Thread(
                    target=self.recv, 
                    name='recv-{}'.format(count), 
                    args=(newsock, raddr)).start()
                count += 1
    
        def recv(self, client, raddr):
            while not self.event.is_set():
                data = client.recv(1024) #阻塞
                print(data, '****')
                if not data:
                    client.close()
                    self.clients.pop(raddr)
                    break
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data.decode())
                logging.info(msg)
                msg = msg.encode()
                # client.send(msg) #单聊
                #群聊
                # print(self.clients)
                for c in self.clients.values():
                    c.send(msg)
    
        def stop(self):
            self.event.set()
            for c in self.clients.values():
                print('close ~~~~', c)
                c.close()
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    print('=' * 30)
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()
    
    def recv(self, sock:socket.socket, client): # 接受客户端数据
        while not self.event.is_set():
            data = sock.recv(1024) # 阻塞到数据到来
            msg = data.decode().strip()
            # 客户端退出命令
            if msg == 'quit' or msg == '': # 主动断开得到空串
                self.clients.pop(client)
                sock.close()
                logging.info('{} quits'.format(client))
                break
    
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(
                datetime.datetime.now(), *client, data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)
    

    程序还是有瑕疵,但是业务基本功能完成了

    线程安全问题:

    由于GIL和内置数据结构的读写原子性,单独操作字典的某一项item是安 全的。但是遍历过程是线程不安全的,遍历中有可能被打断,其他线程如果对字 典元素进行增加、弹出,都会影响字典的size,就会抛出异常。所以还是要加锁 Lock

    回顾字典使用注意事项,在使用keys、values、items方法遍历的时候,不可以改变字典的size。

    import logging
    import time
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    global_dict = {}
    
    def additem(d:dict):
        for i in range(100000000):
            d[i] = i
            time.sleep(0.001)
    
    def iterdict(d:dict):
        while True:
            time.sleep(0.1)
            for item in d.items():
                logging.info(item)
    
    add = threading.Thread(target=additem, name='add', args=(global_dict,))
    it_d = threading.Thread(target=iterdict, name='itter', args=(global_dict,))
    add.start()
    it_d.start()
    time.sleep(0.5)
    
    while True:
        print(threading.active_count())
        if threading.active_count() <= 2:
            print(threading.enumerate())
            break
        time.sleep(1)
    
    #返回结果
    # 2025-03-16 23:13:12,224 itter 15688 (0, 0)
    # Exception in thread itter:
    # Traceback (most recent call last):
    #   File "D:\Program Files\Python\python3130\Lib\threading.py", line 1041, in _bootstrap_inner
    #     self.run()
    #     ~~~~~~~~^^
    #   File "D:\Program Files\Python\python3130\Lib\threading.py", line 992, in run
    #     self._target(*self._args, **self._kwargs)
    #     ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    #   File "d:\project\pyprojs\trae\t2.py", line 18, in iterdict
    #     for item in d.items():
    #                 ~~~~~~~^^
    # RuntimeError: dictionary changed size during iteration
    # 2
    # [<_MainThread(MainThread, started 24668)>, <Thread(add, started 26300)>]
    

    加锁后代码如下

    #比较消耗性能的是群发功能
                for c in self.clients.values():
                    c.send(msg)
    

    高性能场景推荐使用C++

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            self.event = threading.Event()
            self.clients = {} #server端和每一个客户端通信的socket
            self.lock = threading.Lock()
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=(), daemon=False).start()
    
        def accept(self):
            count = 1
            while not self.event.is_set():
                newsock, raddr = self.sock.accept() #阻塞
                print(newsock)
                print(raddr) #对端地址
                with self.lock:
                    self.clients[raddr] =newsock #维护客户端字典
    
                #启动线程
                threading.Thread(
                    target=self.recv, 
                    name='recv-{}'.format(count), 
                    args=(newsock, raddr)).start()
                count += 1
    
        def recv(self, client, raddr):
            while not self.event.is_set():
                data = client.recv(1024) #阻塞
                print(data, '****')
                if not data:
                    client.close()
                    with self.lock:
                        if raddr in self.clients:
                            self.clients.pop(raddr)
                    break
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data.decode())
                logging.info(msg)
                msg = msg.encode()
                # client.send(msg) #单聊
                #群聊
                with self.lock:
                    for c in self.clients.values():
                        c.send(msg)
    
        def stop(self):
            self.event.set()
            with self.lock:
                for c in self.clients.values():
                    print('close ~~~~', c)
                    c.close()
                self.clients.clear()
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    print('=' * 30)
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()
    
socket常用方法
socket.recv(bufsize[, flags])     #获取数据。默认是阻塞的方式
socket.recvfrom(bufsize[, flags]) #获取数据,返回一个二元组(bytes, address)
socket.recv_into(buffer[, nbytes[, flags]]) 
获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或0,将buffer大小的数据存入buffer中。返回接收的字节数

socket.recvfrom_into(buffer[, nbytes[, flags]]) #获取数据,返回一个二元组(bytes, address)到buffer中
socket.send(bytes[, flags])       #TCP发送数据
socket.sendall(bytes[, flags])    #TCP发送全部数据,成功返回None
socket.sendto(string[,flag],address)  #UDP发送数据
socket.sendfile(file, offset=0, count=None)
发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果win下不支持sendfile,
或者不是普通文件,使用send()发送文件。offset告诉起始位置。3.5版本开始
socket.getpeername()      #返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)
socket.getsockname()      #返回套接字自己的地址。通常是一个元组(ipaddr,port)
socket.setblocking(flag)  #如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)
非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常

socket.settimeout(value)  #设置套接字操作的超时期,timeout是一个浮点数,单位是秒
值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect()

socket.setsockopt(level,optname,value)  #设置套接字选项的值。比如缓冲区大小。太多了,去看文档
不同系统,不同版本都不尽相同
MakeFile
socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None)

创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法。

#简单使用
import time
import threading
import logging
import socket

FOMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FOMAT, level=logging.INFO)

#makefile
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen()

newsock, raddr = server.accept()
print(newsock)
print(raddr)
f = newsock.makefile('rw') #mode只能用r w b
print(f)
#print(f.fileno()) #报错。没有文件描述符。makefile包装成类文件对象,只是在内存中开辟读写缓冲区
# data = newsock.recv(1024) #bytes
data = f.read(5) #收,读取5个字符。按行读取要使用readline方法
print(type(data), data)

msg = "msg={}".format(data)
f.write(msg) #发,相当于调用newsock.send方法
f.flush()

newsock.close()
f.close()
print(f.closed, newsock._closed)

server.close()

测试

#客户端连接
PS D:\> nc 127.0.0.1 9999
12345
msg=12345

#服务端输出
<socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 50515)>
('127.0.0.1', 50515)
<_io.TextIOWrapper mode='rw' encoding='cp936'>
<class 'str'> 12345
True True
  • makefile练习

    使用makefile改写群聊类

    #TCP Chat Server
    import socket
    import datetime
    import time
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    #Server 面向对象封装,class类得有哪些方法和属性?
    
    class ChatServer: #初始化配置、启动服务、关闭服务
        def __init__(self, ip='127.0.0.1', port=9999):
            self.sock = socket.socket() #TCP ipv4
            self.addr = ip, port
            self.event = threading.Event()
            self.clients = {} #server端和每一个客户端通信的socket
            self.lock = threading.Lock()
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen() #监听了
    
            threading.Thread(target=self.accept, name='accept', args=(), daemon=False).start()
    
        def accept(self):
            count = 1
            while not self.event.is_set():
                newsock, raddr = self.sock.accept() #阻塞
                print(newsock)
                print(raddr) #对端地址
                f = newsock.makefile('rw', encoding='utf-8')
                with self.lock:
                    self.clients[raddr] = f, newsock #维护客户端字典
    
                #启动线程
                threading.Thread(
                    target=self.recv, 
                    name='recv-{}'.format(count), 
                    args=(f, newsock, raddr)).start()
                count += 1
    
        def recv(self, f, client, raddr):
            while not self.event.is_set():
                # data = client.recv(1024) #阻塞
                try: #如果出现异常,就认为一定是socket对象的异常
                    #那么这个socket通道一定出了问题
                    data = f.readline().strip() #阻塞等一行来,即换行符。 读进来是str\n或者b''空串客户端断开
                except Exception as e:
                    logging.error(e)
                    data = ''
                print(data, '****')
                if not data or data == 'quit':
                    with self.lock:
                        if raddr in self.clients:
                            self.clients.pop(raddr)
                    client.close()
                    f.close()
                    break
                msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                    datetime.datetime.now(),
                    *raddr,
                    data)
                logging.info(msg)
                #群聊
                with self.lock:
                    for ff, _ in self.clients.values():
                        ff.write(msg)
                        ff.flush()
    
        def stop(self):
            self.event.set()
            with self.lock:
                for f, c in self.clients.values():
                    print('close ~~~~', c)
                    c.close()
                    f.close()
                self.clients.clear()
            self.sock.close()
    
    #######
    cs = ChatServer()
    cs.start()
    
    print('=' * 30)
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()
    

    测试

    #客户端
    PS D:\> nc 127.0.0.1 9999
    123456hi
    0317 11:44:10From 127.0.0.1:58826 msg=123456hi
    PS D:\> nc 127.0.0.1 9999
    nihao
    0317 11:44:42From 127.0.0.1:58966 msg=nihao haa
    0317 11:44:57From 127.0.0.1:58966 msg=haa
    
    #服务端返回
    >>><socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 58826)>
    ('127.0.0.1', 58826)
    123456hi ****
    2025-03-17 11:44:10,566 recv-1 22724 0317 11:44:10From 127.0.0.1:58826 msg=123456hi
     ****
    <socket.socket fd=348, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 58966)>
    ('127.0.0.1', 58966)
    nihao ****
    2025-03-17 11:44:42,518 recv-2 1924 0317 11:44:42From 127.0.0.1:58966 msg=nihao   
    haa ****
    2025-03-17 11:44:57,631 recv-2 1924 0317 11:44:57From 127.0.0.1:58966 msg=haa
    

    上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的socket。可以使用异常处理解决这个问题。

ChatServer实验用完整代码

注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。

增加一些异常处理。

#TCP Chat Server
import socket
import datetime
import time
import logging
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

#Server 面向对象封装,class类得有哪些方法和属性?

class ChatServer: #初始化配置、启动服务、关闭服务
    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket() #TCP ipv4
        self.addr = ip, port
        self.event = threading.Event()
        self.clients = {} #server端和每一个客户端通信的socket
        self.lock = threading.Lock()

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen() #监听了

        threading.Thread(target=self.accept, name='accept', args=(), daemon=False).start()

    def accept(self):
        count = 1
        while not self.event.is_set():
            newsock, raddr = self.sock.accept() #阻塞
            print(newsock)
            print(raddr) #对端地址
            with self.lock:
                self.clients[raddr] =newsock #维护客户端字典

            #启动线程
            threading.Thread(
                target=self.recv, 
                name='recv-{}'.format(count), 
                args=(newsock, raddr)).start()
            count += 1

    def recv(self, client, raddr):
        while not self.event.is_set():
            try:
                data = client.recv(1024) #阻塞
                print(data, '****')
            except Exception as e:
                logging.error(e)
                data = b''
            if not data or data == b'quit':
                client.close()
                with self.lock:
                    if raddr in self.clients:
                        self.clients.pop(raddr)
                break
            msg = "{:%m%d %H:%M:%S}From {}:{} msg={}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode())
            logging.info(msg)
            msg = msg.encode()
            # client.send(msg) #单聊
            #群聊
            with self.lock:
                for c in self.clients.values():
                    c.send(msg)

    def stop(self):
        self.event.set()
        with self.lock:
            for c in self.clients.values():
                print('close ~~~~', c)
                c.close()
            self.clients.clear()
        self.sock.close()

#######
def main():
    cs = ChatServer()
    cs.start()

    print('=' * 30)
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cs.stop()
            logging.info('Server quits~~~~')
            break
        print(*threading.enumerate(), sep='\n', end='\n\n')
        print()

if __name__ == '__main__':
    main()

TCP客户端编程

客户端编程步骤

创建Socket对象 连接到远端服务端的ip和port,connect()方法 传输数据 使用send、recv方法发送、接收数据 关闭连接,释放资源

import socket
import time

client = socket.socket() #TCP
print(client)
client.connect(('127.0.0.1', 9999)) #直接服务端
print(client)
print('-' * 30)
time.sleep(2)

client.send(b'I am client 1')
data = client.recv(1024) #阻塞等待
print(data)

client.close()

测试

#启动服务端
PS D:\> nc -lvp 9999
listening on [any] 9999 ...
connect to [127.0.0.1] from SKY-20240801BMX [127.0.0.1] 51227
I am client 1 hai

#客户端连接返回
PS D:\> nc -lvp 9999
listening on [any] 9999 ...
connect to [127.0.0.1] from SKY-20240801BMX [127.0.0.1] 51227
I am client 1 hai

开始编写客户端类

# TCP Chat Client
import time
import datetime
import threading
import logging
import socket

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatClient: #配置,服务端的IP和端口。本地需要
    def __init__(self, dip='127.0.0.1', dport=9999):
        self.__sock = socket.socket() #TCP ipv4
        self.raddr = dip, dport
        self.event = threading.Event()

    def start(self):
        try:
            self.__sock.connect(self.raddr)
            self.__sock.send(b'hello I am client')
        except Exception as e:
            logging.error(f'Connection error: {e}')
            self.stop()

        # while True:
        #     #发数据、收数据
        #     pass
        threading.Thread(target=self.__recv, name='recv', args=()).start()

    def __recv(self):
        while not self.event.is_set():
            try:
                data = self.__sock.recv(1024)
            except Exception as e:
                logging.error(f'Recv error: {e}')
                break

            msg = "msg={}".format(data)
            logging.info(msg)

    def send(self, msg:str):
        msg = msg.strip()
        try:
            self.__sock.send(msg.encode('utf-8'))
        except Exception as e:
            logging.error(f'Send error: {e}')
            self.stop()

    def stop(self):
        if not self.event.is_set():
            self.event.set()
            try:
                self.__sock.shutdown(socket.SHUT_RDWR)
                self.send('quit')
                self.__sock.close()
            except OSError:
                pass

cc = ChatClient()
cc.start()
print('=' * 30)

while True:
    cmd = input('plz input your message: ')
    if cmd.strip() == 'quit':
        cc.stop()
        break
    cc.send(f'{cmd}\n')

同样,这样的客户端还是有些问题的,不过socket仅用于测试和学习,了解前面所学各种技术。

UDP编程

测试命令

#windows查看udp端口
netstat -anp udp | find "9999"
netstat -anbp udp | findstr 9999

#linux下发给服务端数据
echo '123abc' | nc -u 127.0.0.1 9999

UDP服务端编程流程

img_20250318_095407.png
  • 创建socket对象。type=socket.Sock_DGRAM
  • 绑定IP和Port,bing()方法
  • 传输数据
    • 接受数据,socket.recvform(bufsize[flags]),获得一个二元组(string,address)
    • 发送数据,socket.sendto(string, address)发给某地址某信息
  • 释放资源
import datetime
import time
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

server = socket.socket(type=socket.SOCK_DGRAM)
addr = '127.0.0.1', 9999
server.bind(addr)
print(server)

data = server.recv(1024) #收,阻塞
print(type(data), data)
data, info= server.recvfrom(1024) #阻塞 (value, (ip, port))
print(type(data), data, info)

msg = "your msg={}".format(data.decode('utf-8')).encode()
server.sendto(msg, info) #

server.close()

测试

#客户端连接
PS D:\py> nc -u 127.0.0.1 9999
1111
2222
your msg=2222

#服务端返回
<socket.socket fd=336, family=2, type=2, proto=0, laddr=('127.0.0.1', 9999)>
<class 'bytes'> b'1111\n'
<class 'bytes'> b'2222\n' ('127.0.0.1', 60892)

UDP客户端编程流程

  • 创建socket对象。type=socket.Sock_DGRAM
  • 发送数据,socket.sendto(string, address)发给某地址某信息
  • 接受数据,socket.recvform(bufsize[flags]),获得一个二元组(string,address)
  • 释放资源
import datetime
import time
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

client = socket.socket(type=socket.SOCK_DGRAM)
addr = '127.0.0.1', 9999

#绑定本地ip和端口
print(client)
client.connect(addr) #laddr 自动获取本地可以用作通信的ip和port; raddr 对端ip和端口也确定了
print(client) #返回... laddr=('127.0.0.1', 49347), raddr=('127.0.0.1', 9999)>
client.send(b'test 111') #需要提前知道对端raddr地址

client.sendto(b'test 222', addr) #laddr 自动获取本地的ip和port用来发送,发送给指定的对端ip和端口
# print(client) #返回... laddr=('0.0.0.0', 55447)>

#收数据,阻塞,只要是知道本地ip和端口的对端都能发送数据。 有不可靠性
data = client.recv(1024) # 阻塞等待数据。 在本地端口上等待数据
print(data)
data, info = client.recvfrom(1024) # 阻塞等待数据(value, (ip, port))
print(data, info)

client.close()

测试

#客户端返回
<socket.socket fd=288, family=2, type=2, proto=0>
<socket.socket fd=288, family=2, type=2, proto=0, laddr=('127.0.0.1', 49739), raddr=('127.0.0.1', 9999)>
b'333\n'
b'444\n' ('127.0.0.1', 9999)

#服务端返回
PS D:\py> nc -ulvp 9999
listening on [any] 9999 ...
connect to [127.0.0.1] from SKY-20240801BMX [127.0.0.1] 49739
test 111test 222333
444

注意:UDP是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。

UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用。

UDP的socket对象创建后,是没有占用本地地址和端口的。

bind方法    #可以指定本地地址和端口laddr,会立即占用
connect方法 #可以立即占用本地地址和端口laddr,填充远程地址和端口raddr
sendto方法  #可以立即占用本地地址和端口laddr,并把数据发往指定远端。
只有有了本地绑定端口,sendto就可以向任何远端发送数据

send方法    #需要和connect方法配合, 可以使用已经从本地端口把数据发往raddr指定的远端
recv方法    #要求一定要在占用了本地端口后,返回接受的数据
recvfrom方法  #要求一定要占用了本地端口后,返回接受的数据和对端地址的二元祖

练习-UDP版群聊

UDP版群聊服务端代码

服务端类的基本架构

class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)

    def start(self):
        self.sock.bind(self.addr) # 立即绑定
        self.sock.recvfrom(1024) # 阻塞接受数据

    def stop(self):
        self.sock.close()
import datetime
import time
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class UDPChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.sock = socket.socket(type=socket.SOCK_DGRAM)

    def start(self):
        self.sock.bind(self.addr)

        data = self.sock.recv(1024)
        print(data)

    def stop(self):
        self.sock.close()

def main():
    cs = UDPChatServer()
    cs.start()

if __name__ == '__main__':
    main()

测试

#客户端连接
PS D:\py> nc -u 127.0.0.1 9999
ls

#服务端返回
b'ls\n'
def start(self):
    self.sock.bind(self.addr)

    while True:
        data, raddr= self.sock.recvfrom(1024)
        print(data, raddr)
        msg = "{:%Y%m%d %H:%M:%S} From client {}:{}, msg={}".format(
            datetime.datetime.now(), 
            *raddr, data.decode())
        self.sock.sendto(msg.encode(), raddr)

主线程会阻塞住。

import datetime
import time
import socket
import logging
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class UDPChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.event = threading.Event()

    def start(self):
        self.sock.bind(self.addr)
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, raddr= self.sock.recvfrom(1024)
            print(data, raddr)
            msg = "{:%Y%m%d %H:%M:%S} From client {}:{}, msg={}".format(
                datetime.datetime.now(), 
                *raddr, data.decode())
            self.sock.sendto(msg.encode(), raddr)

    def stop(self):
        self.event.set()
        self.sock.close()

def main():
    cs = UDPChatServer()
    cs.start()

    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            cs.stop()
            break
        print(threading.enumerate())

if __name__ == '__main__':
    main()

测试

#客户端1连接
PS C:\Users\Administrator> nc -u 127.0.0.1 9999
1111
20250318 16:02:41 From client 127.0.0.1:59376, msg=111

#客户端2连接
PS D:\py> nc -u 127.0.0.1 9999
2222
20250318 16:02:47 From client 127.0.0.1:63042, msg=2222

#服务端返回
>>>
[<_MainThread(MainThread, started 29508)>, <Thread(recv, started 30656)>]
>>>b'1111\n' ('127.0.0.1', 59376)
b'2222\n' ('127.0.0.1', 63042)

实例群聊天功能

完整版版代码

import datetime
import time
import socket
import logging
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class UDPChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.event = threading.Event()
        self.clients = set() #需要考虑线程安全吗?
        #在多线程中,字典和集合都不可以一边迭代一边改变其长度。
        self.lock = threading.Lock()

    def start(self):
        self.sock.bind(self.addr)
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            try:
                data, raddr= self.sock.recvfrom(1024)
            except Exception as e:
                continue
            with self.lock:
                self.clients.add(raddr)
            print(data, '++++')
            if data.strip() == b'quit':
                logging.info('{} quit'.format(raddr))
                with self.lock:
                    self.clients.remove(raddr) #效率高吗?高用的是key,由ip和端口组成的2元组
                # break #break后recv线程结束了
                continue

            msg = "{:%Y%m%d %H:%M:%S} From client {}:{}, msg={}".format(
                datetime.datetime.now(), 
                *raddr, 
                data.decode())
            # self.sock.sendto(msg.encode(), raddr)
            with self.lock:
                for c in self.clients:
                    self.sock.sendto(msg.encode(), c)

    def stop(self):
        with self.lock:
            for c_info in self.clients:
                self.sock.sendto(b'bye', c_info)
        self.event.set()
        self.sock.close()

def main():
    cs = UDPChatServer()
    cs.start()

    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            cs.stop()
            break
        print(threading.enumerate())
        print(cs.clients)

if __name__ == '__main__':
    main()

测试

#客户端1连接
PS C:\Users\Administrator> nc -u 127.0.0.1 9999
111
20250318 18:36:45 From client 127.0.0.1:51280, msg=111
20250318 18:36:48 From client 127.0.0.1:58554, msg=222
20250318 18:36:49 From client 127.0.0.1:58554, msg=333
bye

#客户端2连接
PS D:\py> nc -u 127.0.0.1 9999
222
20250318 18:36:48 From client 127.0.0.1:58554, msg=222
333
20250318 18:36:49 From client 127.0.0.1:58554, msg=333
quit

#服务端返回
>>>
[<_MainThread(MainThread, started 31716)>, <Thread(recv, started 11816)>]
set()
>>>b'111\n' ++++
b'222\n' ++++
b'333\n' ++++

[<_MainThread(MainThread, started 31716)>, <Thread(recv, started 11816)>]
{('127.0.0.1', 51280), ('127.0.0.1', 58554)}
>>>b'quit\n' ++++
2025-03-18 18:36:58,934 recv 11816 ('127.0.0.1', 58554) quit

[<_MainThread(MainThread, started 31716)>, <Thread(recv, started 11816)>]
{('127.0.0.1', 51280)}
>>>quit
UDP版群聊客户端代码
import datetime
import time
import socket
import logging
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class UDPChatClient:
    def __init__(self, rip='127.0.0.1', rport=9999, name='c1'):
        self.raddr = rip, rport
        self.__sock = socket.socket(type=socket.SOCK_DGRAM)
        self.name = name
        self.event = threading.Event()

    def start(self):
        self.__sock.connect(self.raddr) #laddr raddr
        logging.info(self.__sock)
        self.send('hello, I am {}. {}'.format(self.name, self.__sock.getsockname()))

        #防止主线程阻塞,启动一个线程来接收
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, info = self.__sock.recvfrom(1024)
            #能否接收非服务器发来的信息. connect不予理睬,非connect可以授受任意地址发来的信息
            print(data, '++++', info)
            # msg = data.decode()
            # logging.info(msg)

    def send(self, msg:str):
        if msg.strip():
            self.__sock.sendto(msg.encode(), self.raddr)

    def stop(self):
        self.event.set()
        self.send('quit')
        self.__sock.close()

def main():
    cc = UDPChatClient()
    cc.start()

    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            cc.stop()
            break
        cc.send(cmd)
        print(threading.enumerate())

if __name__ == '__main__':
    main()

上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。

问题:服务端如何知道客户端断开了呢?

心跳机制

增加心跳heartbeat机制或ack机制。这些机制同样可以用在TCP通信的时候。

心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间隔约定好就行。

ack即响应,一端收到另一端的消息后返回的确认信息。

心跳机制

  • 一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了
  • 如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端移除其信息。这种实现较为复杂,用的较少
  • 也可以双向都发心跳的,用的更少

在服务器端代码中使用第一种心跳机制改进

import datetime
import time
import socket
import logging
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class UDPChatServer:
    def __init__(self, ip='127.0.0.1', port=9999, interval=10):
        self.addr = ip, port
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.event = threading.Event()
        self.clients = {} #记录客户端, 改为字典
        #在多线程中,字典和集合都不可以一边迭代一边改变其长度。
        self.lock = threading.Lock()
        self.interval = interval # 默认10秒,超时就要移除对应的客户端。 一般是心跳周期2~3个。
        #1 单独心跳接口
        #2 混着用

    def start(self):
        self.sock.bind(self.addr)
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        removed_keys = [] # 清理超市
        while not self.event.is_set():
            try:
                data, raddr= self.sock.recvfrom(1024)
            except Exception as e:
                continue
            with self.lock:
                self.clients[raddr] = time.time()
            print(data, '++++')

            current  = datetime.datetime.now().timestamp()
            if data == b'^hb^': #heartbeat info 心跳信息
                self.clients[raddr] = current #要么拆开心跳包读取里面的时间记录下来。记录心跳时间
                continue

            if data.strip() == b'quit':
                logging.info('{} quit'.format(raddr))
                with self.lock:
                    self.clients.pop(raddr) #效率高吗?高用的是key,由ip和端口组成的2元组
                # break #break后recv线程结束了
                continue

            self.clients[raddr] = current #如果发送过来的不是心跳信息、quit,就记录当前时间
            msg = "{:%Y%m%d %H:%M:%S} From client {}:{}, msg={}".format(
                datetime.datetime.now(), 
                *raddr, 
                data.decode())
            with self.lock:
                for c_info, stamp in self.clients.items():
                    if current - stamp > self.interval:
                        #> interval 超时; 0 <- delta <= interval 正常; < 0 异常
                        removed_keys.append(c_info)
                    else:
                        self.sock.sendto(msg.encode(), c_info)
            # for index in range(1, len(removed_keys)+1):
            #     self.clients.pop(removed_keys[-1])
            #     removed_keys.pop()
            for c in removed_keys:
                self.clients.pop(c)
            removed_keys.clear()

    def stop(self):
        with self.lock:
            for c_info, _ in self.clients:
                self.sock.sendto(b'bye', c_info)
        self.event.set()
        self.sock.close()

def main():
    cs = UDPChatServer()
    cs.start()

    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            cs.stop()
            break
        print(threading.enumerate())
        print(cs.clients)

if __name__ == '__main__':
    main()
  • 客户端代码改进

    增加定时发送心跳代码

    import datetime
    import time
    import socket
    import logging
    import threading
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    class UDPChatClient:
        def __init__(self, rip='127.0.0.1', rport=9999, name='c1', interval=5):
            self.raddr = rip, rport
            self.__sock = socket.socket(type=socket.SOCK_DGRAM)
            self.name = name
            self.event = threading.Event()
            self.interval = interval
    
        def start(self):
            self.__sock.connect(self.raddr) #laddr raddr
            logging.info(self.__sock)
            # self.send('hello, I am {}. {}'.format(self.name, self.__sock.getsockname()))
            #启动心跳线程
            threading.Thread(target=self._heartbeat, name='heartbeat', daemon=True).start()
            #防止主线程阻塞,启动一个线程来接收
            threading.Thread(target=self.recv, name='recv').start()
    
        def _heartbeat(self):
            while not self.event.wait(self.interval): #每隔interval秒发送一次心跳包
                self.__sock.send(b'^hb^')
    
        def recv(self):
            while not self.event.is_set():
                data = self.__sock.recv(1024)
                #能否接收非服务器发来的信息. connect不予理睬,非connect可以授受任意地址发来的信息
                print(data, '++++')
                # msg = data.decode()
                # logging.info(msg)
    
        def send(self, msg:str):
            if msg.strip():
                self.__sock.sendto(msg.encode(), self.raddr)
    
        def stop(self):
            self.event.set()
            self.send('quit')
            self.__sock.close()
    
    def main():
        cc = UDPChatClient()
        cc.start()
    
        while True:
            cmd = input('>>>')
            if cmd.strip() == 'quit':
                cc.stop()
                break
            cc.send(cmd)
            # print(threading.enumerate())
    
    if __name__ == '__main__':
        main()
    

UDP协议应用

UDP是无连接协议,它基于以下假设:

  • 网络足够好
  • 消息不会丢包
  • 包不会乱序

但是,即使再局域网,也不能保证不丢包,而且包的达到不一定有序。

应用场景

  • 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
  • 海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。
  • DNS协议,数据内容小,一个包就能查询到结果,不存在乱序,丢包,重新请求解析。

一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议。

SocketServer

socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层API进行封装。

Python的封装就是socketserver模块。它是网络服务编程框架,便于企业级快速开发

类的继承关系

+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

SocketServer简化了网络服务器的编写。

它有4个同步类:

  • TCPServer
  • UDPServer
  • UnixStreamServer
  • UnixDatagramServer

2个Mixin类:ForkingMixIn 和 ThreadingMixIn 类,用来支持异步。由此得到

  • class ForkingUDPServer(ForkingMixIn, UDPServer): pass
  • class ForkingTCPServer(ForkingMixIn, TCPServer): pass
  • class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
  • class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

fork是创建多进程,thread是创建多线程。

fork需要操作系统支持,Windows不支持。

编程接口

socketserver.BaseServer(server_address, RequestHandlerClass)

需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类。

RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:

# BaseServer代码
class BaseServer:
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def finish_request(self, request, client_address): # 处理请求的方法
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass构造

BaseRequestHandler类

它是和用户连接的用户请求处理类的基类,定义为

BaseRequestHandler(request, client_address, server)

服务端Server实例接收用户请求后,最后会实例化这个类。

它被初始化时,送入3个构造参数:request, client_address, server自身。

以后就可以在BaseRequestHandler类的实例上使用以下属性:

  • self.request是和客户端的连接的socket对象
  • self.server是TCPServer实例本身
  • self.client_address是客户端地址

这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法

# BaseRequestHandler要子类覆盖的方法
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self): # 每一个连接初始化
        pass

    def handle(self): # 每一次请求处理
        pass

    def finish(self): # 每一个连接清理
        pass

测试代码

from socketserver import ThreadingTCPServer, BaseRequestHandler
import threading

#socketserver模块把socket的套路封装在一起了,只需要提供自定义的RequestHandlerClass
class MyHandler(BaseRequestHandler):
    pass

server = ThreadingTCPServer(('127.0.0.1', 9999), MyHandler)
# self.RequestHandlerClass(request, client_address, self) #这3个参数是server注入的
print( server)

server.handle_request() #处理请求
print('=' * 30)
server.server_close()

测试

#客户端。 连接瞬间断开
PS D:\> nc 127.0.0.1 9999
PS D:\>

#服务端返回
<socketserver.ThreadingTCPServer object at 0x0000026A10DF6900>
==============================
from socketserver import ThreadingTCPServer, BaseRequestHandler
import threading

#socketserver模块把socket的套路封装在一起了,只需要提供自定义的RequestHandlerClass
class MyHandler(BaseRequestHandler):
    def handle(self): #处理请求, 提供了一个处理线程、handler实例 recv
        print(self.client_address) # 客户端的
        print(id(self.server), self.server) 
        print(type(self.request), self.request) #和客户端通信的new socket对象

        print(threading.enumerate())

server = ThreadingTCPServer(('127.0.0.1', 9999), MyHandler)
# self.RequestHandlerClass(request, client_address, self) #这3个参数是server注入的 实例化
print(id(server), server)
print(server.socket)
print(server.__dict__)

# server.handle_request() #处理请求
server.serve_forever() # 循环的调用handle_request
print('=' * 30)
server.server_close()

测试

#客户端。 连接瞬间断开
PS D:\> nc 127.0.0.1 9999
PS D:\>

#服务端返回
1872387008768 <socketserver.ThreadingTCPServer object at 0x000001B3F2F66900>
<socket.socket fd=324, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
{'server_address': ('127.0.0.1', 9999), 'RequestHandlerClass': <class '__main__.MyHandler'>, '_BaseServer__is_shut_down': <threading.Event at 0x1b3f31539d0: unset>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=324, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>}
('127.0.0.1', 59978)
1872387008768 <socketserver.ThreadingTCPServer object at 0x000001B3F2F66900>
<class 'socket.socket'> <socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59978)>
[<_MainThread(MainThread, started 28292)>, <Thread(Thread-1 (process_request_thread), started 4512)>]

测试结果说明,handle方法相当于socket的recv方法。

每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.client_address

from socketserver import ThreadingTCPServer, BaseRequestHandler
import threading

#socketserver模块把socket的套路封装在一起了,只需要提供自定义的RequestHandlerClass
class MyHandler(BaseRequestHandler):
    def handle(self): #处理请求, 提供了一个处理线程、handler实例 recv
        super().handle() # 做为一个好习惯,虽然父类handle什么都没有做
        print(self.client_address) # 客户端的
        print(id(self.server), self.server) 
        print(type(self.request), self.request) #和客户端通信的new socket对象

        while True:
            data = self.request.recv(1024) #阻塞的
            print(data, '%%%%')
            msg = 'msg = {}'.format(data.decode())
            self.request.send(msg.encode())
        print(threading.enumerate())

server = ThreadingTCPServer(('127.0.0.1', 9999), MyHandler) #ThreadingTCPServer 异步; TCPServer 主线程
# self.RequestHandlerClass(request, client_address, self) #这3个参数是server注入的 实例化
print(id(server), server)
print(server.socket)
print(server.__dict__)

# server.handle_request() #处理请求
server.serve_forever() # 循环的调用handle_request
print('=' * 30)
server.server_close()

测试

#客户端
PS D:\> nc 127.0.0.1 9999
aaaa
msg = aaaa

PS C:\Users\Administrator> nc 127.0.0.1 9999
bbbb
msg = bbbb

#服务端返回
2244418103552 <socketserver.ThreadingTCPServer object at 0x0000020A91BE6900>
<socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
{'server_address': ('127.0.0.1', 9999), 'RequestHandlerClass': <class '__main__.MyHandler'>, '_BaseServer__is_shut_down': <threading.Event at 0x20a91de39d0: unset>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=332, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>}
('127.0.0.1', 61660)
2244418103552 <socketserver.ThreadingTCPServer object at 0x0000020A91BE6900>
<class 'socket.socket'> <socket.socket fd=336, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 61660)>
b'aaaa\n' %%%%
('127.0.0.1', 61703)
2244418103552 <socketserver.ThreadingTCPServer object at 0x0000020A91BE6900>
<class 'socket.socket'> <socket.socket fd=344, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 61703)>
b'bbbb\n' %%%%

将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果。

ThreadingTCPServer是异步的,可以同时处理多个连接。

TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。

总结

创建服务器需要几个步骤:

  • 从BaseRequestHandler类派生出子类,并覆盖其handle()方法来创建请求处理程序类,此方法将处理传入请求
  • 实例化一个服务器类,传参服务器的地址和请求处理类
  • 调用服务器实例的handle_request()执行一次或serve_forever()永久执行方法
  • 调用server_close()关闭套接字

BaseRequestHandler是基类,其子类提供了更好的封装。

StreamRequestHandler提供了类文件对象操作socket,rfile提供读取能力,wfile提供写入能力。

实现EchoServer

顾名思义,Echo,来什么消息回显什么消息

客户端发来什么信息,返回什么信息

import datetime
import time
import socket
import logging
from socketserver import ThreadingTCPServer, StreamRequestHandler
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class EchoHandler(StreamRequestHandler):
    def handle(self):
        print(self.request)
        print(self.rfile, self.wfile) #rb rw

        print('handle ~~~~')
        while True:
            data = self.rfile.readline()
            # data = self.rfile.read1(1024)
            print(data, '********')
            msg = 'Your msg = {}'.format(data.decode()).encode()
            self.wfile.write(msg)
        print('++++++++++')

server = ThreadingTCPServer(('127.0.0.1', 9999), EchoHandler)
threading.Thread(target=server.serve_forever, name='serve').start()

while True:
    time.sleep(1)
    cmd = input('>>>')
    if cmd == 'quit':
        server.server_close()
        break
    print(threading.enumerate())

测试

#客户端
PS D:\> nc  127.0.0.1 9999
1112333
Your msg = 1112333
lssfdfs
Your msg = lssfdfs

PS D:\> nc  127.0.0.1 9999
222
Your msg = 222

Your msg =

#服务端返回
>>><socket.socket fd=312, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59637)>
<_io.BufferedReader name=312> <socketserver._SocketWriter object at 0x00000229C27B3A90>
handle ~~~~
b'1112333\n' ********
b'lssfdfs\n' ********
<socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 59776)>
<_io.BufferedReader name=328> <socketserver._SocketWriter object at 0x00000229C2CF0640>
handle ~~~~
b'222\n' ********
b'\n' ********

实战-改写ChatServer

使用ThreadingTCPServer改写ChatServer

import datetime
import time
import socket
import logging
from socketserver import ThreadingTCPServer, StreamRequestHandler
import threading

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatHandler(StreamRequestHandler):
    clients = {}  #1 线程安全问题,加锁
    lock = threading.Lock()

    def setup(self):
        super().setup() #类属性
        self.event = threading.Event()
        with self.lock:
            self.clients[self.client_address] = self.wfile

    def handle(self): #rfile wfile
        super().handle()
        while not self.event.is_set():
            try:
                data = self.rfile.readline()
            except Exception as e:
                logging.error(e)
                data = b''
            # data = self.rfile.read1(1024)
            if not data or data == b'quit':
                #模块内部会自动关闭连接
                break
            msg = '{:%H%M%S} From {}:{} msg = {}'.format(
                datetime.datetime.now(),
                *self.client_address,
                data.decode()
            ).encode()

            with self.lock:
                for w in self.clients.values():
                    w.write(msg)
                    w.flush()

    def finish(self):
        super().finish()
        self.event.set()
        with self.lock:
            self.clients.pop(self.client_address)

#ChatHandler(3个参数) ,每一个请求都会产生一个hander实例
server = ThreadingTCPServer(('127.0.0.1', 9999), ChatHandler)
server.daemon_threads = True #工作线程都设置为daemon, server_forever() 是none-daemon线程。
threading.Thread(target=server.serve_forever, name='serve').start()

while True:
    time.sleep(1)
    cmd = input('>>>')
    if cmd == 'quit':
        server.server_close()
        break
    print(threading.enumerate())

测试

#客户端
PS D:\> nc  127.0.0.1 9999
1111
231710 From 127.0.0.1:4334 msg = 1111
231717 From 127.0.0.1:4326 msg = 333

PS D:\> nc  127.0.0.1 9999
231710 From 127.0.0.1:4334 msg = 1111
333
231717 From 127.0.0.1:4326 msg = 333
PS D:\>


#服务端返回
>>>2025-03-20 23:17:27,777 Thread-1 (process_request_thread) 14504 [WinError 10054] 远程主机强迫关闭了一个现有的连接。

[<_MainThread(MainThread, started 20424)>, <Thread(serve, started 21144)>, <Thread(Thread-2 (process_request_thread), started 8792)>]
>>>

问题

  • 如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗?

当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常

总结

为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、 finish方法,且使用了try…finally结构保证finish方法一定能被调用。这些 方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中 使用循环。

socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。

将socket编程简化,只需要程序员关注数据处理本身,实现Handler类就行了。这种风格在Python十分常见。

IO多种概念及多路复用

重要概念

同步、异步

函数或方法被调用的时候,调用者是否得到 最终结果 的。

直接得到最终结果的,就是同步调用;

不直接得到最终结果的,就是异步调用。

阻塞、非阻塞

函数或方法调用的时候,是否立刻返回。

立即返回就是非阻塞调用;

不立即返回就是阻塞调用。

区别

  • 同步、异步,与阻塞、非阻塞不相关
  • 同步、异步强调的是,是否得到(最终的)结果
  • 阻塞、非阻塞强调是时间,是否等待
  • 同步与异步区别在于:调用者是否得到了想要的最终结果
  • 同步就是一直要执行到返回最终结果
  • 异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得 到结果,以后可以通过被调用者提供的某种方式(被调用着通知调用者、调用 者反复查询、回调),来取回最终结果

    例如

    • 饭店吃饭,并没有给你这碗饭而是给你了号等着叫号,这是异步。取结果,可以主动查号,也可以被调用者来通知,或者使用(被调用者)饭店的预案等到号了饭打包(回调)
  • 阻塞与非阻塞的区别在于,调用者是否还能干其他事
  • 阻塞,调用者就只能干等
  • 非阻塞,调用者可以先去忙会别的,不用一直等

联系

同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。

同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等。程序很难实现。

异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,取了号什么不干就等叫自己的号。程序很难实现。

异步非阻塞,我要打饭,你给我号,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。

操作系统知识

在386之前,CPU工作在实模式下,之后,开始支持保护模式,对内存进行了划分 X86 CPU有4种工作级别:

  • Ring0, Ring1, Ring2, Ring3
  • Ring0级,可以执行特权指令,可以访问所有级别数据,可以访问IO设备等
  • Ring3级,级别最低,只能访问本级别数据
  • 内核代码运行在Ring0,用户代码运行在Ring3

现代操作系统采用虚拟存储器,理论上,对于32位系统来说,进程对虚拟内存地 址的内存寻址空间为4G(2^32)。64位操作系统理论上最大内存寻址空间(2^64)。

操作系统中,内核程序独立且运行在较高的特权级别上,它们驻留在被保护的内 存空间上,拥有访问硬件设备的所有权限,这部分内存称为内核空间(内核态, 最高地址1G)。

普通应用程序运行在用户空间(用户态)

应用程序想访问某些硬件资源就需要通过操作系统提供的 系统调用 ,系统调用可 以使用特权指令运行在内核空间,此时进程陷入内核态运行。系统调用完成,进 程将回到用户态执行用户空间代码。

img_20250321_114822.png

同步IO、异步IO、IO多路复用

IO两个阶段

IO过程分两阶段:

  • 数据准备阶段。从设备读取数据到内核空间的缓冲区(淘米,把米放锅里煮饭)
  • 内核空间复制回用户空间进程缓冲区阶段(盛饭,从内核这个饭锅里把饭装到碗里)

系统调用——read函数、recv函数等

IO模型

同步IO

同步IO模型包括 阻塞IO、非阻塞IO、IO多路复用、信号驱动IO

阻塞IO
img_20250321_140955.png

进程等待(阻塞),直到读写完成。(全程等待)

img_20250321_141032.png
非阻塞IO
img_20250321_141128.png

进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用 户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户 空间。

  • 第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的
  • 第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的

例:淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭

img_20250321_141215.png
IO多路复用

也叫事件驱动模型 Event-driven IO。

所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。

select几乎所有操作系统平台都支持,poll是对的select的升级。

epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有kqueue,Windows有iocp。

img_20250321_141641.png

以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监 视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据, select返回。再使用read将数据复制到用户进程。

select举例

  • 食堂供应很多菜(众多的IO),你需要吃某三菜一汤,大师傅(操作系统)说 要现做,需要等,你只好等待大师傅叫。其中一样菜好了,大师傅叫你,说你 点的菜有好的了,你得自己遍历找找看哪一样才好了,请服务员把做好的菜打 给你。
  • epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。

一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。

epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。

img_20250321_141722.png
信号驱动IO

进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。进程不阻塞 当内核准备好数据后,产生一个SIGIO信号并投递给信号处理函数。可以在此函数中调用recvfrom函数操作数据从内核空间复制到用户空间,这段过程进程阻塞

img_20250321_141936.png

异步IO

img_20250321_142116.png

进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。

举例

  • 来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电 话叫你。两阶段都是异步的。在整个过程中,进程都可以忙别的,等好了才过 来。
  • 今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。

Linux的aio的系统调用,内核从版本2.6开始支持

img_20250321_142141.png
img_20250321_160753.png

前4个都是同步IO,因为核心操作recv函数调用时,进程阻塞直到拿到最终结果为止。

而异步IO进程全程不阻塞

Python中IO多路复用

IO多路复用

  • 大多数操作系统都支持select和poll
  • Linux 2.5+ 支持epoll
  • BSD、Mac支持kqueue
  • Solaris实现了/dev/poll
  • Windows的IOCP

Python的select库实现了select、poll系统调用,这个基本上操作系统都支持。对Linux内核2.5+支持了epoll。

开发中的选择

  • 完全跨平台,使用select、poll。但是性能较差
  • 针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能

select维护一个文件描述符数据结构,单个进程使用有上限,通常是1024,线性扫描这个数据结构。效率低。

pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道哪个设备就绪了。

epool使用事件通知机制,使用回调机制提高效率。

select、pool还要从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制

selectors库

3.4版本提供selectors库,高级IO复用库。

类层次结构︰
BaseSelector
+-- SelectSelector      实现select
+-- PollSelector        实现poll
+-- EpollSelector       实现epoll
+-- DevpollSelector     实现devpoll
+-- KqueueSelector      实现kqueue

selectors.DefaultSelector返回当前平台最有效、性能最高的实现。

但是,由于没有实现Windows下的IOCP,所以,Windows下只能退化为select

在selects模块源码最下面有如下代码

# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

事件注册

class SelectSelector(_BaseSelectorImpl): 
    """Select-based selector."""
    def register(fileobj, events, data=None) -> SelectorKey: pass
  • 为selector注册一个文件对象,监视它的IO事件。返回SelectKey对象。
  • fileobj 被监视文件对象,例如socket对象
  • events 事件,该文件对象必须等待的事件
  • data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客 户端的会话ID,关联方法。通过这个参数在关注的事件产生后让selector干什 么事。

Event常量

  • EVENT_READ 可读 0b01,内核已经准备好输入输出设备,可以开始读了
  • EVENT_WRITE 可写 0b10,内核准备好了,可以往里写了

selectors.SelectorKey 有4个属性:

  • fileobj 注册的文件对象
  • fd 文件描述符
  • events 等待上面的文件描述符的文件对象的事件
  • data 注册时关联的数据

练习:IO多路复用TCP Server

完成一个TCP Server,能够接受客户端请求并回应客户端消息

import time
import threading
import socket
import selectors

#TCP Echo Server
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen()
server.setblocking(False) #设置为非阻塞模式
print(server)

# server.accept()

selector = selectors.DefaultSelector()
key = selector.register(server, selectors.EVENT_READ, 12345) #注册待监控的文件对象或socket对象
print(type(key), key) #key.fileobj key.events, key.fd, key.data
print('-' * 30)
events = selector.select() #阻塞到监控的IO满足事件, 返回list  一批 [(key, event), (key1, 1), (key2, 2)]
print(type(events), events)

selector.close()

测试

#客户端
PS D:\> nc  127.0.0.1 9999
PS D:\>

#服务端返回
<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
<class 'selectors.SelectorKey'> SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=296, events=1, data=12345)
------------------------------
<class 'list'> [(SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=296, events=1, data=12345), 1)]
import time
import threading
import socket
import selectors

#TCP Echo Server
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen()

# 官方建议采用非阻塞ID
server.setblocking(False) #设置为非阻塞模式
print(server)

# 回调函数,sock的读事件
# 形参自定义
# server.accept()
def accept(sock:socket.socket, mask:int):
    newsock, raddr = sock.accept() #第二阶段
    newsock.setblocking(False) #非阻塞
    print(newsock)
    print(raddr)

    key = selector.register(newsock, selectors.EVENT_READ, recv) #

# 回调函数
def recv(conn:socket.socket, mask:int):
    data = conn.recv(1024)
    print(data, '++++')
    msg = 'msg = {}'.format(data.decode())
    conn.send(msg.encode())

selector = selectors.DefaultSelector()
key = selector.register(server, selectors.EVENT_READ, accept) #注册待监控的文件对象或socket对象
print(type(key), key)
print('-' * 30)
while True: #IO 多路复用
    events = selector.select() #阻塞到监控的IO满足事件, 返回list  一批 [(key, event), (key1, 1), (key2, 2)]
    # print(type(events), events)
    for key, mask in events:
        print(key, mask) #key.fileobj:server key.events=1, key.fd=fd, key.data=accept
        key.data(key.fileobj, mask)    #调用accept(server)
    print('=' * 30)
    print(*selector.get_map().items(), sep='\n')
    print('=' * 30)

selector.close()

测试

#客户端
PS D:\> nc  127.0.0.1 9999
12345
msg = 12345

#服务端
<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>
<class 'selectors.SelectorKey'> SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>)
------------------------------
SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>) 1
<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>
('127.0.0.1', 14797)
==============================
(316, SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>))
(296, SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>, 
fd=296, events=1, data=<function recv at 0x00000289F045A5C0>))
==============================
SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>, fd=296, events=1, data=<function recv at 0x00000289F045A5C0>) 1
b'12345\n' ++++
==============================
(316, SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>))
(296, SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>, 
fd=296, events=1, data=<function recv at 0x00000289F045A5C0>))
==============================

#客户端
PS D:\tmp> nc  127.0.0.1 9999
222
msg = 222

#服务端
SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>) 1
<socket.socket fd=320, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 19082)>
('127.0.0.1', 19082)
==============================
(316, SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>))
(296, SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>, 
fd=296, events=1, data=<function recv at 0x00000289F045A5C0>))
(320, SelectorKey(fileobj=<socket.socket fd=320, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 19082)>, 
fd=320, events=1, data=<function recv at 0x00000289F045A5C0>))
==============================
SelectorKey(fileobj=<socket.socket fd=320, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 19082)>, fd=320, events=1, data=<function recv at 0x00000289F045A5C0>) 1
b'222\n' ++++
==============================
(316, SelectorKey(fileobj=<socket.socket fd=316, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=316, events=1, data=<function accept at 0x00000289F038B880>))
(296, SelectorKey(fileobj=<socket.socket fd=296, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 14797)>, 
fd=296, events=1, data=<function recv at 0x00000289F045A5C0>))
(320, SelectorKey(fileobj=<socket.socket fd=320, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 19082)>, 
fd=320, events=1, data=<function recv at 0x00000289F045A5C0>))
==============================

实战:IO多路复用群聊软件

将ChatServer改写成IO多路复用的方式

不需要启动多线程来执行socket的accept、recv方法了

使用面向对象概念,构建

import time
import socket
import selectors
import logging

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.server = socket.socket()

    def start(self):
        self.server.bind(self.addr)
        self.server.listen()
        self.server.setblocking(False)

        #?

    def stop(self):
        self.server.close()

与客户端建立连接

import time
import socket
import selectors
import logging
import threading

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.server = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()

    def start(self):
        self.server.bind(self.addr)
        self.server.listen()
        self.server.setblocking(False)

        self.selector.register(self.server, selectors.EVENT_READ, self.accept)

        #select
        while not self.event.is_set():
            events = self.selector.select() #阻塞到某一批IO就绪
            for key, mask in events:
                print(key, mask)
                key.data(key.fileobj, mask)

    def accept(self, sock, mask):
        print('#' * 30)
        new_client_conn, client_raddr = self.server.accept()
        print(new_client_conn)
        print(client_raddr)
        new_client_conn.setblocking(False)
        print('#' * 30)

    def stop(self):
        self.event.set()
        self.server.close()

def run():
    cs = ChatServer()
    cs.start() #阻塞了

if __name__ == '__main__':
    run()

测试

#客户端
PS D:\> nc -p 51100  127.0.0.1 9999
PS D:\>

#服务端
SelectorKey(fileobj=<socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=328, events=1, data=<bound method ChatServer.accept of <__main__.ChatServer object at 0x000001BB57A59A90>>) 1
##############################
<socket.socket fd=184, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51100)>
('127.0.0.1', 51100)
##############################

测试

#客户端1
PS D:\> nc -p 51100  127.0.0.1 9999
1111
2025-03-24 10:27:45.862748 From 127.0.0.1:51100. msg=1111

#服务端
SelectorKey(fileobj=<socket.socket fd=312, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=312, events=1, data=<bound method ChatServer.accept of <__main__.ChatServer object at 0x000001E964B99A90>>) 1
##############################
<socket.socket fd=324, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51100)>
('127.0.0.1', 51100)
##############################
SelectorKey(fileobj=<socket.socket fd=324, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51100)>, fd=324, events=1, data=<bound method ChatServer.recv of <__main__.ChatServer object at 0x000001E964B99A90>>) 1
b'1111\n' ++++

#客户端2  ctrl+c 主动断开
PS D:\> nc -p 51101  127.0.0.1 9999
2222
2025-03-24 10:29:26.373504 From 127.0.0.1:51101. msg=2222
PS D:\>


#服务端
SelectorKey(fileobj=<socket.socket fd=312, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999)>, fd=312, events=1, data=<bound method ChatServer.accept of <__main__.ChatServer object at 0x000001E964B99A90>>) 1
##############################
<socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51101)>
('127.0.0.1', 51101)
##############################
SelectorKey(fileobj=<socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51101)>, fd=328, events=1, data=<bound method ChatServer.recv of <__main__.ChatServer object at 0x000001E964B99A90>>) 1
b'2222\n' ++++
SelectorKey(fileobj=<socket.socket fd=328, family=2, type=1, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 51101)>, fd=328, events=1, data=<bound method ChatServer.recv of <__main__.ChatServer object at 0x000001E964B99A90>>) 1
Traceback (most recent call last):
  File "d:\project\pyproj\trae\t2.py", line 63, in <module>
    run()
    ~~~^^
  File "d:\project\pyproj\trae\t2.py", line 60, in run
    cs.start() #阻塞了
    ~~~~~~~~^^
  File "d:\project\pyproj\trae\t2.py", line 30, in start
    key.data(key.fileobj, mask)
    ~~~~~~~~^^^^^^^^^^^^^^^^^^^
  File "d:\project\pyproj\trae\t2.py", line 44, in recv
    data = conn.recv(1024)
ConnectionResetError: [WinError 10054] 远程主机强迫关闭了一个现有的连接。

处理客户端断开问题

if not data or data == b'quit':
    self.selector.unregister(conn) #注销, 不再监控了,反注册
    conn.close() #?
    logging.info('{} leaving. bye...'.format(conn))
    return #结束当前线程

实现群聊

import time
import socket
import selectors
import logging
import threading
import datetime

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.server = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()

    def start(self):
        self.server.bind(self.addr)
        self.server.listen()
        self.server.setblocking(False)

        self.selector.register(self.server, selectors.EVENT_READ, self.accept)

        #select
        while not self.event.is_set():
            events = self.selector.select() #阻塞到某一批IO就绪
            for key, mask in events:
                print(key, mask)
                key.data(key.fileobj, mask)

    def accept(self, sock, mask):
        print('#' * 30)
        new_client_conn, client_raddr = self.server.accept()  #第二阶段 阻塞
        # new_client_conn, client_raddr = sock.accept() #这样与上一行效果一样
        print(new_client_conn)
        print(client_raddr)
        new_client_conn.setblocking(False)
        print('#' * 30)

        key = self.selector.register(new_client_conn, selectors.EVENT_READ, self.recv)

    def recv(self, conn:socket.socket, mask):
        data = conn.recv(1024)
        print(data, '++++')
        if not data or data == b'quit':
            self.selector.unregister(conn) #注销, 不再监控了,反注册
            conn.close() #?
            logging.info('{} leaving. bye...'.format(conn))
            return #结束当前线程
        msg = "{} From {}:{}. msg={}".format(
            datetime.datetime.now(),
            *conn.getpeername(), #对端地址
            data.decode()
        ).encode()
        # conn.send(msg)
        # print(*self.selector.get_map().items(), sep='\n')
        for fd, key in self.selector.get_map().items():
            # print('-' * 30)
            # print(key.fileobj)
            # print(key.data)
            # print('-' * 30)
            # if key.data is self.recv: #绑定对象id不一样
            if key.data == self.recv: #绑定对象内容一样
                key.fileobj.send(msg) #发消息给每个客户端

    def stop(self):
        self.event.set()
        self.server.close()

def run():
    cs = ChatServer()
    cs.start() #阻塞了

if __name__ == '__main__':
    run()

上面代码,只有一个线程。如果任何地方出异常,主线程就挂了,需要在里面做异常处理。

import time
import logging
import threading

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def a():
    time.sleep(5)
    logging.info(threading.enumerate())
    logging.info('hello')
    time.sleep(3)

threading.Thread(target=a).start()

time.sleep(2)
raise TypeError('test')

#等2秒主线程异常结束了
#等5秒,还要等其他non-daemon线程结束
#进程的异常会导致进程状态码非0

#返回结果
# Traceback (most recent call last):
#   File "d:\project\pyproj\trae\t.py", line 17, in <module>
#     raise TypeError('test')
# TypeError: test
# 2025-03-24 11:41:45,823 Thread-1 (a) 25540 [<_MainThread(MainThread, stopped 27544)>, <Thread(Thread-1 (a), started 25540)>]
# 2025-03-24 11:41:45,823 Thread-1 (a) 25540 hello
import time
import socket
import selectors
import logging
import threading
import datetime

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = ip, port
        self.server = socket.socket()
        self.selector = selectors.DefaultSelector()
        self.event = threading.Event()

    def start(self):
        self.server.bind(self.addr)
        self.server.listen()
        self.server.setblocking(False)

        threading.Thread(target=self.select, name='select', daemon=False).start()

    def select(self):
        self.selector.register(self.server, selectors.EVENT_READ, self.accept)
        while not self.event.is_set():
            events = self.selector.select(0.5) #阻塞到某一批IO就绪
            for key, mask in events:
                key.data(key.fileobj, mask)

    def accept(self, sock, mask):
        new_client_conn, client_raddr = self.server.accept()  #第二阶段 阻塞
        new_client_conn.setblocking(False)
        key = self.selector.register(new_client_conn, selectors.EVENT_READ, self.recv)

    def recv(self, conn:socket.socket, mask):
        data = conn.recv(1024)
        print(data, '++++')
        if not data or data == b'quit':
            self.selector.unregister(conn) #注销, 不再监控了,反注册
            conn.close() #?
            logging.info('{} leaving. bye...'.format(conn))
            return #结束当前线程
        msg = "{} From {}:{}. msg={}".format(
            datetime.datetime.now(),
            *conn.getpeername(), #对端地址
            data.decode()
        ).encode()
        for fd, key in self.selector.get_map().items():
            if key.data == self.recv: #绑定对象内容一样
                key.fileobj.send(msg) #发消息给每个客户端

    def stop(self):
        self.event.set()
        # fileobjs = [key.fileobj for key in self.selector.get_map().values()]
        # for obj in fileobjs:
        #     self.selector.unregister(obj)
        #     obj.close()
        # self.selector.close()
        self.server.close()

def run():
    cs = ChatServer()
    cs.start() #阻塞了

    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.enumerate())

if __name__ == '__main__':
    run()

本例只完成基本功能,其他功能如有需要,请自行完成。

注意使用IO多路复用,使用了几个线程?

特别注意key.data == self.recv

总结

使用 IO多路复用 +(select、epoll) 并不一定比 多线程 + 同步阻塞IO 性能好,其最大优势可以处理更多的连接

  • 多线程 + 同步阻塞IO模式

    开辟太多线程,线程开辟、销毁开销还是较大,倒是可以使用线程池;线程多, 线程自己使用的内存也很可观;多线程切换时要保护现场和恢复现场,线程过 多,切换会占用大量的时间。

  • 连接较少,多线程 + 同步阻塞IO模式比较适合,效率也不低
  • 如果连接非常多,对服务端程序来说,IO并发还是比较高的,这时候,开辟太多线程其实也不是很划算,这时候IO多路复用或许是更好的选择,使用epoll。
emacs

Emacs

org-mode

Orgmode

Donations

打赏

Copyright

© 2025 Jasper Hsu

Creative Commons

Creative Commons

Attribute

Attribute

Noncommercial

Noncommercial

Share Alike

Share Alike