111 lines
3.1 KiB
Python
111 lines
3.1 KiB
Python
import paramiko
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
import textwrap
|
|
from queue import Queue
|
|
import threading
|
|
import time
|
|
|
|
|
|
class SSHMonitor:
|
|
def __init__(self, host, port, user, password, n_history=10):
|
|
self.host = host
|
|
self.port = port
|
|
self.user = user
|
|
self.password = password
|
|
self.history = []
|
|
self.n_history = n_history
|
|
self._init_ssh()
|
|
self._response_queue = Queue()
|
|
self.font = ImageFont.load_default()
|
|
|
|
def _init_ssh(self):
|
|
self.client = paramiko.SSHClient()
|
|
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
self.client.connect(self.host, self.port, self.user, self.password)
|
|
|
|
self.channel = self.client.invoke_shell()
|
|
self.channel.settimeout(0.1)
|
|
|
|
# 读取欢迎信息
|
|
time.sleep(1)
|
|
self._read_output()
|
|
|
|
def _read_output(self):
|
|
output = b''
|
|
while self.channel.recv_ready():
|
|
output += self.channel.recv(1024)
|
|
return output.decode('utf-8', 'ignore')
|
|
|
|
def run_interactive(self):
|
|
def output_reader():
|
|
while True:
|
|
try:
|
|
output = self._read_output()
|
|
if output:
|
|
self._response_queue.put(output)
|
|
except:
|
|
break
|
|
|
|
reader_thread = threading.Thread(target=output_reader)
|
|
reader_thread.daemon = True
|
|
reader_thread.start()
|
|
|
|
try:
|
|
while True:
|
|
command = input("$ ")
|
|
if command.startswith("capture"):
|
|
n = int(command.split()[1])
|
|
self.save_history_to_image(n)
|
|
continue
|
|
|
|
self.channel.send(command + "\n")
|
|
time.sleep(1) # 等待命令执行
|
|
output = self._response_queue.get()
|
|
|
|
# 记录历史
|
|
self.history.append({
|
|
'command': command,
|
|
'output': output.strip()
|
|
})
|
|
if len(self.history) > self.n_history:
|
|
self.history.pop(0)
|
|
|
|
except KeyboardInterrupt:
|
|
self.client.close()
|
|
|
|
def save_history_to_image(self, n):
|
|
if n > len(self.history):
|
|
n = len(self.history)
|
|
|
|
lines = []
|
|
for entry in self.history[-n:]:
|
|
lines.append(f"$ {entry['command']}")
|
|
lines.append(entry['output'])
|
|
lines.append("-" * 40)
|
|
|
|
text = "\n".join(lines)
|
|
|
|
# 创建图像
|
|
img = Image.new('RGB', (800, 600), color=(255, 255, 255))
|
|
d = ImageDraw.Draw(img)
|
|
margin = 10
|
|
x = margin
|
|
y = margin
|
|
|
|
for line in textwrap.wrap(text, width=100):
|
|
d.text((x, y), line, font=self.font, fill=(0, 0, 0))
|
|
y += 15
|
|
|
|
img.save(f'ssh_history_{time.time()}.png')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 配置SSH连接信息
|
|
monitor = SSHMonitor(
|
|
host='your_host',
|
|
port=22,
|
|
user='your_user',
|
|
password='your_password'
|
|
)
|
|
|
|
monitor.run_interactive() |