cherry-studio/checkpoint.py
2025-04-18 04:06:39 +08:00

1074 lines
51 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import shutil
import datetime
import tkinter as tk
from tkinter import filedialog, messagebox, ttk, simpledialog # Added simpledialog
import threading
import json
import platform # To handle os.startfile alternatives
import subprocess # Needed for macOS/Linux open folder
class CheckpointApp:
def __init__(self, root):
    """Configure the window, set defaults, load the config and build the UI.

    Args:
        root: The Tk root window hosting the application.
    """
    self.root = root
    root.title("项目检查点工具 v2.1")
    root.geometry("750x650")
    root.resizable(True, True)

    # The project defaults to the directory that contains this script and
    # backups default to a sibling "Backups" directory next to it.
    script_dir = os.path.abspath(os.path.dirname(__file__))
    self.project_path = script_dir
    self.backup_path = os.path.join(os.path.dirname(script_dir), "Backups")

    # Default exclusions: directory names first, then file patterns.
    excluded_dirs = [
        "node_modules",
        "dist",
        "out",
        ".git",
        "release",
        "__pycache__",
    ]
    excluded_files = [
        "*.log",
        "*.lock",
        "*.exe",
        "*.dll",
        "*.zip",
        "*.tar.gz",
        "checkpoint_config.json",  # the tool's own config file
        "checkpoint_info.txt",  # the metadata file written into each backup
    ]
    self.default_excludes = excluded_dirs + excluded_files

    # The config file lives in the (default) project directory.
    self.config_file = os.path.join(
        self.project_path, "checkpoint_config.json")
    self.load_config()

    # Build the widgets, then fill the checkpoint list once they exist.
    self.create_ui()
    self.populate_checkpoint_list()
def load_config(self):
"""加载配置文件"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
saved_project_path = config.get('project_path')
if saved_project_path and os.path.isdir(saved_project_path):
self.project_path = saved_project_path
else:
print(
f"配置文件中的项目路径无效或不存在: {saved_project_path}, 使用默认值。")
saved_backup_path = config.get('backup_path')
if saved_backup_path:
self.backup_path = saved_backup_path
else:
print(f"配置文件中的备份路径无效: {saved_backup_path}, 使用默认值。")
# Ensure excludes is always a list, even if loaded config is bad
loaded_excludes = config.get('excludes')
if isinstance(loaded_excludes, list):
self.excludes = loaded_excludes
else:
print(f"配置文件中的排除项格式无效, 使用默认值。")
self.excludes = list(self.default_excludes)
else:
self.excludes = list(self.default_excludes)
# Don't save immediately, let user do it or save on create
# self.save_config()
except json.JSONDecodeError as e:
messagebox.showerror(
"配置错误", f"加载配置文件时出错 (JSON 格式无效): {str(e)}\n将使用默认配置。")
self.excludes = list(self.default_excludes)
except Exception as e:
messagebox.showerror("错误", f"加载配置文件失败: {str(e)}\n将使用默认配置。")
self.excludes = list(self.default_excludes)
def save_config(self):
"""保存配置文件"""
try:
# Ensure paths are updated from UI vars before saving
self.project_path = self.project_path_var.get() if hasattr(
self, 'project_path_var') else self.project_path
self.backup_path = self.backup_path_var.get() if hasattr(
self, 'backup_path_var') else self.backup_path
self.excludes = list(self.exclude_listbox.get(0, tk.END)) if hasattr(
self, 'exclude_listbox') else self.excludes
config = {
'project_path': self.project_path,
'backup_path': self.backup_path,
'excludes': self.excludes
}
# Ensure the config file itself is saved in the project path
config_path_to_save = os.path.join(
self.project_path, "checkpoint_config.json")
with open(config_path_to_save, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=4)
self.config_file = config_path_to_save # Update the path used by the app
# Optionally provide feedback
# self.status_var.set("配置已保存")
except Exception as e:
messagebox.showerror("错误", f"保存配置文件失败: {str(e)}")
# self.status_var.set("保存配置失败")
def create_ui(self):
    """Build every widget of the main window.

    Lays out a three-column grid inside one main frame: labels in column 0,
    entries/lists in column 1 (the stretching column) and buttons in
    column 2. Creates the Tk variables and widgets that other methods read
    (``project_path_var``, ``backup_path_var``, ``checkpoint_name_var``,
    ``exclude_listbox``, ``checkpoint_tree``, ``restore_button``,
    ``delete_button``, ``progress_var``, ``progress_bar``, ``status_var``).
    """
    # --- Main Frame ---
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.pack(fill=tk.BOTH, expand=True)
    # Allow column 1 (entries/lists) to expand
    main_frame.columnconfigure(1, weight=1)
    # --- Row 0: Project Path ---
    ttk.Label(main_frame, text="项目路径:").grid(
        column=0, row=0, sticky=tk.W, pady=5, padx=5)
    self.project_path_var = tk.StringVar(value=self.project_path)
    ttk.Entry(main_frame, textvariable=self.project_path_var, width=60).grid(
        column=1, row=0, sticky=(tk.W, tk.E), pady=5)
    ttk.Button(main_frame, text="浏览...", command=self.browse_project_path).grid(
        column=2, row=0, sticky=tk.W, padx=5, pady=5)
    # --- Row 1: Backup Path ---
    ttk.Label(main_frame, text="备份路径:").grid(
        column=0, row=1, sticky=tk.W, pady=5, padx=5)
    self.backup_path_var = tk.StringVar(value=self.backup_path)
    ttk.Entry(main_frame, textvariable=self.backup_path_var, width=60).grid(
        column=1, row=1, sticky=(tk.W, tk.E), pady=5)
    ttk.Button(main_frame, text="浏览...", command=self.browse_backup_path).grid(
        column=2, row=1, sticky=tk.W, padx=5, pady=5)
    # --- Row 2: Checkpoint Name ---
    # The name is pre-filled with a timestamped default the user can edit.
    ttk.Label(main_frame, text="检查点名称:").grid(
        column=0, row=2, sticky=tk.W, pady=5, padx=5)
    self.checkpoint_name_var = tk.StringVar(
        value=f"checkpoint_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
    ttk.Entry(main_frame, textvariable=self.checkpoint_name_var, width=60).grid(
        column=1, row=2, sticky=(tk.W, tk.E), pady=5)
    # --- Row 3: Excludes ---
    ttk.Label(main_frame, text="排除项:").grid(column=0, row=3,
                                            sticky=tk.NW, pady=5, padx=5)  # NW keeps the label at the top of the row
    exclude_frame = ttk.Frame(main_frame)
    exclude_frame.grid(column=1, row=3, sticky=(
        tk.W, tk.E, tk.N, tk.S), pady=5)
    exclude_frame.columnconfigure(0, weight=1)
    exclude_frame.rowconfigure(0, weight=1)
    self.exclude_listbox = tk.Listbox(
        exclude_frame, width=50, height=6)
    self.exclude_listbox.grid(
        column=0, row=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    scrollbar = ttk.Scrollbar(
        exclude_frame, orient=tk.VERTICAL, command=self.exclude_listbox.yview)
    scrollbar.grid(column=1, row=0, sticky=(tk.N, tk.S))
    self.exclude_listbox.config(yscrollcommand=scrollbar.set)
    # Fill the listbox from the loaded excludes; fall back to the defaults
    # if the attribute is somehow not a list (the listbox then stays empty).
    if isinstance(self.excludes, list):
        for item in self.excludes:
            self.exclude_listbox.insert(tk.END, item)
    else:
        print("Error: self.excludes is not a list during UI creation.")
        self.excludes = list(self.default_excludes)  # Reset to default
    exclude_btn_frame = ttk.Frame(main_frame)
    exclude_btn_frame.grid(
        column=2, row=3, sticky=tk.NW, padx=5, pady=5)
    ttk.Button(exclude_btn_frame, text="添加",
               command=self.add_exclude).pack(fill=tk.X, pady=2)
    ttk.Button(exclude_btn_frame, text="删除选中",
               command=self.remove_exclude).pack(fill=tk.X, pady=2)
    ttk.Button(exclude_btn_frame, text="重置默认",
               command=self.reset_excludes).pack(fill=tk.X, pady=2)
    # --- Row 4: Separator ---
    ttk.Separator(main_frame, orient=tk.HORIZONTAL).grid(
        column=0, row=4, columnspan=3, sticky=(tk.W, tk.E), pady=10)
    # --- Row 5: Checkpoint List ---
    ttk.Label(main_frame, text="可用检查点:").grid(
        column=0, row=5, sticky=tk.NW, pady=5, padx=5)
    checkpoint_list_frame = ttk.Frame(main_frame)
    checkpoint_list_frame.grid(
        column=1, row=5, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
    checkpoint_list_frame.columnconfigure(0, weight=1)
    checkpoint_list_frame.rowconfigure(0, weight=1)
    # Allow this row containing the list to expand vertically
    main_frame.rowconfigure(5, weight=1)
    # Column identifiers are internal; the visible headings are set below.
    cols = ("Name", "Date Created")
    self.checkpoint_tree = ttk.Treeview(
        checkpoint_list_frame, columns=cols, show='headings', height=8)
    self.checkpoint_tree.grid(
        column=0, row=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    self.checkpoint_tree.heading("Name", text="检查点名称")
    self.checkpoint_tree.heading("Date Created", text="创建日期")
    self.checkpoint_tree.column("Name", width=250, anchor=tk.W)
    self.checkpoint_tree.column(
        "Date Created", width=150, anchor=tk.CENTER)
    tree_scrollbar = ttk.Scrollbar(
        checkpoint_list_frame, orient=tk.VERTICAL, command=self.checkpoint_tree.yview)
    tree_scrollbar.grid(column=1, row=0, sticky=(tk.N, tk.S))
    self.checkpoint_tree.config(yscrollcommand=tree_scrollbar.set)
    # Selection changes toggle the restore/delete buttons.
    self.checkpoint_tree.bind(
        '<<TreeviewSelect>>', self.on_checkpoint_select)
    checkpoint_list_btn_frame = ttk.Frame(main_frame)
    checkpoint_list_btn_frame.grid(
        column=2, row=5, sticky=tk.NW, padx=5, pady=5)
    ttk.Button(checkpoint_list_btn_frame, text="刷新列表",
               command=self.populate_checkpoint_list).pack(fill=tk.X, pady=2)
    # Restore and delete start disabled until a checkpoint is selected.
    self.restore_button = ttk.Button(
        checkpoint_list_btn_frame, text="恢复检查点", command=self.restore_checkpoint, state=tk.DISABLED)
    self.restore_button.pack(fill=tk.X, pady=2)
    self.delete_button = ttk.Button(
        checkpoint_list_btn_frame, text="删除检查点", command=self.delete_checkpoint, state=tk.DISABLED)
    self.delete_button.pack(fill=tk.X, pady=2)
    # --- Row 6: Progress Bar ---
    # progress_var holds a percentage (the bar's maximum is 100).
    ttk.Label(main_frame, text="进度:").grid(
        column=0, row=6, sticky=tk.W, pady=5, padx=5)
    self.progress_var = tk.DoubleVar()
    self.progress_bar = ttk.Progressbar(
        main_frame, variable=self.progress_var, maximum=100)
    self.progress_bar.grid(column=1, row=6, columnspan=2, sticky=(
        tk.W, tk.E), pady=5, padx=(0, 5))
    # --- Row 7: Status Label ---
    self.status_var = tk.StringVar(value="就绪")
    ttk.Label(main_frame, textvariable=self.status_var).grid(
        column=1, row=7, columnspan=2, sticky=tk.W, pady=5)
    # --- Row 8: Action Buttons ---
    # Buttons are packed left-to-right inside a frame anchored to the east.
    btn_frame = ttk.Frame(main_frame)
    btn_frame.grid(column=0, row=8, columnspan=3, sticky=tk.E, pady=10)
    ttk.Button(btn_frame, text="创建检查点", command=self.create_checkpoint).pack(
        side=tk.LEFT, padx=5)
    ttk.Button(btn_frame, text="打开备份文件夹", command=self.open_backup_folder).pack(
        side=tk.LEFT, padx=5)
    ttk.Button(btn_frame, text="保存配置", command=self.save_config).pack(
        side=tk.LEFT, padx=5)
    ttk.Button(btn_frame, text="退出", command=self.root.quit).pack(
        side=tk.LEFT, padx=5)
def browse_project_path(self):
    """Let the user pick the project directory with a folder dialog.

    The dialog opens at the path currently in the entry, or at the parent
    of the stored project path when that entry is not a directory. The
    choice is only applied in memory; the config is not saved here.
    """
    start_dir = self.project_path_var.get()
    if not os.path.isdir(start_dir):
        start_dir = os.path.dirname(self.project_path)

    chosen = filedialog.askdirectory(initialdir=start_dir, title="选择项目路径")
    if not chosen:
        return  # dialog cancelled

    self.project_path = os.path.abspath(chosen)
    self.project_path_var.set(self.project_path)
def browse_backup_path(self):
    """Let the user pick the backup directory and refresh the checkpoint list.

    The dialog opens at the entry's path when it is a directory, otherwise
    at its parent, otherwise at the parent of the project path.
    """
    start_dir = self.backup_path_var.get()
    if not os.path.isdir(start_dir):
        parent = os.path.dirname(start_dir)
        start_dir = parent if os.path.isdir(
            parent) else os.path.dirname(self.project_path)

    chosen = filedialog.askdirectory(initialdir=start_dir, title="选择备份路径")
    if not chosen:
        return  # dialog cancelled

    self.backup_path = os.path.abspath(chosen)
    self.backup_path_var.set(self.backup_path)
    # A different backup root means a different set of checkpoints.
    self.populate_checkpoint_list()
def add_exclude(self):
    """Ask for a new exclude pattern and append it if it is not a duplicate.

    Blank input and cancelled dialogs are ignored. The config is not saved
    automatically.
    """
    entered = simpledialog.askstring(
        "添加排除项", "输入要排除的文件夹/文件模式:", parent=self.root)
    if not entered or not entered.strip():
        return

    pattern = entered.strip()
    if pattern in self.exclude_listbox.get(0, tk.END):
        return  # already listed

    self.exclude_listbox.insert(tk.END, pattern)
    # Keep the in-memory list in step with the widget.
    self.excludes = list(self.exclude_listbox.get(0, tk.END))
def remove_exclude(self):
    """Delete the selected exclude patterns, or warn when none is selected.

    The config is not saved automatically.
    """
    picked = self.exclude_listbox.curselection()
    if not picked:
        messagebox.showwarning("无选择", "请在排除项列表中选择要删除的项目。")
        return

    # Walk the selection back-to-front so earlier deletions do not shift
    # the indices still waiting to be removed.
    for index in picked[::-1]:
        self.exclude_listbox.delete(index)
    self.excludes = list(self.exclude_listbox.get(0, tk.END))
def reset_excludes(self):
    """Replace the exclude list with the defaults after user confirmation.

    The config is not saved automatically.
    """
    confirmed = messagebox.askyesno(
        "确认", "确定要将排除项重置为默认列表吗?\n当前列表中的所有项都将被替换。", parent=self.root)
    if not confirmed:
        return

    self.exclude_listbox.delete(0, tk.END)
    # Work on a copy so later edits never touch the defaults themselves.
    self.excludes = list(self.default_excludes)
    for pattern in self.excludes:
        self.exclude_listbox.insert(tk.END, pattern)
def start_file_explorer(self, path):
    """Open *path* in the platform's file browser.

    Shows an error dialog when the path does not exist, is not a directory,
    or the opener command cannot be found or fails.

    Args:
        path: Directory to open; made absolute before use.
    """
    try:
        target = os.path.abspath(path)
        if not os.path.exists(target):
            messagebox.showerror("错误", f"路径不存在: {target}")
            return
        if not os.path.isdir(target):
            messagebox.showerror("错误", f"路径不是一个文件夹: {target}")
            return

        system_name = platform.system()
        if system_name == "Windows":
            os.startfile(target)
        else:
            # macOS ships "open"; other Unix-like systems use "xdg-open".
            opener = "open" if system_name == "Darwin" else "xdg-open"
            subprocess.Popen([opener, target])
    except FileNotFoundError:
        # Raised when the opener executable itself is missing.
        messagebox.showerror("错误", f"无法找到文件浏览器命令来打开路径: {path}")
    except Exception as e:
        messagebox.showerror("错误", f"无法打开文件夹: {str(e)}")
def open_backup_folder(self):
    """Open the backup directory, offering to create it when it is missing."""
    folder = self.backup_path_var.get()
    if os.path.isdir(folder):
        self.start_file_explorer(folder)
        return

    wants_create = messagebox.askyesno(
        "提示", f"备份文件夹 '{folder}' 不存在。\n是否立即创建?", parent=self.root)
    if not wants_create:
        return

    try:
        os.makedirs(folder, exist_ok=True)
        self.start_file_explorer(folder)
    except Exception as e:
        messagebox.showerror("错误", f"无法创建备份文件夹: {str(e)}")
def should_exclude(self, path, base_path):
"""检查路径是否应该被排除 (更精确的检查)"""
try:
abs_path = os.path.abspath(path)
abs_base_path = os.path.abspath(base_path)
if not abs_path.startswith(abs_base_path + os.sep) and abs_path != abs_base_path:
# If path isn't within base_path, check only basename or extension
filename = os.path.basename(abs_path)
for exclude_pattern in self.excludes:
exclude_pattern = exclude_pattern.strip()
if not exclude_pattern:
continue
if exclude_pattern.startswith("*."):
if filename.endswith(exclude_pattern[1:]):
return True
elif filename == exclude_pattern:
# Check if it's a directory name match as well
if os.path.isdir(abs_path) and filename == exclude_pattern:
return True
# Check if it's a filename match
if os.path.isfile(abs_path) and filename == exclude_pattern:
return True
return False # Not in base, and no direct match
rel_path = os.path.relpath(abs_path, abs_base_path)
except ValueError:
# Handle cases where paths are on different drives on Windows
rel_path = os.path.basename(path)
# Normalize path separators for consistent matching
rel_path_normalized = rel_path.replace(os.sep, '/')
path_parts = rel_path_normalized.split('/')
filename = os.path.basename(path) # Use original path for basename
current_excludes = list(self.exclude_listbox.get(0, tk.END)) if hasattr(
self, 'exclude_listbox') else self.excludes
for exclude_pattern in current_excludes:
exclude_pattern = exclude_pattern.strip().replace(
os.sep, '/') # Normalize pattern
if not exclude_pattern:
continue
# 1. Wildcard extension match (e.g., *.log)
if exclude_pattern.startswith("*."):
# Case-insensitive extension match
if filename.lower().endswith(exclude_pattern[1:].lower()):
# print(f"Excluding '{rel_path}' due to wildcard '{exclude_pattern}'")
return True
# 2. Exact match for file or directory name anywhere in the path
# Avoid matching parts of names, e.g. 'node' should not exclude 'node_helper'
elif exclude_pattern in path_parts:
# Check if it's a full component match
idx = path_parts.index(exclude_pattern)
# If it's a directory match (not the last part, or it is the last part and the path IS a dir)
is_dir_match = (idx < len(path_parts) - 1 or (idx ==
len(path_parts) - 1 and os.path.isdir(path)))
# If it's a file match (last part and the path IS a file)
is_file_match = (idx == len(
path_parts) - 1 and os.path.isfile(path) and filename == exclude_pattern)
if is_dir_match or is_file_match:
# print(f"Excluding '{rel_path}' because component '{exclude_pattern}' is in excludes")
return True
# 3. Exact match for relative path start (e.g., build/ or specific_file.txt)
# Ensure it matches a full directory or the exact file name.
elif rel_path_normalized.startswith(exclude_pattern):
# Check if it's an exact match or matches a directory boundary
if (rel_path_normalized == exclude_pattern or
rel_path_normalized.startswith(exclude_pattern + '/')):
# print(f"Excluding '{rel_path}' due to start match '{exclude_pattern}'")
return True
return False
# --- Checkpoint Creation ---
def create_checkpoint(self):
    """Validate the inputs and start checkpoint creation on a worker thread.

    Reads the latest project path, backup path, checkpoint name and
    excludes from the UI, validates them (offering to create a missing
    backup root), saves the config and launches the background copy.
    """
    self.status_var.set("准备创建...")
    self.progress_var.set(0)

    # Take the most recent values from the widgets.
    self.project_path = self.project_path_var.get()
    self.backup_path = self.backup_path_var.get()
    checkpoint_name = self.checkpoint_name_var.get().strip()
    self.excludes = list(self.exclude_listbox.get(0, tk.END))

    if not checkpoint_name:
        messagebox.showerror("错误", "检查点名称不能为空。")
        self.status_var.set("已取消")
        return

    invalid_chars = '<>:"/\\|?*'
    if any(ch in invalid_chars for ch in checkpoint_name):
        messagebox.showerror("错误", f"检查点名称包含无效字符 ({invalid_chars})。")
        self.status_var.set("已取消")
        return

    if not os.path.isdir(self.project_path):
        messagebox.showerror(
            "错误", f"项目路径不存在或不是有效文件夹:\n{self.project_path}")
        self.status_var.set("就绪")
        return

    if not os.path.exists(self.backup_path):
        # Missing backup root: create it only with the user's consent.
        create_root = messagebox.askyesno(
            "确认", f"备份根目录不存在:\n{self.backup_path}\n\n是否现在创建它?", parent=self.root)
        if not create_root:
            self.status_var.set("已取消")
            return
        try:
            os.makedirs(self.backup_path, exist_ok=True)
        except Exception as e:
            messagebox.showerror("错误", f"无法创建备份根目录: {str(e)}")
            self.status_var.set("就绪")
            return
    elif not os.path.isdir(self.backup_path):
        messagebox.showerror("错误", f"指定的备份路径不是有效文件夹:\n{self.backup_path}")
        self.status_var.set("就绪")
        return

    # Persist the settings before the background work begins.
    self.save_config()

    worker = threading.Thread(
        target=self._create_checkpoint_thread, args=(checkpoint_name,))
    worker.daemon = True
    worker.start()
def _create_checkpoint_thread(self, checkpoint_name):
    """Copy the project into a new timestamped backup folder (worker thread).

    Scans ``self.project_path`` honouring the exclude rules, copies every
    remaining file into ``<backup_path>/<safe name>_<timestamp>[_n]``,
    writes ``checkpoint_info.txt`` there and finally schedules
    ``_checkpoint_completed`` or ``_checkpoint_failed`` on the Tk thread.

    UI updates are only scheduled through ``self.root.after``; no widget is
    read here because Tk widgets must not be touched off the main thread.

    Args:
        checkpoint_name: User-supplied display name of the checkpoint.
    """
    backup_folder = None
    # Only a folder created by THIS run may be removed on failure; without
    # this flag an error raised before creation could delete an unrelated,
    # pre-existing backup that happens to have the same path.
    folder_created = False
    try:
        project_root = self.project_path
        # Snapshot of the patterns (kept in sync by create_checkpoint).
        excludes_used = list(self.excludes)
        backup_root_key = os.path.normcase(
            os.path.abspath(self.backup_path))

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Keep only characters that are safe in a folder name.
        safe_name = "".join(
            c if c.isalnum() or c in '-_' else '_' for c in checkpoint_name)
        if not safe_name:
            safe_name = "checkpoint"  # fallback

        base_folder = os.path.join(
            self.backup_path, f"{safe_name}_{timestamp}")
        backup_folder = base_folder
        counter = 1
        while os.path.exists(backup_folder):
            if counter > 100:
                raise RuntimeError("无法创建唯一的备份文件夹名称,请检查备份目录。")
            backup_folder = f"{base_folder}_{counter}"
            counter += 1

        self.root.after(0, lambda: self.status_var.set("正在扫描项目文件..."))

        def report_walk_error(err):
            print(f"Warning: Cannot access {err.filename} - {err.strerror}")

        files_to_copy = []
        folders_to_create = set()
        for current_dir, dirs, files in os.walk(
                project_root, topdown=True, onerror=report_walk_error):
            # Prune in place so os.walk never descends into excluded
            # directories, nor into the backup root when it is nested
            # inside the project (that would copy old backups into new ones).
            kept_dirs = []
            for dir_name in dirs:
                dir_path = os.path.join(current_dir, dir_name)
                dir_key = os.path.normcase(os.path.abspath(dir_path))
                if dir_key == backup_root_key:
                    continue
                if self.should_exclude(dir_path, project_root):
                    continue
                kept_dirs.append(dir_name)
            dirs[:] = kept_dirs

            # Record every visited directory so empty ones are kept too.
            rel_dir = os.path.relpath(current_dir, project_root)
            if rel_dir != os.curdir and not self.should_exclude(
                    current_dir, project_root):
                folders_to_create.add(rel_dir)

            for file_name in files:
                src_path = os.path.join(current_dir, file_name)
                if not self.should_exclude(src_path, project_root):
                    files_to_copy.append(src_path)

        total_files = len(files_to_copy)
        if total_files == 0 and not folders_to_create:
            self.root.after(0, lambda: messagebox.showwarning(
                "警告", "没有找到要备份的文件或文件夹(可能都被排除了)。"))
            self.root.after(0, self._checkpoint_failed, "没有文件可备份")
            return

        self.root.after(0, lambda: self.status_var.set(
            f"准备复制 {total_files} 个文件和创建文件夹..."))

        # No exist_ok: the uniqueness loop above guarantees the folder is
        # new, so success here proves this run owns it.
        os.makedirs(backup_folder)
        folder_created = True

        for rel_dir in sorted(folders_to_create):
            dst_folder_path = os.path.join(backup_folder, rel_dir)
            try:
                os.makedirs(dst_folder_path, exist_ok=True)
            except OSError as e:
                # Not fatal: the copy loop retries per file.
                print(
                    f"Warning: Could not create directory {dst_folder_path}: {e}")

        copied_count = 0
        for index, src_path in enumerate(files_to_copy, start=1):
            dst_path = None  # bound up front so the error message is safe
            try:
                dst_path = os.path.join(
                    backup_folder, os.path.relpath(src_path, project_root))
                os.makedirs(os.path.dirname(dst_path), exist_ok=True)
                shutil.copy2(src_path, dst_path)
                copied_count += 1
            except Exception as copy_err:
                # Best effort: report the file and carry on with the rest.
                print(
                    f"ERROR: Failed to copy {src_path} to {dst_path}: {copy_err}")

            # Throttle UI updates to every 10th file plus the last one.
            if index % 10 == 0 or index == total_files:
                progress = index / total_files * 100
                status_msg = f"正在复制: {os.path.basename(src_path)} ({index}/{total_files})"
                self.root.after(0, self._update_progress,
                                progress, status_msg)

        # --- Create Checkpoint Info File ---
        info_file = os.path.join(backup_folder, "checkpoint_info.txt")
        creation_time = datetime.datetime.now()
        with open(info_file, 'w', encoding='utf-8') as f:
            f.write(f"Checkpoint Name: {checkpoint_name}\n")
            f.write(f"Original Project Path: {project_root}\n")
            f.write(
                f"Creation Time: {creation_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(
                f"Timestamped Folder: {os.path.basename(backup_folder)}\n")
            f.write(f"Files Copied: {copied_count}\n")
            f.write(
                f"Folders Created (in backup): {len(folders_to_create)}\n")
            f.write(f"Exclusions Used ({len(excludes_used)}):\n")
            for excl in sorted(excludes_used):  # sorted for stable output
                f.write(f"- {excl}\n")

        # --- Completion ---
        self.root.after(0, self._checkpoint_completed,
                        backup_folder, copied_count)
    except Exception as e:
        # Remove the partial backup, but only if this run created it.
        if folder_created and os.path.exists(backup_folder):
            try:
                shutil.rmtree(backup_folder)
                print(
                    f"Cleaned up incomplete backup folder: {backup_folder}")
            except Exception as cleanup_e:
                print(
                    f"Error cleaning up failed backup folder {backup_folder}: {cleanup_e}")
        self.root.after(0, self._checkpoint_failed, str(e))
def _update_progress(self, progress, status):
    """Apply a progress value and a status text to the UI variables.

    Scheduled onto the Tk thread by the worker threads.

    Args:
        progress: Value for the progress bar variable.
        status: Text for the status label.
    """
    updates = (
        (self.progress_var, progress),
        (self.status_var, status),
    )
    for variable, value in updates:
        variable.set(value)
def _checkpoint_completed(self, backup_folder, copied_count):
    """Finish a successful checkpoint run (scheduled from the worker thread).

    Refreshes the checkpoint list, proposes a fresh default name and asks
    whether to open the backup root in the file browser.

    Args:
        backup_folder: Path of the folder that was just created.
        copied_count: Number of files copied into it.
    """
    self.progress_var.set(100)
    self.status_var.set(f"检查点创建完成 ({copied_count} 文件)")
    self.populate_checkpoint_list()

    # Offer a new timestamped name for the next checkpoint.
    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    self.checkpoint_name_var.set(f"checkpoint_{stamp}")

    folder_label = os.path.basename(backup_folder)
    answer = messagebox.askquestion(
        "成功",
        f"检查点已成功创建:\n{folder_label}\n({copied_count} 个文件已复制)\n\n是否打开包含此备份的文件夹?",
        parent=self.root)
    if answer != 'yes':
        return
    # Open the backup root rather than the individual checkpoint folder.
    self.start_file_explorer(self.backup_path_var.get())
def _checkpoint_failed(self, error_msg):
    """Report a failed checkpoint run (scheduled from the worker thread).

    Args:
        error_msg: Description of the failure; the status line shows only
            its first 100 characters, the dialog shows all of it.
    """
    self.progress_var.set(0)
    short_msg = error_msg[:100]
    self.status_var.set(f"创建失败: {short_msg}...")
    messagebox.showerror(
        "错误", f"创建检查点失败: \n{error_msg}", parent=self.root)
# --- Checkpoint Listing & Restoration & Deletion ---
def populate_checkpoint_list(self):
    """Rebuild the checkpoint list from the current backup directory.

    A sub-directory counts as a checkpoint when it contains
    ``checkpoint_info.txt``. Name and creation time are read from that
    file; the folder name and the folder's modification time are the
    fallbacks. Rows are ordered newest first by the ``YYYYMMDD_HHMMSS``
    stamp embedded in the folder name, and each row's item id is the
    checkpoint's full path.
    """
    for row_id in self.checkpoint_tree.get_children():
        self.checkpoint_tree.delete(row_id)

    current_backup_path = self.backup_path_var.get()
    if not os.path.isdir(current_backup_path):
        self.status_var.set("备份路径无效,无法列出检查点")
        self.on_checkpoint_select()  # Ensure buttons are disabled
        return

    checkpoints = []
    try:
        for item_name in os.listdir(current_backup_path):
            item_path = os.path.join(current_backup_path, item_name)
            info_file_path = os.path.join(item_path, "checkpoint_info.txt")
            if not (os.path.isdir(item_path)
                    and os.path.exists(info_file_path)):
                continue

            name = item_name  # default to the folder name
            date_str = "N/A"
            try:
                # Fallback date: the folder's modification time.
                mtime = os.path.getmtime(item_path)
                date_str = datetime.datetime.fromtimestamp(
                    mtime).strftime('%Y-%m-%d %H:%M:%S')
            except (OSError, OverflowError, ValueError):
                pass  # keep "N/A"; the info file may still supply a date

            try:
                with open(info_file_path, 'r', encoding='utf-8') as f:
                    for line in f:
                        if line.startswith("Checkpoint Name:"):
                            name = line.split(":", 1)[1].strip()
                        elif line.startswith("Creation Time:"):
                            date_str = line.split(":", 1)[1].strip()
            except (OSError, ValueError) as e:
                # Unreadable or undecodable info file: still list the
                # folder, but flag it. (UnicodeDecodeError is a ValueError.)
                print(f"无法读取检查点信息 {item_name}: {e}")
                name = f"{item_name} (信息读取错误)"

            checkpoints.append(
                {"id": item_path, "name": name, "date": date_str, "folder": item_name})
    except OSError as e:
        messagebox.showerror("列表错误", f"扫描备份文件夹时出错: {e}")
        self.on_checkpoint_select()  # Ensure buttons are disabled
        return

    def get_sort_key(cp):
        """Return (timestamp-or-folder, folder) for newest-first ordering."""
        folder = cp.get("folder", "")
        parts = folder.split('_')
        # Scan right-to-left for an adjacent YYYYMMDD / HHMMSS pair so a
        # trailing "_<n>" collision suffix does not hide the timestamp.
        for i in range(len(parts) - 2, -1, -1):
            day, clock = parts[i], parts[i + 1]
            if (len(day) == 8 and len(clock) == 6
                    and day.isdigit() and clock.isdigit()):
                return (f"{day}_{clock}", folder)
        return (folder, folder)  # no timestamp: fall back to the name

    checkpoints.sort(key=get_sort_key, reverse=True)

    for cp in checkpoints:
        self.checkpoint_tree.insert(
            "", tk.END, iid=cp["id"], values=(cp["name"], cp["date"]))

    if not checkpoints:
        self.status_var.set("未找到检查点")
    else:
        self.status_var.set(f"找到 {len(checkpoints)} 个检查点")
    # Nothing is selected after a rebuild, so this disables the buttons.
    self.on_checkpoint_select()
def on_checkpoint_select(self, event=None):
    """Enable restore/delete only while a checkpoint row is selected.

    Args:
        event: Tk event when invoked through the ``<<TreeviewSelect>>``
            binding; unused, and None when called directly.
    """
    has_selection = bool(self.checkpoint_tree.selection())
    button_state = tk.NORMAL if has_selection else tk.DISABLED
    for button in (self.restore_button, self.delete_button):
        button.config(state=button_state)
def restore_checkpoint(self):
    """Confirm and start restoring the selected checkpoint on a worker thread.

    The first selected row is used; its item id is the checkpoint folder
    path. Files are copied over the current project path after the user
    confirms the overwrite warning.
    """
    selection = self.checkpoint_tree.selection()
    if not selection:
        messagebox.showwarning("无选择", "请先在列表中选择一个检查点进行恢复。")
        return

    source_path = selection[0]  # the row id is the checkpoint's path
    display_name = self.checkpoint_tree.item(source_path, "values")[0]

    self.project_path = self.project_path_var.get()
    if not os.path.isdir(self.project_path):
        messagebox.showerror(
            "错误", f"项目路径无效或不存在,无法恢复:\n{self.project_path}")
        return

    prompt = (
        f"确定要从检查点恢复项目吗?\n\n"
        f"检查点: {display_name}\n"
        f"源路径: {source_path}\n"
        f"目标项目路径: {self.project_path}\n\n"
        f"警告:这将用备份中的文件覆盖项目路径中的现有文件。\n"
        f"(注意:项目路径中多余的文件不会被删除)\n\n"
        f"此操作无法撤销!"
    )
    if not messagebox.askyesno("确认恢复", prompt,
                               icon='warning', parent=self.root):
        self.status_var.set("恢复已取消")
        return

    self.status_var.set("准备恢复...")
    self.progress_var.set(0)
    # Lock the action buttons while the restore is running.
    for button in (self.restore_button, self.delete_button):
        button.config(state=tk.DISABLED)
    self.root.update_idletasks()

    worker = threading.Thread(target=self._restore_checkpoint_thread,
                              args=(source_path, self.project_path))
    worker.daemon = True
    worker.start()
def _restore_checkpoint_thread(self, checkpoint_src_path, project_target_path):
    """Copy a checkpoint's files back into the project (worker thread).

    Existing files in the target are overwritten; files that exist only in
    the target are left alone. The checkpoint's own top-level
    ``checkpoint_info.txt`` is metadata and is not restored. Individual
    copy failures are printed and skipped. On completion
    ``_restore_completed`` or ``_restore_failed`` is scheduled on the Tk
    thread.

    Args:
        checkpoint_src_path: Folder of the checkpoint to restore from.
        project_target_path: Project directory to restore into.
    """
    try:
        self.root.after(0, lambda: self.status_var.set("正在扫描备份文件..."))

        def report_walk_error(err):
            print(f"Warning: Cannot access {err.filename} - {err.strerror}")

        files_to_copy = []
        folders_to_create = set()  # relative dirs needed in the target
        for current_dir, _dirs, files in os.walk(
                checkpoint_src_path, topdown=True, onerror=report_walk_error):
            rel_dir = os.path.relpath(current_dir, checkpoint_src_path)
            at_root = rel_dir == os.curdir
            if not at_root:
                folders_to_create.add(rel_dir)
            for file_name in files:
                # Only the root-level info file is checkpoint metadata; a
                # same-named file deeper down belongs to the project.
                if at_root and file_name == "checkpoint_info.txt":
                    continue
                files_to_copy.append(os.path.join(current_dir, file_name))

        total_files = len(files_to_copy)
        if total_files == 0 and not folders_to_create:
            self.root.after(0, lambda: messagebox.showwarning(
                "恢复警告", "选定的检查点似乎是空的(没有文件或文件夹需要恢复)。"))
            self.root.after(0, self._restore_failed, "检查点为空")
            return

        self.root.after(0, lambda: self.status_var.set(
            f"找到 {total_files} 个文件和 {len(folders_to_create)} 个文件夹准备恢复..."))

        # Create directories first (parents sort before their children).
        for rel_dir in sorted(folders_to_create):
            dst_folder_path = os.path.join(project_target_path, rel_dir)
            if os.path.exists(dst_folder_path) and not os.path.isdir(dst_folder_path):
                # Critical conflict: a file sits where a directory is
                # needed. Abort the whole restore.
                raise Exception(
                    f"无法创建目录,因为同名文件已存在于目标路径: {dst_folder_path}")
            try:
                os.makedirs(dst_folder_path, exist_ok=True)
            except OSError as e:
                # Not fatal: the copy loop retries per file.
                print(
                    f"Warning: Could not create target directory {dst_folder_path}: {e}")

        restored_count = 0
        for index, backup_src_path in enumerate(files_to_copy, start=1):
            project_dst_path = None  # bound up front so the error message is safe
            try:
                project_dst_path = os.path.join(
                    project_target_path,
                    os.path.relpath(backup_src_path, checkpoint_src_path))
                os.makedirs(os.path.dirname(project_dst_path), exist_ok=True)
                # copy2 overwrites an existing file.
                shutil.copy2(backup_src_path, project_dst_path)
                restored_count += 1
            except Exception as copy_err:
                # Best effort: report the file and carry on with the rest.
                print(
                    f"ERROR: Failed to restore {backup_src_path} to {project_dst_path}: {copy_err}")

            # Throttle UI updates to every 20th file plus the last one.
            if index % 20 == 0 or index == total_files:
                progress = index / total_files * 100
                status_msg = f"正在恢复: {os.path.basename(backup_src_path)} ({index}/{total_files})"
                self.root.after(0, self._update_progress,
                                progress, status_msg)

        # --- Completion ---
        self.root.after(0, self._restore_completed, restored_count)
    except Exception as e:
        self.root.after(0, self._restore_failed, str(e))
def _restore_completed(self, restored_count):
    """Report a finished restore to the user.

    Scheduled through ``self.root.after`` by the restore routine once the
    copy loop has ended.

    Args:
        restored_count: Number of files that were copied back into the
            project; shown in both the status bar and the dialog.
    """
    status_text = f"恢复操作完成 ({restored_count} 文件)"
    dialog_text = f"项目已成功从所选检查点恢复。\n({restored_count} 个文件已复制/覆盖)"

    self.progress_var.set(100)
    self.status_var.set(status_text)
    messagebox.showinfo("成功", dialog_text, parent=self.root)

    # Presumably re-evaluates the restore/delete button state for the
    # current selection -- the handler is defined outside this section.
    self.on_checkpoint_select()
def _restore_failed(self, error_msg):
    """Report a failed restore to the user.

    Scheduled through ``self.root.after`` by the restore routine when it
    catches an exception.

    Args:
        error_msg: Text of the error. The status bar shows only the first
            100 characters; the dialog shows the full text.
    """
    status_excerpt = error_msg[:100]
    dialog_text = f"恢复检查点时发生错误: \n{error_msg}"

    self.progress_var.set(0)
    self.status_var.set(f"恢复失败: {status_excerpt}...")
    messagebox.showerror("恢复错误", dialog_text, parent=self.root)

    # Presumably re-evaluates the restore/delete button state for the
    # current selection -- the handler is defined outside this section.
    self.on_checkpoint_select()
# --- NEW: Delete Checkpoint Methods ---
def delete_checkpoint(self):
    """Confirm and start deletion of the checkpoint selected in the tree.

    Reads the current selection from ``self.checkpoint_tree``, asks the
    user for explicit confirmation and, if granted, disables the
    restore/delete buttons and runs ``_delete_checkpoint_thread`` on a
    daemon thread so the UI stays responsive.

    Returns:
        None. The outcome is reported later through ``_delete_completed``
        or ``_delete_failed``.
    """
    selected_items = self.checkpoint_tree.selection()
    if not selected_items:
        # Parent the dialog to the main window like every other dialog in
        # this section, so it stays attached to (and on top of) the app.
        messagebox.showwarning(
            "无选择", "请先在列表中选择一个检查点进行删除。", parent=self.root)
        return

    # By this tool's convention the tree item id is the checkpoint's full
    # path (see the original note on the iid).
    checkpoint_path_to_delete = selected_items[0]
    checkpoint_folder_name = os.path.basename(checkpoint_path_to_delete)

    # A row without column values yields an empty result; fall back to the
    # folder name instead of raising IndexError.
    row_values = self.checkpoint_tree.item(checkpoint_path_to_delete, "values")
    checkpoint_display_name = (
        row_values[0] if row_values else checkpoint_folder_name)

    # Deletion is irreversible, so require an explicit confirmation.
    confirm_text = (
        "确定要永久删除这个检查点吗?\n\n"
        f"检查点名称: {checkpoint_display_name}\n"
        f"文件夹: {checkpoint_folder_name}\n"
        f"完整路径: {checkpoint_path_to_delete}\n\n"
        "警告:这将永久删除备份文件夹及其所有内容。\n"
        "此操作无法撤销!"
    )
    confirm = messagebox.askyesno(
        "确认删除", confirm_text, icon='warning', parent=self.root)
    if not confirm:
        self.status_var.set("删除已取消")
        return

    self.status_var.set(f"准备删除 '{checkpoint_folder_name}'...")
    # Reset progress; deletion itself does not report incremental progress.
    self.progress_var.set(0)

    # Block further restore/delete requests while the operation runs.
    self.restore_button.config(state=tk.DISABLED)
    self.delete_button.config(state=tk.DISABLED)
    self.root.update_idletasks()

    # Run the deletion off the UI thread.
    thread = threading.Thread(
        target=self._delete_checkpoint_thread,
        args=(checkpoint_path_to_delete, checkpoint_folder_name),
        daemon=True,
    )
    thread.start()
def _delete_checkpoint_thread(self, checkpoint_path, folder_name):
    """Delete a checkpoint folder; runs on the thread started by delete_checkpoint.

    All UI updates are handed to the Tk event loop via ``self.root.after``.
    Any exception raised here is caught and forwarded to
    ``_delete_failed`` as a string.

    Args:
        checkpoint_path: Full path of the checkpoint directory to remove.
        folder_name: Folder name used in status and dialog messages.
    """
    def show_deleting_status():
        self.status_var.set(f"正在删除 {folder_name}...")

    try:
        self.root.after(0, show_deleting_status)

        # The folder may have vanished (or been replaced by a file) since
        # it was listed.
        still_a_directory = (
            os.path.exists(checkpoint_path) and os.path.isdir(checkpoint_path))
        if not still_a_directory:
            raise FileNotFoundError(f"检查点文件夹似乎已不存在: {checkpoint_path}")

        shutil.rmtree(checkpoint_path)

        # Sanity check: rmtree returned without error, so nothing should remain.
        if os.path.exists(checkpoint_path):
            raise OSError(f"尝试删除后文件夹仍然存在: {checkpoint_path}")

        # Success: hand completion handling back to the event loop.
        self.root.after(0, self._delete_completed, folder_name)
    except Exception as exc:
        self.root.after(0, self._delete_failed, folder_name, str(exc))
def _delete_completed(self, folder_name):
    """Report a successful deletion and refresh the checkpoint list.

    Args:
        folder_name: Name of the checkpoint folder that was removed.
    """
    status_text = f"检查点 '{folder_name}' 已成功删除"
    dialog_text = f"检查点 '{folder_name}' 已被删除。"

    # Deletion has no incremental progress; the bar is simply set to full.
    self.progress_var.set(100)
    self.status_var.set(status_text)
    messagebox.showinfo("删除成功", dialog_text, parent=self.root)

    # Rebuild the list so the removed checkpoint disappears. According to
    # the original note, the refresh also triggers on_checkpoint_select,
    # which updates the button state -- defined outside this section.
    self.populate_checkpoint_list()
def _delete_failed(self, folder_name, error_msg):
    """Report a failed deletion and refresh the checkpoint list.

    Args:
        folder_name: Name of the checkpoint folder that could not be removed.
        error_msg: Text of the error. The status bar shows only the first
            100 characters; the dialog shows the full text.
    """
    status_excerpt = error_msg[:100]
    dialog_text = f"删除检查点 '{folder_name}' 时发生错误: \n{error_msg}"

    self.progress_var.set(0)
    self.status_var.set(f"删除 '{folder_name}' 失败: {status_excerpt}...")
    messagebox.showerror("删除错误", dialog_text, parent=self.root)

    # Refresh regardless: the folder may be partially deleted, so the list
    # could be stale. Per the original note, the refresh also re-evaluates
    # button state via on_checkpoint_select (defined outside this section).
    self.populate_checkpoint_list()
if __name__ == "__main__":
    # Create exactly ONE Tk root. Creating a plain tk.Tk() first and then a
    # ThemedTk would leave an orphaned, empty extra window (and a second
    # Tcl interpreter) whenever ttkthemes is installed.
    try:
        from ttkthemes import ThemedTk
    except ImportError:
        print("ttkthemes not found, using default Tk theme.")
        root = tk.Tk()
        # Without ttkthemes, pick the first preferred built-in ttk theme
        # that this platform offers; otherwise keep the current default.
        style = ttk.Style(root)
        try:
            available_themes = style.theme_names()
            for preferred in ('vista', 'clam', 'alt'):
                if preferred in available_themes:
                    style.theme_use(preferred)
                    break
        except Exception as e:
            # Theming is best-effort only; never block startup because of it.
            print(f"Could not set default ttk theme: {e}")
    else:
        # Example themes: "arc", "plastik", "adapta", "ubuntu"
        root = ThemedTk(theme="arc")

    # Keep a reference to the application object for the lifetime of the loop.
    app = CheckpointApp(root)

    def on_closing():
        """Close the main window; this handler does not save the config."""
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()