Python仿wireshark

# coding=utf-8
import datetime
import threading
import tkinter
from tkinter import *
from tkinter import font, filedialog
from tkinter.constants import *
from tkinter.messagebox import askyesno
from tkinter.scrolledtext import ScrolledText
from tkinter.ttk import Treeview

from scapy.layers.inet import *
from scapy.layers.l2 import *
from scapy.all import *
# 用来终止抓包线程的线程事件
stop_sending = threading.Event()
# 用来给抓到扥数据包编号
packet_id = 1
# 用来存放抓到的数据包
packet_list =[]
# 暂停抓包的标志位
pause_flag = False
# 保存文件标志位
save_flag = False
# 停止抓包标志位
stop_flag=False

# 状态栏类
class StatusBar(Frame):
    def __init__(self, master):
        Frame.__init__(self, master)
        self.label = Label(self, bd=1, relief=SUNKEN, anchor=W)
        self.label.pack(fill=X)
    def set(self, fmt, *args):
        self.label.config(text=fmt % args)
        self.label.update_idletasks()
    def clear(self):
        self.label.config(text="")
        self.label.update_idletasks()


# 时间戳转为格式化的时间字符串
def timestamp2time(timestamp):
    time_array = time.localtime(timestamp)
    mytime = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
    return mytime
"""
数据包列表单击事件响应函数,在数据包列表单击某数据包时,在协议解析区解析此数据包,并在hexdump区显示此数据包的十六进制内容
:param event: TreeView单击事件
:return: None
"""
def on_click_packet_list_tree(event):
    # event.widget获取Treeview对象,调用selection获取选择对象名称,返回结果为字符型元祖
    selected_item = event.widget.selection()
    # 清空packet_dissect_tree上现有的内容------------------------
    packet_dissect_tree.delete(*packet_dissect_tree.get_children())
    # 设置协议解析区的宽度
    packet_dissect_tree.column('Dissect', width=packet_list_frame.winfo_width())
    # 转换为整型
    packet_id = int(selected_item[0])-1
    # 取出要分析的数据包
    packet = packet_list[packet_id]
    #packet.show()
    lines = (packet.show(dump=True)).split('\n')  # dump=True返回字符串,不打出,\n换行符
    last_tree_entry = None
    for line in lines:
        if line.startswith('#'):
            line = line.strip('# ')  # 删除#
            last_tree_entry = packet_dissect_tree.insert('', 'end', text=line)  # 第一个参数为空表示根节点
        else:
            packet_dissect_tree.insert(last_tree_entry, 'end', text=line)
        col_width = font.Font().measure(line)
        # 根据新插入数据项的长度动态调整协议解析区的宽度
        if packet_dissect_tree.column('Dissect', width=None) < col_width:
            packet_dissect_tree.column('Dissect', width=col_width)

    if IP in packet:
        ip = packet[IP]
        ip_chksum = ip.chksum
        # ip.show()#抓到的IP报文
        ip.chksum = None
        # ip.show()
        ip_check = IP(raw(ip)).chksum
        ip.chksum = ip_chksum
        print(ip_chksum, "计算出的IP首部校验和:", ip_check)
        if TCP in packet:
            tcp = packet[TCP]
            tcp_chksum = tcp.chksum
            tcp.chksum = None
            tcp_check = TCP(raw(tcp)).chksum
            tcp.chksum = tcp_chksum
            print(tcp_chksum, "计算出的TCP检验和:", tcp_check)
            information = "IP与TCP的校验和检查通过\r\nIP的校验和为:{chksum_ip}\r\nTCP的检验和为:" \
                          "{chksum_tcp}".format(chksum_ip=ip_chksum, chksum_tcp=tcp_chksum)
            print(information)
            if ip_check == ip_chksum and tcp_check == tcp_chksum:
                tkinter.messagebox.showinfo("校验和的检查", information)
            else:
                tkinter.messagebox.showerror("校验和错误警告", "IP或TCP的校验和出错")
        elif UDP in packet:
            udp = packet[UDP]
            udp_chksum = udp.chksum
            udp.chksum = None
            # 重新计算数据包的校验和
            udp_check = UDP(raw(udp)).chksum
            udp.chksum = udp_chksum
            print(udp_chksum, "计算出的UDP检验和:", udp_check)
            information = "IP与UDP的校验和检查通过\r\nIP的校验和为:" \
                          "{chksum_ip}\r\nUDP的检验和为:{chksum_udp}".format(chksum_ip=ip_chksum, chksum_udp=udp_chksum)
            print(information)
            # 弹出提示窗口
            if ip_check == ip_chksum and udp_check == udp_chksum:
                tkinter.messagebox.showinfo("校验和的检查", information)
            else:
                tkinter.messagebox.showerror("校验和错误警告", "IP或UDP的校验和出错")

    # 在hexdump区显示此数据包的十六进制内容,不用修改
    hexdump_scrolledtext['state'] = 'normal'
    hexdump_scrolledtext.delete(1.0, END)
    hexdump_scrolledtext.insert(END, hexdump(packet, dump=True))
    hexdump_scrolledtext['state'] = 'disabled'

# 抓抓取数据包并保存
def capture_packet():
    # 设置过滤条件
    filters = fitler_entry.get()
    print("抓包条件:"+filters)
    # 设置停止抓包的条件stop_filter
    stop_sending.clear()
    global packet_list
    # 清空列表
    packet_list.clear()
    # 抓取数据包并将抓到的包存在列表中
    sniff(prn=(lambda x: process_packet(x)), filter=filters, stop_filter=(lambda x: stop_sending.is_set()))

# 处理抓到的数据包
def process_packet(packet):
    if pause_flag == False:
        global packet_list
        # 将抓到的包存在列表中
        packet_list.append(packet)
        # 抓包的时间
        packet_time= timestamp2time(packet.time)
        src = packet[Ether].src
        dst = packet[Ether].dst
        type = packet[Ether].type
        types = {0x0800:'IPv4',0x0806:'ARP',0x86dd:'IPv6',0x88cc:'LLDP',0x891D:'TTE'}
        if type in types:
           proto = types[type]
        else:
             proto = 'LOOP'  # 协议
        # IP
        if proto == 'IPv4':
            # 建立协议查询字典
            protos = {1: 'ICMP', 2: 'IGMP', 4: 'IP', 6: 'TCP', 8: 'EGP', 9: 'IGP', 17: 'UDP', 41: 'IPv6', 50: 'ESP', 89:'OSPF'}
            src = packet[IP].src
            dst = packet[IP].dst
            proto=packet[IP].proto
            if proto in protos:
                proto=protos[proto]
        # tcp
        if TCP in packet:
            protos_tcp = {80: 'Http', 443: 'Https', 23: 'Telnet', 21: 'Ftp', 20: 'ftp_data', 22: 'SSH', 25: 'SMTP'}
            sport = packet[TCP].sport
            dport = packet[TCP].dport
            if sport in protos_tcp:
                proto = protos_tcp[sport]
            elif dport in protos_tcp:
                proto = protos_tcp[dport]
        elif UDP in packet:
            if packet[UDP].sport == 53 or packet[UDP].dport == 53:
                proto = 'DNS'
        length = len(packet)  # 长度
        info = packet.summary()  # 信息
        global packet_id  # 数据包的编号
        packet_list_tree.insert("", 'end', packet_id, text=packet_id,
                            values=(packet_id, packet_time, src, dst, proto, length, info))
        packet_list_tree.update_idletasks()  # 更新列表,不需要修改
        packet_id = packet_id + 1


# 将抓到的数据包保存为pcap格式的文件
def save_captured_data_to_file():
    global save_flag
    save_flag = True
    # 默认打开位置initialdir='d:\\',默认命名initialfile='.pcap'
    filename=tkinter.filedialog.asksaveasfilename(title='保存文件', filetypes=[('所有文件', '.*'),
                                                                           ('数据包', '.pcap')], initialfile='.pcap')
    if filename.find('.pcap') == -1:
        # 默认文件格式为 pcap
        filename = filename+'.pcap'

    wrpcap(filename, packet_list)

# 开始按钮单击响应函数,如果是停止后再次开始捕获,要提示用户保存已经捕获的数据
def start_capture():
    """
    开新线程,进行抓包,sniff的prn 选项设置为包处理函数,仿照just_a_test()写,过滤器为filter选项
    :return:
    """
    # 暂停,停止,保存的标志位
    global pause_flag,stop_flag,save_flag
    # 已经停止,重新开始抓包但没进行保存操作
    if stop_flag is True and save_flag is False:
        resault = tkinter.messagebox.askyesnocancel("保存提醒", "是否保存抓到的数据包")
        if resault is False:
            print("直接开始不保存")
        elif resault is True:
            print("先保存数据包,在进行抓包")
            # 默认打开位置initialdir='d:\\',默认命名initialfile='.pcap'
            filename = tkinter.filedialog.asksaveasfilename(title='保存文件', filetypes=[('所有文件', '.*'),
                                                                                     ('数据包', '.pcap')], initialfile='.pcap')
            if filename.find('.pcap') == -1:
                # 默认文件格式为 pcap
                filename = filename + '.pcap'
            wrpcap(filename, packet_list)
        else:
            print("取消抓包操作")
            stop_flag = False
            return
    # 设置开始按钮为不可用,暂停按钮可操作
    start_button['state'] = DISABLED  # 不可操作
    save_button['state'] = DISABLED
    pause_button['state'] = NORMAL  # 可操作
    stop_button['state'] = NORMAL
    stop_flag = False
    if pause_flag is False:
        # 清空已经抓到的数据包列表--------------
        items = packet_list_tree.get_children()
        for item in items:
            packet_list_tree.delete(item)
        packet_list_tree.clipboard_clear()
        global packet_id
        packet_id = 1
        # 开启新线程进行抓包
        t = threading.Thread(target=capture_packet)
        t.setDaemon(True)
        t.start()
        save_flag = False
    else:
        pause_flag = False

# 暂停按钮单击响应函数
def pause_capture():
    """
    抓包处理函数停止运行,仍然在抓包
    :return:
    """
    # 设置开始按钮为可用,暂停按钮为不可用
    start_button['state'] = NORMAL  # 可操作
    pause_button['state'] = DISABLED  # 不可操作
    global pause_flag
    pause_flag = True

# 停止按钮单击响应函数
def stop_capture():
    """
    终止线程,停止抓包
    :return:
    """
    # 终止线程,停止抓包
    stop_sending.set()
    # 设置开始按钮为可用,暂停按钮为不可用,保存为可用
    start_button['state'] = NORMAL # 可操作
    pause_button['state'] = DISABLED  # 不可操作
    stop_button['state'] = DISABLED
    save_button['state'] = NORMAL
    global pause_flag, stop_flag
    pause_flag = False
    stop_flag = True
    # 不能用加号+,连接不同格式字符
    print("停止抓包,共抓到", packet_id, "个数据包")

# 退出按钮单击响应函数,退出程序前要提示用户保存已经捕获的数据
def quit_program():
    """
    保存数据的函数,wrpcap   : Write a list of packets to a pcap file
    :return:
    """
    #终止线程,停止抓包
    stop_sending.set()
    # 已经暂停,或停止,需要提示保存在退出
    if stop_flag is True or pause_flag is True:
        # 没进行保存操作
        if save_flag is False:

            resault = tkinter.messagebox.askyesnocancel("保存提醒", "是否保存抓到的数据包")
            if resault is False:
                print("直接退出不保存")
                tk.destroy()
            elif resault is True:
                print("先保存数据包,再退出")
                # 默认打开位置initialdir='d:\\',默认命名initialfile='.pcap'
                filename = tkinter.filedialog.asksaveasfilename(title='保存文件',
                                                                filetypes=[('所有文件', '.*'), ('数据包', '.pcap')],initialfile='.pcap')
                if filename.find('.pcap') == -1:
                    # 默认文件格式为 pcap
                    filename = filename + '.pcap'
                wrpcap(filename, packet_list)
                tk.destroy()
            else:
                print("取消退出")
        else:
            print("已经保存,直接退出")
            tk.destroy()
    else:
        print("直接关闭窗口")
        tk.destroy()
# ---------------------以下代码负责绘制GUI界面---------------------
tk = tkinter.Tk()
tk.title("协议分析器")
# tk.resizable(0, 0)
# 带水平分割条的主窗体
main_panedwindow = PanedWindow(tk, sashrelief=RAISED, sashwidth=5, orient=VERTICAL)

# 顶部的按钮及过滤器区
toolbar = Frame(tk)
start_button = Button(toolbar, width=8, text="开始", command=start_capture)
pause_button = Button(toolbar, width=8, text="暂停", command=pause_capture)
stop_button = Button(toolbar, width=8, text="停止", command=stop_capture)
save_button = Button(toolbar, width=8, text="保存数据", command=save_captured_data_to_file)
quit_button = Button(toolbar, width=8, text="退出", command=quit_program)
start_button['state'] = 'normal'
pause_button['state'] = 'disabled'
stop_button['state'] = 'disabled'
save_button['state'] = 'disabled'
quit_button['state'] = 'normal'
filter_label = Label(toolbar, width=10, text="BPF过滤器:")
fitler_entry = Entry(toolbar)
start_button.pack(side=LEFT, padx=5)
pause_button.pack(side=LEFT, after=start_button, padx=10, pady=10)
stop_button.pack(side=LEFT, after=pause_button, padx=10, pady=10)
save_button.pack(side=LEFT, after=stop_button, padx=10, pady=10)
quit_button.pack(side=LEFT, after=save_button, padx=10, pady=10)
filter_label.pack(side=LEFT, after=quit_button, padx=0, pady=10)
fitler_entry.pack(side=LEFT, after=filter_label, padx=20, pady=10, fill=X, expand=YES)
toolbar.pack(side=TOP, fill=X)

# 数据包列表区
packet_list_frame = Frame()
packet_list_sub_frame = Frame(packet_list_frame)
packet_list_tree = Treeview(packet_list_sub_frame, selectmode='browse')
packet_list_tree.bind('<<TreeviewSelect>>', on_click_packet_list_tree)
# 数据包列表垂直滚动条
packet_list_vscrollbar = Scrollbar(packet_list_sub_frame, orient="vertical", command=packet_list_tree.yview)
packet_list_vscrollbar.pack(side=RIGHT, fill=Y, expand=YES)
packet_list_tree.configure(yscrollcommand=packet_list_vscrollbar.set)
packet_list_sub_frame.pack(side=TOP, fill=BOTH, expand=YES)
# 数据包列表水平滚动条
packet_list_hscrollbar = Scrollbar(packet_list_frame, orient="horizontal", command=packet_list_tree.xview)
packet_list_hscrollbar.pack(side=BOTTOM, fill=X, expand=YES)
packet_list_tree.configure(xscrollcommand=packet_list_hscrollbar.set)
# 数据包列表区列标题
packet_list_tree["columns"] = ("No.", "Time", "Source", "Destination", "Protocol", "Length", "Info")
packet_list_column_width = [100, 180, 160, 160, 100, 100, 800]
packet_list_tree['show'] = 'headings'
for column_name, column_width in zip(packet_list_tree["columns"], packet_list_column_width):
    packet_list_tree.column(column_name, width=column_width, anchor='w')
    packet_list_tree.heading(column_name, text=column_name)
packet_list_tree.pack(side=LEFT, fill=X, expand=YES)
packet_list_frame.pack(side=TOP, fill=X, padx=5, pady=5, expand=YES, anchor='n')
# 将数据包列表区加入到主窗体
main_panedwindow.add(packet_list_frame)

# 协议解析区
packet_dissect_frame = Frame()
packet_dissect_sub_frame = Frame(packet_dissect_frame)
packet_dissect_tree = Treeview(packet_dissect_sub_frame, selectmode='browse')
packet_dissect_tree["columns"] = ("Dissect",)
packet_dissect_tree.column('Dissect', anchor='w')
packet_dissect_tree.heading('#0', text='Packet Dissection', anchor='w')
packet_dissect_tree.pack(side=LEFT, fill=BOTH, expand=YES)
# 协议解析区垂直滚动条
packet_dissect_vscrollbar = Scrollbar(packet_dissect_sub_frame, orient="vertical", command=packet_dissect_tree.yview)
packet_dissect_vscrollbar.pack(side=RIGHT, fill=Y)
packet_dissect_tree.configure(yscrollcommand=packet_dissect_vscrollbar.set)
packet_dissect_sub_frame.pack(side=TOP, fill=X, expand=YES)
# 协议解析区水平滚动条
packet_dissect_hscrollbar = Scrollbar(packet_dissect_frame, orient="horizontal", command=packet_dissect_tree.xview)
packet_dissect_hscrollbar.pack(side=BOTTOM, fill=X)
packet_dissect_tree.configure(xscrollcommand=packet_dissect_hscrollbar.set)
packet_dissect_frame.pack(side=LEFT, fill=X, padx=5, pady=5, expand=YES)
# 将协议解析区加入到主窗体
main_panedwindow.add(packet_dissect_frame)

# hexdump区
hexdump_scrolledtext = ScrolledText(height=10)
hexdump_scrolledtext['state'] = 'disabled'
# 将hexdump区区加入到主窗体
main_panedwindow.add(hexdump_scrolledtext)

main_panedwindow.pack(fill=BOTH, expand=1)

# 状态栏
status_bar = StatusBar(tk)
status_bar.pack(side=BOTTOM, fill=X)
#just_a_test()  # 展示一个数据包,不是抓来的
tk.mainloop()

 

阅读剩余
THE END