This commit is contained in:
晴月 2025-04-08 09:40:33 +08:00
parent ed0f786e01
commit f58665e192
4 changed files with 45 additions and 111 deletions

111
main.py
View File

@ -1,111 +0,0 @@
import paramiko
from PIL import Image, ImageDraw, ImageFont
import textwrap
from queue import Queue
import threading
import time
class SSHMonitor:
def __init__(self, host, port, user, password, n_history=10):
self.host = host
self.port = port
self.user = user
self.password = password
self.history = []
self.n_history = n_history
self._init_ssh()
self._response_queue = Queue()
self.font = ImageFont.load_default()
def _init_ssh(self):
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.host, self.port, self.user, self.password)
self.channel = self.client.invoke_shell()
self.channel.settimeout(0.1)
# 读取欢迎信息
time.sleep(1)
self._read_output()
def _read_output(self):
output = b''
while self.channel.recv_ready():
output += self.channel.recv(1024)
return output.decode('utf-8', 'ignore')
def run_interactive(self):
def output_reader():
while True:
try:
output = self._read_output()
if output:
self._response_queue.put(output)
except:
break
reader_thread = threading.Thread(target=output_reader)
reader_thread.daemon = True
reader_thread.start()
try:
while True:
command = input("$ ")
if command.startswith("capture"):
n = int(command.split()[1])
self.save_history_to_image(n)
continue
self.channel.send(command + "\n")
time.sleep(1) # 等待命令执行
output = self._response_queue.get()
# 记录历史
self.history.append({
'command': command,
'output': output.strip()
})
if len(self.history) > self.n_history:
self.history.pop(0)
except KeyboardInterrupt:
self.client.close()
def save_history_to_image(self, n):
if n > len(self.history):
n = len(self.history)
lines = []
for entry in self.history[-n:]:
lines.append(f"$ {entry['command']}")
lines.append(entry['output'])
lines.append("-" * 40)
text = "\n".join(lines)
# 创建图像
img = Image.new('RGB', (800, 600), color=(255, 255, 255))
d = ImageDraw.Draw(img)
margin = 10
x = margin
y = margin
for line in textwrap.wrap(text, width=100):
d.text((x, y), line, font=self.font, fill=(0, 0, 0))
y += 15
img.save(f'ssh_history_{time.time()}.png')
if __name__ == "__main__":
# 配置SSH连接信息
monitor = SSHMonitor(
host='your_host',
port=22,
user='your_user',
password='your_password'
)
monitor.run_interactive()

6
第七周/pytest.ini Normal file
View File

@ -0,0 +1,6 @@
[pytest]
addopts = -s --html=report/myreport.html --self-contained-html
testpaths = ./
python_files = pytest*.py
python_classes = Test*
python_functions = test_*

16
第七周/pytest01.py Normal file
View File

@ -0,0 +1,16 @@
import pytest
import requests
class TestIhrmLogin(object):
@pytest.mark.parametrize("mobile,password,ast",[
("13800000002","929itheima.CN032@.20250408",(200,True,"操作成功!")),
("13804789371", "929itheima.CN032@.20250408", (200, False, "用户名或密码错误",20001)),
("13800000002", "123456789", (200, False, "用户名或密码错误", 20001))
])
def test_login(self,mobile,password,ast):
jsondata = {"mobile":mobile,"password":password}
rsp = requests.post("http://ihrm-java.itheima.net/api/sys/login", json=jsondata)
assert rsp.status_code == ast[0]
assert rsp.json().get("success") == ast[1]
assert rsp.json().get("message") == ast[2]
if len(ast)==4:
assert rsp.json().get("code") == ast[3]

23
第七周/pytest02.py Normal file
View File

@ -0,0 +1,23 @@
import pytest
import requests
BASE_URL = 'http://kdtx-test.itheima.net/api'
class TestkdtxLogin(object):
@pytest.mark.parametrize("username,password,ast",[
("manager","HM_2023_test",("操作成功",200,{"name": "测试开发提升课01", "subject": "6", "price": 899, "applicablePerson": "2","info": "测试开发提升课01"})),
("manager", "123456", ("用户不存在/密码错误", 500)),
("uusseerr", "HM_2023_test", ("用户不存在/密码错误", 500)),
])
def test_login(self,username,password,ast):
uuid = requests.get(f'{BASE_URL}/captchaImage').json()['uuid']
login_data = {"username": username, "password": password, "code": "2", "uuid": uuid}
login_res = requests.post(f'{BASE_URL}/login', json=login_data)
assert login_res.json()['msg'] == ast[0]
assert login_res.json()['code'] == ast[1]
if len(ast)==3:
token, cookies = login_res.json()['token'], login_res.cookies.get_dict()
course_data = ast[2]
add_course_res = requests.post(f'{BASE_URL}/clues/course', json=course_data, headers={'Authorization': token},cookies=cookies)
assert add_course_res.json()['msg'] == ast[0]
assert add_course_res.json()['code'] == ast[1]