一、psutil模块(重点)

psutil模块作用:协助我们完成CPU使用率、内存、磁盘信息、网络等等相关数据的采集!

1、模块介绍

psutil 是一个跨平台的 Python 库,用于检索系统的运行信息,包括 CPU 使用情况、内存状态、磁盘信息、网络统计、进程信息等,非常适合运维和系统监控应用。下面是 psutil 的一些常见用法和应用示例。

2、安装 psutil

前置操作:

第一步:启动VMware中的node1服务器(192.168.88.101)

第二步:在PyCharm中创建一个新项目,所有配置保持默认,不需要调整

img

第三步:在项目中,找到File菜单->Settings设置->Project关键词菜单->Python Interpreter解析器

img

选择On SSH:配置远程解析器

img

填入Linux服务器信息:

img

确认指纹,单击OK,然后输入Linux服务器的密码,如下图所示:

img

设置系统解析器:

在Linux服务器的/root目录创建一个day04Project文件夹

1
mkdir -p /root/day04Project

然后更改解析器信息(注意:必须选择系统解析器):

img

更改源码上传位注意更改源码上传位置::

img

进入到Linux服务器,可以通过以下命令安装 psutil

1
2
3
4
5
6
7
dnf install python3-pip -y
pip install psutil -i https://pypi.tuna.tsinghua.edu.cn/simple

注:
pip install 软件包名称,安装软件
-i 指定镜像源,说白了就是从哪里下载软件
https://pypi.tuna.tsinghua.edu.cn/simple清华镜像源

3、官方文档

官方地址:https://github.com/giampaolo/psutil

4、获取CPU使用情况

获取CPU使用率

1
2
3
4
5
import psutil

# 获取 CPU 总使用率,1 秒间隔
cpu_usage = psutil.cpu_percent(interval=1)
print("CPU usage:", cpu_usage, "%")

常见运行报错解决方案:

img

获取每个核心的 CPU 使用率

[第一个核心使用率,第二个核心使用率,第三个核心使用率,第四个核心使用]

1
2
3
4
5
import psutil

# 获取每个核心的CPU使用率
for i in range(psutil.cpu_count()):
print("CPU usage of core", i, ":", psutil.cpu_percent(interval=1, percpu=True)[i], "%")

5、获取内存使用情况

字节Bytes => /1024 KB千字节 => /1024 MB兆字节 => /1024 GB

1
2
3
4
5
6
7
import psutil

# 获取物理内存信息
memory_info = psutil.virtual_memory()
print("Total memory:", memory_info.total // (1024 ** 2), "MB")
print("Used memory:", memory_info.used // (1024 ** 2), "MB")
print("Memory usage:", memory_info.percent, "%")

获取交换内存信息

1
2
3
4
5
6
7
import psutil

# 获取交换内存信息
swap_info = psutil.swap_memory()
print("Total swap:", swap_info.total // (1024 ** 2), "MB")
print("Used swap:", swap_info.used // (1024 ** 2), "MB")
print("Swap usage:", swap_info.percent, "%")

Linux操作系统必须有两个分区:/根分区 + swap分区

安装操作系统时,默认会分配一定的磁盘空间(物理内存1-2倍,小于8G情况,物理内存2倍,大于等于8G,就是1倍)作为临时内存使用;当系统内存资源不足时,系统会自动调用swap分区充当内存使用。

6、获取磁盘使用情况

1
2
3
4
5
6
7
8
9
10
11
12
import psutil

# 获取磁盘分区
partitions = psutil.disk_partitions()
for partition in partitions:
print(f"Device: {partition.device}, Mountpoint: {partition.mountpoint}, Filesystem: {partition.fstype}")

# 获取指定分区的使用情况
disk_usage = psutil.disk_usage('/')
print("Total disk space:", disk_usage.total // (1024 ** 3), "GB")
print("Used disk space:", disk_usage.used // (1024 ** 3), "GB")
print("Disk usage:", disk_usage.percent, "%")

获取磁盘IO:

I:Input

O:Output

磁盘IO指的是系统对磁盘设备进行数据读写的过程,主要包括:

  • 读取操作:从磁盘中读取数据到内存。
  • 写入操作:将数据从内存写入到磁盘。

磁盘IO性能通常受磁盘硬件、文件系统类型、RAID配置和磁盘缓存的影响。 常见指标:

  • IOPS(Input/Output Operations Per Second):每秒处理的IO操作数量。
  • 吞吐量:磁盘每秒处理的数据量(如MB/s或GB/s)。
  • 比如:一块SSD的读写速度标称为 3.5 GB/s,表示它每秒可以读取或写入3.5GB的数据。

计算磁盘的IOPS:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import psutil
import time

# 第一次采样
disk_io_1 = psutil.disk_io_counters()
time.sleep(1) # 等待1秒
# 第二次采样
disk_io_2 = psutil.disk_io_counters()

# 计算吞吐量 (MB/s)
read_bytes_diff = disk_io_2.read_bytes - disk_io_1.read_bytes
write_bytes_diff = disk_io_2.write_bytes - disk_io_1.write_bytes
read_mb_s = read_bytes_diff / (1024 * 1024) # 转换为MB/s
write_mb_s = write_bytes_diff / (1024 * 1024) # 转换为MB/s

# 计算IOPS
read_iops = disk_io_2.read_count - disk_io_1.read_count
write_iops = disk_io_2.write_count - disk_io_1.write_count

print(f"读取吞吐量: {read_mb_s:.2f} MB/s")
print(f"写入吞吐量: {write_mb_s:.2f} MB/s")
print(f"读取IOPS: {read_iops}")
print(f"写入IOPS: {write_iops}")

---------------------------------------------------------------------------
psutil.disk_io_counters() 返回一个对象,包含以下关键属性(指标):
read_bytes:从磁盘读取的总字节数。
write_bytes:写入磁盘的总字节数。
read_count:读取操作的次数(可用于计算IOPS)。
write_count:写入操作的次数。

运行结果:

1
2
3
4
读取吞吐量: 15.75 MB/s
写入吞吐量: 8.20 MB/s
读取IOPS: 1200
写入IOPS: 600

理论值参考:

吞吐量:HDD(顺序读写约100-200 MB/s),SATA SSD(500-550 MB/s)或NVMe SSD(3-7 GB/s)

IOPS:HDD(100-200 IOPS),SATA SSD(1万-10万IOPS)或NVMe SSD(10万-100万IOPS)

7、获取网络信息

获取网络I/O统计

网络IO指的是系统通过网络接口进行数据收发的过程,主要包括:

  • 接收操作:从网络中接收数据包。
  • 发送操作:通过网络发送数据包。
  • 带宽:单位时间内网络传输的数据量(如Mbps或Gbps)。
  • 延迟:网络数据包从源到目标的时间。
  • 吞吐量:实际传输的数据量,通常小于带宽上限。

普及一个概念:千兆带宽1Gb/s(注意这里b是小写的!!!)

Gb/s:表示 Gigabit per second(千兆比特每秒),1 Gb = 10亿比特(10^9 bits)。

GB/s:表示 Gigabyte per second(吉字节每秒),1 GB = 10亿字节(10^9 Bytes)。

关键区别:1 Byte(字节) = 8 bits(比特),所以 1 GB/s = 8 Gb/s。

所以千兆带宽的理论传输速度约为125MB/s,百兆带宽理论传输速度约为12.5MB/s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import psutil

# 获取网络流量统计
net_io = psutil.net_io_counters()
print("Bytes sent:", net_io.bytes_sent)
print("Bytes received:", net_io.bytes_recv)

-----------------------------------------------
psutil.net_io_counters() 返回一个命名元组,包含系统所有网络接口(如Wi-Fi、以太网)的累计网络IO统计数据
主要指标:
bytes_sent:系统发送的总字节数(上传流量)。
bytes_recv:系统接收的总字节数(下载流量)。
其他指标包括:
packets_sent:发送的数据包数。
packets_recv:接收的数据包数。
errin/errout:接收/发送的错误数。
dropin/dropout:接收/发送丢弃的数据包数。

获取网络IO吞吐量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import psutil
import time

# 第一次采样
net_io_1 = psutil.net_io_counters()
time.sleep(1) # 等待1秒
# 第二次采样
net_io_2 = psutil.net_io_counters()

# 计算吞吐量 (MB/s)
bytes_sent_diff = net_io_2.bytes_sent - net_io_1.bytes_sent
bytes_recv_diff = net_io_2.bytes_recv - net_io_1.bytes_recv
send_mb_s = bytes_sent_diff / (1024 * 1024) # 转换为MB/s
recv_mb_s = bytes_recv_diff / (1024 * 1024) # 转换为MB/s

print(f"发送吞吐量: {send_mb_s:.2f} MB/s")
print(f"接收吞吐量: {recv_mb_s:.2f} MB/s")

获取网路接口(网卡,如lo网卡、ens33/ens160/eth0)地址信息

1
2
3
4
5
6
import psutil

net_if_addrs = psutil.net_if_addrs()
for interface_name, interface_addresses in net_if_addrs.items():
print(f"接口名称: {interface_name}")
print(f"接口地址: {interface_addresses[0].address}")

ens33/ens160/eth0:物理网卡,lo(local)回环网卡 => 虚拟网卡,主要用于网络测试工作。

小结:

网络信息采集通常采集两个方面:(网络IO,使用net_io_counters)与 (网卡信息,使用net_if_addrs)

8、psutil运维场景:获取cpu使用率

监控CPU使用率,超过80%(阈值)就发给邮件给运维,前置知识点:https://docs.python.org/3.9/library/smtplib.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import psutil
import smtplib
from email.mime.text import MIMEText
from email.header import Header

# 设置邮箱地址和授权码
from_addr = '你的邮箱'
to_addr = '目标邮箱'
auth_code = '你的邮箱授权码'

# 获取 CPU 使用率
cpu_percent = psutil.cpu_percent()

# 判断 CPU 使用率是否超过阈值
if cpu_percent > 80:
# 构造邮件内容
subject = '警报:高 CPU 使用率'
message = 'CPU 使用率超过 80%:当前使用率为 {}%'.format(cpu_percent)

# 使用 MIMEText 创建邮件对象,指定编码为 UTF-8
msg = MIMEText(message, 'plain', 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = from_addr # 用于显示的发件人
msg['To'] = to_addr # 用于显示的收件人

# 建立 SMTP 连接
server = smtplib.SMTP('smtp.163.com')
server.login(from_addr, auth_code)

# 发送邮件
server.sendmail(from_addr, to_addr, msg.as_string())
server.quit()

小结:

整个案例一共使用了3个模块:检查系统CPU信息(psutil)、发送邮件(email)

另外特别注意:邮件发送时,使用的密码并不是邮件密码,而是邮件授权码!!!

二、paramiko模块

目标:paramiko主要用于实现(远程登录)、(文件上传)与(下载功能)

1、模块介绍

paramiko模块支持以加密和认证的方式连接远程服务器。可以实现远程文件的上传,下载或通过ssh远程执行命令。

Linux上写shell脚本远程ssh操作处理密码有两种方法:

  • ssh-keygen实现免密操作,这样远程连接不用输入密码 ssh 192.168.88.102 touch /tmp/123
  • expect自动应答处理密码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/bin/bash

# 删除 .ssh/known_hosts 中的旧记录,以便每次登录都需要确认
sed -i /^192.168.88.102/d /root/.ssh/known_hosts

# 使用 expect 自动化 SSH 登录
expect <<EOF
spawn ssh 192.168.88.102 # 执行 SSH 登录
expect "(yes/no)?" # 等待出现 yes/no 确认提示
send "yes\n" # 发送 yes 确认
expect "password:" # 等待输入密码提示
send "123\n" # 输入密码
expect "]" # 等待登录成功提示符
send "touch /tmp/123\n" # 登录成功后执行命令
send "exit\n" # 退出 SSH

expect eof
EOF

2、安装paramiko

1
pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple

3、使用paramiko实现远程登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import paramiko

ssh = paramiko.SSHClient() # 创建一个客户端连接实例
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) # 加了这一句,如果第一次ssh连接要你输入yes,也不用输入了
ssh.connect(hostname="192.168.88.102", port=22, username="root", password="123456") # 指定连接的ip,port,username,password

stdin,stdout,stderr = ssh.exec_command("touch /tmp/file.txt") # 执行一个命令,有标准输入,输出和错误输出

cor_res = stdout.read() # 标准输出赋值
err_res = stderr.read() # 错误输出赋值

print(cor_res.decode()) # 网络传输是二进制需要decode(我们没有讨论socket编程,所以你就直接这样做)
print(err_res.decode()) # 不管正确的还是错误的输出,都打印出来

ssh.close() # 关闭此连接实例

paramiko 中,ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 用于指定在首次连接到一个新主机时的处理策略。

作用

自动接受未知主机:在首次连接到一个新的主机(即 known_hosts 文件中没有记录的主机)时,通常会提示确认连接,要求输入 yesno

避免手动确认:使用 AutoAddPolicy 后,paramiko 会自动将未知主机的主机密钥添加到 known_hosts 文件中,而不提示用户手动确认。

把以上内容封装为函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
=# 1. 导入模块
import paramiko

# 2. 封装函数
def ssh_exec(hostname, password, cmd, port=22, username='root'):
# 创建SSHClient对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname=hostname, port=port, username=username, password=password)
# 执行命令,获取返回结果 => 标准输入 + 标准输出1 + 标准错误2
stdin, stdout, stderr = ssh.exec_command(cmd)
# 返回最终的结果
print(stdout.read().decode())
print(stderr.read().decode())
# 关闭SSH连接
ssh.close()

# 3. 调用函数
# ssh_exec('192.168.88.102', '123456', 'ls /')
while True:
cmd = input('[root@node2 ~]# ')
ssh_exec('192.168.88.102', '123456', cmd)

4、使用paramiko免密远程登录操作

node1服务器生成秘钥,发送公钥给node2服务器,然后才能使用paramiko实现免密登录!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ssh-keygen
# ssh-copy-id root@192.168.88.102
import paramiko

ssh = paramiko.SSHClient() # 创建一个客户端连接实例
private_key = paramiko.RSAKey.from_private_key_file("/root/.ssh/id_rsa") # 提前先做好ssh等效性(也就是免密登陆)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy) # 加了这一句,如果第一次ssh连接要你输入yes,也不用输入了
ssh.connect(hostname="192.168.88.102",port=22,username="root",pkey=private_key) # 把password=123456换成pkey=private_key
stdin,stdout,stderr = ssh.exec_command("touch /tmp/paramiko.txt")
cor_res = stdout.read()
err_res = stderr.read()

print(cor_res.decode())
print(err_res.decode())

ssh.close()

5、使用paramiko实现文件上传下载

基于账号密码操作

1
2
3
4
5
6
7
8
9
10
11
import paramiko
trans = paramiko.Transport(("192.168.88.102",22)) # 参数比较特殊,要求必须是一个文本类型

trans.connect(username="root",password="123456")

sftp = paramiko.SFTPClient.from_transport(trans)

sftp.get("/etc/fstab","/tmp/fstab1") # 把对方机器的/etc/fstab下载到本地为/tmp/fstab(注意不能只写/tmp,必须要命名)
sftp.put("/etc/inittab","/tmp/inittab1") # 本地的上传,也一样要命令

trans.close()

基于免密操作

node1:

1
2
# ssh-keygen
# ssh-copy-i-root 192.168.88.102

编写执行代码

1
2
3
4
5
6
7
8
9
10
11
12
13
import paramiko

trans = paramiko.Transport(("192.168.88.102",22))

private_key = paramiko.RSAKey.from_private_key_file("/root/.ssh/id_rsa")
trans.connect(username="root",pkey=private_key) # 提前使用ssh-keygen做好免密登录

sftp = paramiko.SFTPClient.from_transport(trans)

sftp.get("/etc/fstab","/tmp/fstab2")
sftp.put("/etc/inittab","/tmp/inittab2")

trans.close()

小结:

paramiko作用?实现(远程登录)、((上传)、(下载)、执行远程命令(模拟MobaXterm软件)等功能

paramiko既支持密码登录,还支持(免密)登录?

三、subprocess模块

作用?允许我们通过Python代码执行Linux命令(Python调用Linux系统命令),实现一些脚本自动化操作!

1、模块介绍

subprocess 是 Python 标准库中的一个模块,用于在程序中执行系统命令或脚本。它在运维场景中非常有用,可以实现任务的自动化和系统操作的编程化。

2、subprocess.run() 函数

subprocess.run()subprocess 这个函数返回一个 CompletedProcess 对象,其中包含了执行结果的各种信息,如返回码、标准输出和标准错误等。

1> 文件,2> 文件, &> 文件

1 = 标准输出(stdout)

2 = 标准错误(stderr)

& = 1 + 2(既包含标准输出也包含标准错误)

subprocess.run() 的基本用法如下:systemctl start mysqld => [‘systemctl’, ‘start’, ‘mysqld’]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import subprocess
import os

# 判断当前操作系统
if os.name == 'nt': # Windows
command = ['cmd', '/c', 'dir']
else: # Linux/Unix/macOS
command = ['ls', '-l']

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout) # 输出命令执行结果
print(result.returncode) # 输出命令返回值

为什么subprocess.run(['ls', '-l'])?
1个好处:可以保证数据安全,避免恶意操作(命令注入)
2个好处:加快Shell命令执行,Shell在底层需要把完整命令拆解然后在执行,如果subprocess.run直接拆解了命令,避免解析

在上面的例子中,我们执行了 ls -l 命令,并通过 capture_output=True 参数将标准输出捕获到变量 result.stdout 中。同时,text=True 参数确保输出以字符串形式而不是字节流形式返回。这样,我们就可以直接对输出进行字符串操作了。

3、自动化部署与配置

Nginx Web Server软件 => Apache Web Server软件(httpd)=> 80端口 => Web应用

1
2
3
4
5
6
7
8
9
10
11
import subprocess

# 安装软件包
subprocess.run(["yum", "install", "-y", "httpd"])

# 启动服务
subprocess.run(["systemctl", "start", "httpd"])

# 检查服务状态
result = subprocess.run(["systemctl", "status", "httpd"], capture_output=True, text=True)
print(result.stdout)

4、执行Shell脚本

提前在/root/script.sh

1
2
3
4
5
6
7
8
#!/bin/bash
#1.目前所使用仓库都属于基础仓库
#2.安装epel扩展库
dnf install epel-release -y
#3.安装sl软件
dnf install sl -y
#4.启动sl软件
sl

在Python中编写如下代码:

1
2
3
4
5
6
# 1. 导入模块
import subprocess
# 2. 基于run()方法触发Shell脚本
result = subprocess.run(['bash', '/root/script.sh'], capture_output=True, text=True)
# 3. 打印结果
print(result.stdout)

5、系统资源监控

1
2
3
4
5
6
7
8
9
10
11
import subprocess

# 获取磁盘使用情况
result = subprocess.run(["df", "-h"], capture_output=True, text=True)
print("磁盘使用情况:")
print(result.stdout)

# 获取内存使用情况
result = subprocess.run(["free", "-m"], capture_output=True, text=True)
print("内存使用情况:")
print(result.stdout)

小结:

subprocess模块作用?答:执行系统命令

subprocess在以下场景中使用(自动化部署)、(执行Shell脚本)、(系统资源监控)

四、Python定时采集

1、定时采集意义

在运维开发中,定时采集是常见的需求,例如:

  • 持续监控系统资源使用情况(CPU、内存、磁盘等)。
  • 定期记录关键日志信息。
  • 定时备份和检测系统状态。

Python提供了简单的time.sleep()方法,结合循环实现间隔性采集任务。

2、定时采集任务基础

  • 使用 while True` 死循环保证任务持续执行。
  • 调用time.sleep(interval)设置间隔时间(单位:秒)。
1
2
3
4
5
6
7
8
9
10
11
12
import time

# 定时任务示例
try:
print("开始定时采集任务,按Ctrl+C停止...")
while True:
# 模拟采集数据
print("正在采集数据...")
# 等待5秒
time.sleep(5)
except KeyboardInterrupt:
print("\n定时任务已停止")

3、基于psutil定时采集系统资源

Python提供了psutil模块,可以轻松获取系统资源使用信息:

  • CPU使用率psutil.``cpu_percent``()
  • 内存使用率psutil.``virtual_memory``().percent
  • 磁盘使用率psutil.``disk_usage``("/").percent

前置概念:try...except异常捕获,提前感知Python异常,预处理,避免报错终止程序执行

datetime扩展:datetime.now()可以用于获取当前系统时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import psutil
import time
from datetime import datetime

# 采集系统资源信息
try:
print("开始定时采集系统资源,按Ctrl+C停止...")
while True:
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 获取资源信息
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
disk_usage = psutil.disk_usage("/").percent
# 打印采集结果
print(f"{current_time} | CPU: {cpu_usage}% | 内存: {memory_usage}% | 磁盘: {disk_usage}%")
# 等待5秒
time.sleep(5)
except KeyboardInterrupt:
print("\n采集任务已停止")

小结:

定时采集就是使用while True + (time.sleep())方法实现?

try…except作用?答:异常处理

datetime模块通过(datetime.now())方法可以获取当前系统时间?

五、CSV数据存储

1、CSV数据存储

CSV(Comma-Separated Values)是一种简单的表格数据存储格式,适合以下场景:

  • 存储结构化数据(如时间戳、CPU、内存使用率等)。 => 有行有列二维表格
  • 易于用Excel或数据分析工具(如Pandas)打开和处理。

2、Python中的CSV模块

  • import csv:导入csv模块。
  • csv.writer:创建一个csv对象。
  • csv对象.writerow:向CSV文件中写入一行。

案例:

1
2
3
4
5
6
7
8
9
10
# 1. 导入模块
import csv
# 2. 新建CSV文件 => 初始化表头 => 写入数据行
with open('test.csv', 'w', newline='', encoding='utf-8-sig') as file:
# 3. 初始化CSVdui
writer = csv.writer(file)
# 4. 写入表头信息
writer.writerow(['系统时间', 'CPU使用率(%)', '内存使用率(%)', '磁盘使用率(%)'])
# 5. 写入数据行
writer.writerow(['2025-01-20 16:24:30', 80, 60, 75])

3、CSV文件存储实现

需求:把定时采集到的数据,时间、CPU使用率、内存使用率、磁盘使用率信息写入到CSV文件

第一步:初始化CSV文件

  • 在文件的第一行写入表头,用于说明每列的意义。

第二步:追加数据行

  • 使用a模式打开CSV文件,逐行写入数据。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import psutil
import csv
import time
from datetime import datetime

# 定义CSV文件路径
OUTPUT_FILE = "system_resource_usage.csv"

# 初始化CSV文件
with open(OUTPUT_FILE, "w", newline="", encoding='utf-8-sig') as file:
writer = csv.writer(file)
# 写入表头
writer.writerow(["Timestamp", "CPU Usage (%)", "Memory Usage (%)", "Disk Usage (%)"])

# 定时采集系统资源并写入CSV文件
try:
print("开始定时采集系统资源并保存到CSV文件,按Ctrl+C停止...")
while True:
# 获取当前时间
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 获取资源信息
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
disk_usage = psutil.disk_usage("/").percent
# 写入CSV文件
with open(OUTPUT_FILE, "a", newline="") as file:
writer = csv.writer(file)
writer.writerow([current_time, cpu_usage, memory_usage, disk_usage])
# 打印采集结果
print(f"{current_time} | CPU: {cpu_usage}% | 内存: {memory_usage}% | 磁盘: {disk_usage}%")
# 等待5秒
time.sleep(5)
except KeyboardInterrupt:
print(f"\n采集任务已停止,数据已保存到 {OUTPUT_FILE}")

六、requests模块

作用:requests模块主要用于实现http请求(get请求、post请求),工作中主要用于爬虫、接口调用以及企微告警!

1、模块介绍

requests 是一个非常流行的 Python 库,提供了简洁的 HTTP 请求处理方式。

HTTP请求:GET请求以及POST请求

获取数据一般使用GET请求

发送数据一般使用POST请求,比如登录等等

2、安装 requests

在命令行中输入以下命令安装 requests

1
pip install requests -i https://pypi.tuna.tsinghua.edu.cn/simple

3、发送get请求

GET 请求用于从服务器获取数据,通常用于获取网页内容或 API 数据。

1
2
3
4
5
6
7
8
9
10
import requests

# 1. 准备url地址
url = "http://www.baidu.com"

# 2. 发送请求 获取响应 : requests.get(url) 调用完 get方法之后 会返回一个response响应对象
response = requests.get(url)

# 打印响应的内容(html字符串内容)
print(response.text)

常见错误:

img

4、response 响应对象的属性和方法

☆ response.text

获取响应的字符串内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import requests

# 1. 准备url地址
url = "http://www.baidu.com"

# 2. 发送请求 获取响应 : requests.get(url) 调用完 get方法之后 会返回一个response响应对象
response = requests.get(url)

# 打印响应的内容(html字符串内容)
# 使用 response.text 获取的的就是响应的html字符串内容
# 数据在互联网 进行传输的时候,都是使用的 bytes类型, response.text 拿到字符串内容的时候
# 也是对数据进行 解码,解码的时候 使用的编码格式 是推测出来的
# 查看 使用response.text 解码的时候,使用的编码格式
print(response.encoding)
# print(response.text)
# 解决使用 response.text 乱码的问题,我们可以,在调用 text 属性之前,指定解码的编码方式
response.encoding = "utf-8"

print(response.text)

print(type(response.text))

☆ response.content

获取响应的是字节流(bytes类型)的数据, 如果想要拿到字符串类型的数据可以使用以下方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
response.content.decode()  
# 1. 导入rquests模块
import requests

# 2. 定义一个url地址
url = 'http://www.baidu.com'

# 3. 发送get请求
response = requests.get(url)

# 4. 获取响应结果
print(response.content) # 字节流数据
print(type(response.content)) # <class 'bytes'>
print(response.content.decode('utf-8')) # decode,对字节流数据进行解码

问题:requests模块发起请求后,如果结果保存在response中,则response.text与response.content有何区别?

答:response.txt返回字符串,需要通过response.encoding解决中文乱码

response.content返回字节流,需要通过decode进行解码才能查看文本信息

5、requests 携带 headers

默认使使用requests模块去发送请求的时候,使用的User-Agent不是一个正常的浏览器的UA,服务器会监测到,监测到之后,就会返回一些假数据 或者是不完整的数据,

如果我们想要获取到完整的数据,就可以在发送请求的时候,携带上一个正常的浏览器的User-Agent。

目的:就是为了让我们的爬取程序伪装的更像是一个正常的浏览器在请求对方的服务器,进而拿到正确的数据。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
requests.get(url, headers={})

字典中的键值对,就是我们直接从浏览器中复制过来的请求头中的字段,
# 1. 导入模块
import requests

# 2. 定义url
url = "http://www.baidu.com"

# 3. 定义请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
}

# 4. 发送请求
response = requests.get(url, headers=headers)

# 5. 打印返回结果
print(response.status_code) # 响应状态码200、404
print(response.text) # 获取响应内容

6、requests 携带参数

1
https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&rsv_idx=1&tn=baidu&wd=%E9%BB%91%E9%A9%AC%E7%A8%8B%E5%BA%8F%E5%91%98&fenlei=256&oq=%25E5%2590%25B4%25E4%25BA%25A6%25E5%2587%25A1&rsv_pq=ba9c81ff00d46df8&rsv_t=e889fT1nEVc7LzjqPYkcDxgiM9lu2eu9sJantVoWaIudn0OUpNFizzxFtns&rqlang=cn&rsv_dl=tb&rsv_enter=1&rsv_sug3=15&rsv_sug1=1&rsv_sug7=100&rsv_sug2=0&rsv_btype=t&inputT=2394&rsv_sug4=2394

查询字符串参数:query_string

格式: 以 ? 开始,? 后面 key=value 如果有多个参数,每个参数之间使用 & 符号进行连接。

在使用requests模块的时候想要去携带查询字符串参数如何去操作:

第一种方式: 直接对url地址去发送请求即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
https://www.baidu.com/s?wd=%E9%BB%91%E9%A9%AC%E7%A8%8B%E5%BA%8F%E5%91%98
import requests

# 1. 准备url地址
url = "https://www.baidu.com/s?wd=黑马程序员"
# 准备请求头的字典
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.164 Safari/537.36"
}

# 2. 发送请求 获取响应
response = requests.get(url, headers=headers)

# 3. 将响应的内容 保存到一个html文件中
with open('heima.html', 'w', encoding='utf-8') as f:
f.write(response.content.decode())

经验:查询字符串参数,必传参数 和 非必传参数 ,怎么去判断 必传 和非必传参数呢? 试

第二种方式: 使用 get方法中提供的一参数,params

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
requests.get(url, params={})

构造请求 参数的字典,只需要将 等号左边的内容 作为字典key,等号右边的内容作为value
{"wd": "黑马程序员"}
import requests

# 1. 准备url地址 url地址中如果有查询字符串参数,在使用params参数携带参数的时候,
# requests模块会自动的将 url地址中缺少的 ? 给补充上
url = "https://www.baidu.com/s"
# 准备参数的字典
params = {
"wd": "黑马程序员"
}
# 准备请求头的字典
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.164 Safari/537.36"
}

# 2. 发送请求 获取响应
response = requests.get(url,params=params, headers=headers)

# 3. 将响应的内容 保存到一个html文件中
with open('heima.html', 'w', encoding='utf-8') as f:
f.write(response.content.decode())

# 获取响应的url地址
print(response.url)

7、发送post请求

在 HTTP 协议中,GETPOST 是两种最常用的请求方法,用于客户端与服务器之间的通信。它们的主要区别在于数据传递的方式和使用场景。

案例:

1
2
3
4
5
6
7
8
9
import requests

url = "https://httpbin.org/post"
data = {"username": "admin", "password": "123456"}

response = requests.post(url, json=data)

print("状态码:", response.status_code)
print("响应内容:", response.json())

URL地址:

请求数据:

data 是一个 Python 字典,包含了需要提交的键值对数据。 使用 json=data 参数指定将数据以 JSON 格式发送。

发送请求:

requests.post(url, json=data) 将数据作为 JSON 负载发送到服务器。

响应处理:

response.status_code:打印响应状态码(如200表示成功)。 response.json():将响应内容解析为 Python 字典。

8、GET与POST区别对比

记住3条zuo上即可

特性 GET POST
数据传输方式 数据通过 URL 参数(Query String)传递。 数据通过请求体(Request Body)传递。
数据大小限制 有限制(具体取决于浏览器和服务器)。 理论上无大小限制(受服务器配置影响)。
请求用途 通常用于获取数据(无副作用)。 通常用于提交数据(可能更改服务器状态)。
安全性 数据直接暴露在 URL 中,安全性较低。 数据存储在请求体中,相对更安全。
缓存 浏览器会缓存 GET 请求的结果。 默认不会缓存 POST 请求。
可见性 数据在 URL 中可见,适合发送非敏感数据。 数据不可见,适合发送敏感数据(如密码)。
使用场景 数据查询、静态页面加载。 数据提交、文件上传、登录操作等。

企微接口文档地址:https://developer.work.weixin.qq.com/document/path/91770

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import requests
# 企业微信机器人 Webhook URL
WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=aa1ec80c-221b-4d96-a2cd-ceffca46eba9"

headers = {"Content-Type": "application/json"}
message = "企业微信告警测试!!!"
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(WEBHOOK_URL, json=data, headers=headers)
if response.status_code == 200:
print("企业微信告警发送成功")
else:
print(f"企业微信告警发送失败,状态码: {response.status_code}")
except Exception as e:
print(f"发送企业微信告警失败: {e}")

七、阈值检测与企微报警

1、任务背景

在生产环境中,资源使用率(如CPU、内存、磁盘)过高会导致性能问题甚至系统崩溃。为了确保系统的稳定运行,需要设置资源使用率的阈值(如CPU使用率 > 80%),并在超出阈值时触发报警,同时记录报警信息到日志文件以便后续排查。

2、任务拆解

  • 定时监控系统资源(CPU、内存、磁盘)的使用率。
  • 判断资源使用是否超过设定的阈值(如CPU > 80%,内存 > 90%,磁盘 > 85%)。
  • 超过阈值时,触发报警并将报警信息记录到日志文件。

3、任务实现

企微接口文档地址:https://developer.work.weixin.qq.com/document/path/91770

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# 导入相关模块
import psutil
import time
import requests
from datetime import datetime

LOG_FILE = 'resource_alert.log'

# 定义阈值 以及 企业告警链接地址
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 90
DISK_THRESHOLD = 85

WEBHOOK_URL = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=aa1ec80c-221b-4d96-a2cd-ceffca46eba9'

# 日志记录函数
def log_alert(resource_type, usage, threshold):
# 获取当前时间 => 记录大概什么时间发生了本次故障
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 本次资源采集涉及到CPU、内存、磁盘等信息,需要在日志文件中对其进行区分以及记录
# [2025-05-14 11:04:05] CPU使用率过高,超过了阈值80%,当前使用率为90%
message = f'[{current_time}] {resource_type}使用率过高,超过了阈值{threshold}%, 当前使用率为{usage}%'
# 打开文件并写入日志到文件中
with open(LOG_FILE, 'a', encoding='utf-8') as file:
file.write(message + '\n')
print(message)
# 接口开发
def send_wechat_alert(message):
# 定义header头信息
headers = {'Content-Type': 'application/json'}
# 定义传输数据(要求是一个字典)
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(url=WEBHOOK_URL, headers=headers, json=data)
if response.status_code == 200:
print('资源使用率告警已被触发,已通过企业微信发送给运维人员!')
else:
print('资源使用率告警已被触发,但通过企业微信发送给运维人员失败!')
except Exception as e:
print(f'企业微信接口调用失败,错误信息为:{e}')


def check_resource_usage():
# 获取CPU使用率
cpu_usage = psutil.cpu_percent(interval=1)
if cpu_usage > CPU_THRESHOLD:
log_alert('CPU', cpu_usage, CPU_THRESHOLD)
send_wechat_alert(f'CPU使用率过高,超过了阈值{CPU_THRESHOLD}%, 当前使用率为{cpu_usage}%')
# 获取内存使用率
memory_usage = psutil.virtual_memory().percent
if memory_usage > MEMORY_THRESHOLD:
log_alert('内存', memory_usage, MEMORY_THRESHOLD)
send_wechat_alert(f'内存使用率过高,超过了阈值{MEMORY_THRESHOLD}%, 当前使用率为{memory_usage}%')
# 获取磁盘使用率
disk_usage = psutil.disk_usage('/').percent
if disk_usage > DISK_THRESHOLD:
log_alert('磁盘', disk_usage, DISK_THRESHOLD)
send_wechat_alert(f'磁盘使用率过高,超过了阈值{DISK_THRESHOLD}%, 当前使用率为{disk_usage}%')
# 打印出当前资源使用率
print(f'CPU使用率为{cpu_usage}%,内存使用率为{memory_usage}%,磁盘使用率为{disk_usage}%')

if __name__ == '__main__':
try:
print('正在采集资源使用率数据...')
while True:
check_resource_usage()
time.sleep(5)
except KeyboardInterrupt:
print('本次数据采集已结束!')