Files
markdownToHexo/auto_check_files.py
2026-01-14 21:27:38 +08:00

472 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from pickle import FALSE
import yaml
import shutil
import datetime
import configparser
from typing import Dict, List, Optional
from git import Repo
from apscheduler.schedulers.blocking import BlockingScheduler
# 配置文件路径,使用绝对路径确保能找到
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
# 全局变量,用于存储配置
config = configparser.ConfigParser()
# 全局变量,用于存储上一次检查的时间
last_check_time = None
def parse_markdown_frontmatter(file_path: str) -> Dict:
"""
读取Markdown文件并解析其frontmatter YAML属性
Args:
file_path (str): Markdown文件的路径
Returns:
dict: 解析后的frontmatter属性字典如果没有frontmatter则返回空字典
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查文件是否以frontmatter开头
if not content.startswith('---'):
return {}
# 找到frontmatter的结束位置
end_index = content.find('---', 3)
if end_index == -1:
return {}
# 提取frontmatter内容
frontmatter_content = content[3:end_index].strip()
# 解析YAML
frontmatter = yaml.safe_load(frontmatter_content)
# 确保返回的是字典类型
if frontmatter is None:
frontmatter = {}
return frontmatter
except FileNotFoundError:
print(f"文件 {file_path} 不存在")
return {}
except yaml.YAMLError as e:
print(f"解析文件 {file_path} 的frontmatter时出错: {e}")
return {}
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
return {}
def is_updated_since_last_check(frontmatter: Dict, file_path: str, last_check: datetime.datetime) -> bool:
"""
检查文件是否在上一次检查时间之后更新
Args:
frontmatter (dict): 文件的frontmatter属性
file_path (str): 文件路径
last_check (datetime.datetime): 上一次检查的时间
Returns:
bool: 如果文件在上一次检查时间之后更新则返回True否则返回False
"""
# 尝试从frontmatter获取updated时间
updated_value = frontmatter.get('updated')
# 如果frontmatter没有updated字段使用文件的修改时间
if not updated_value:
try:
mtime = os.path.getmtime(file_path)
updated_dt = datetime.datetime.fromtimestamp(mtime)
except OSError as e:
print(f"无法获取文件 {file_path} 的修改时间: {e}")
return False
else:
# 检查updated_value是否已经是datetime对象
if isinstance(updated_value, datetime.datetime):
updated_dt = updated_value
else:
# 解析updated字符串为datetime对象
try:
# 支持多种时间格式
date_formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d',
'%Y/%m/%d %H:%M:%S',
'%Y/%m/%d %H:%M',
'%Y/%m/%d'
]
updated_dt = None
for fmt in date_formats:
try:
updated_dt = datetime.datetime.strptime(updated_value, fmt)
break
except ValueError:
continue
if updated_dt is None:
print(f"文件 {file_path} 的updated时间格式不正确: {updated_value}")
return False
except Exception as e:
print(f"解析文件 {file_path} 的updated时间时出错: {e}")
return False
# 检查是否在上一次检查时间之后更新
return updated_dt >= last_check
def copy_file_to_destination(source_path: str, destination_dir: str, repo: Repo, frontmatter: Dict) -> bool:
"""
将文件复制到目标目录并提交Git变更
Args:
source_path (str): 源文件路径
destination_dir (str): 目标目录路径
repo (Repo): Git仓库对象
frontmatter (dict): 文件的frontmatter属性
Returns:
bool: 复制和提交成功返回True否则返回False
"""
try:
# 从配置文件获取同名文件处理策略
overwrite_strategy = config['DEFAULT'].get('overwrite_strategy', 'overwrite')
# 确保目标目录存在
if not os.path.exists(destination_dir):
os.makedirs(destination_dir, exist_ok=True)
print(f"创建目标目录: {destination_dir}")
# 获取文件名和目标路径
file_name = os.path.basename(source_path)
file_base, file_ext = os.path.splitext(file_name)
destination_path = os.path.join(destination_dir, file_name)
existsResult = FALSE
# 处理同名文件
if os.path.exists(destination_path):
existsResult = True
if overwrite_strategy == 'skip':
print(f"跳过已存在文件: {destination_path}")
return False
elif overwrite_strategy == 'rename':
# 生成带时间戳的新文件名
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
new_file_name = f"{file_base}_{timestamp}{file_ext}"
destination_path = os.path.join(destination_dir, new_file_name)
print(f"文件已存在,重命名为: {new_file_name}")
# 默认overwrite策略直接覆盖
# 复制文件,保留元数据
shutil.copy2(source_path, destination_path)
print(f"成功拷贝: {source_path} -> {destination_path}")
# 获取提交信息
title = frontmatter.get('title', file_name)
if existsResult:
commit_message = f"update: 📝 更新 《{title}》文章内容"
else:
commit_message = f"feat: 📝 新建《{title}》文章"
# 添加文件到Git暂存区
repo.git.add(".")
# 提交变更
repo.git.commit('-m', commit_message)
print(f"成功提交: {commit_message}")
return True
except Exception as e:
print(f"拷贝或提交失败 {source_path}: {e}")
return False
def check_and_process_files(repo: Repo) -> Dict:
"""
检查并处理指定目录下的所有Markdown文件将上一次检查后更新的文件复制到目标目录并提交Git变更
Args:
repo (Repo): Git仓库对象
Returns:
dict: 处理结果统计
"""
global last_check_time
# 从配置文件获取参数
source_dir = config['DEFAULT'].get('source_dir')
destination_dir = config['DEFAULT'].get('destination_dir')
overwrite_strategy = config['DEFAULT'].get('overwrite_strategy', 'overwrite')
# 替换为当前目录进行测试(如果默认目录不存在)
if not os.path.exists(source_dir):
source_dir = os.getcwd()
print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")
# 确保目标目录存在
if not os.path.exists(destination_dir):
os.makedirs(destination_dir, exist_ok=True)
print(f"创建目标目录: {destination_dir}")
stats = {
'total_files': 0,
'md_files': 0,
'with_frontmatter': 0,
'updated_since_last_check': 0,
'copied_successfully': 0,
'copy_failed': 0
}
current_time = datetime.datetime.now()
print(f"\n{'='*80}")
print(f"开始检查文件,时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"上一次检查时间: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"源目录: {source_dir}")
print(f"目标目录: {destination_dir}")
print(f"同名文件处理策略: {overwrite_strategy}")
print(f"{'='*80}")
# 遍历源目录下的所有文件,忽略以`.`开头的文件夹
for root, dirs, files in os.walk(source_dir):
# 过滤掉以`.`开头的文件夹
dirs[:] = [d for d in dirs if not d.startswith('.')]
for file in files:
# 过滤掉以`.`开头的文件
if file.startswith('.'):
continue
# 只处理Markdown文件
if not file.lower().endswith('.md'):
continue
stats['total_files'] += 1
stats['md_files'] += 1
file_path = os.path.join(root, file)
# 解析frontmatter
frontmatter = parse_markdown_frontmatter(file_path)
if frontmatter:
stats['with_frontmatter'] += 1
# 检查是否在上一次检查时间之后更新
if is_updated_since_last_check(frontmatter, file_path, last_check_time):
stats['updated_since_last_check'] += 1
# 复制到目标目录并提交
if copy_file_to_destination(file_path, destination_dir, repo, frontmatter):
stats['copied_successfully'] += 1
else:
stats['copy_failed'] += 1
else:
# 对于没有frontmatter的文件创建一个默认的frontmatter
default_frontmatter = {
'title': os.path.basename(file_path)
}
# 使用文件修改时间判断
try:
mtime = os.path.getmtime(file_path)
updated_dt = datetime.datetime.fromtimestamp(mtime)
if updated_dt >= last_check_time:
stats['updated_since_last_check'] += 1
if copy_file_to_destination(file_path, destination_dir, repo, default_frontmatter):
stats['copied_successfully'] += 1
else:
stats['copy_failed'] += 1
except Exception as e:
print(f"无法处理文件 {file_path}: {e}")
# 更新最后检查时间
last_check_time = current_time
# 打印统计结果
print(f"\n{'='*80}")
print("处理结果统计:")
print(f"总文件数: {stats['total_files']}")
print(f"Markdown文件数: {stats['md_files']}")
print(f"包含frontmatter的文件数: {stats['with_frontmatter']}")
print(f"上一次检查后更新的文件数: {stats['updated_since_last_check']}")
print(f"成功复制的文件数: {stats['copied_successfully']}")
print(f"复制失败的文件数: {stats['copy_failed']}")
print(f"{'='*80}")
return stats
def load_config():
"""
从配置文件中读取配置,并为缺失的参数设置默认值
"""
global config, last_check_time
# 读取配置文件
config.read(CONFIG_FILE)
# 确保DEFAULT section存在
if 'DEFAULT' not in config:
config['DEFAULT'] = {}
# 设置默认值
default_config = {
'source_dir': '/opt/src',
'destination_dir': '/opt/doc/_post',
'overwrite_strategy': 'overwrite',
'remote_name': 'origin',
'branch': 'master'
}
# 更新配置文件中的缺失参数
for key, value in default_config.items():
if key not in config['DEFAULT'] or not config['DEFAULT'][key]:
config['DEFAULT'][key] = value
# 获取上一次检查时间
last_check_time_str = config['DEFAULT'].get('last_check_time')
if last_check_time_str:
last_check_time = datetime.datetime.strptime(last_check_time_str, '%Y-%m-%d %H:%M:%S')
else:
last_check_time = datetime.datetime.now()
# 将默认的上一次检查时间写入配置文件
config['DEFAULT']['last_check_time'] = last_check_time.strftime('%Y-%m-%d %H:%M:%S')
print(f"从配置文件中读取配置成功")
print(f"上一次检查时间: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"源目录: {config['DEFAULT'].get('source_dir')}")
print(f"目标目录: {config['DEFAULT'].get('destination_dir')}")
print(f"同名文件处理策略: {config['DEFAULT'].get('overwrite_strategy')}")
print(f"远程仓库名称: {config['DEFAULT'].get('remote_name')}")
print(f"分支名称: {config['DEFAULT'].get('branch')}")
def save_config():
"""
将配置写入配置文件,特别是更新上一次检查时间
"""
global config, last_check_time
# 更新上一次检查时间
config['DEFAULT']['last_check_time'] = last_check_time.strftime('%Y-%m-%d %H:%M:%S')
# 写入配置文件
with open(CONFIG_FILE, 'w') as f:
config.write(f)
print(f"配置文件保存成功,上一次检查时间更新为: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
def push_to_remote(repo: Repo) -> bool:
"""
将本地提交推送到远程仓库
Args:
repo (Repo): Git仓库对象
Returns:
bool: 推送成功返回True否则返回False
"""
try:
# 从配置文件获取远程仓库名称和分支名称
remote_name = config['DEFAULT'].get('remote_name', 'origin')
branch = config['DEFAULT'].get('branch', 'master')
# 检查远程仓库是否存在
if remote_name not in repo.remotes:
print(f"远程仓库 {remote_name} 不存在,无法推送")
return False
# 推送到远程仓库
repo.git.push(remote_name, branch)
print(f"成功推送到远程仓库 {remote_name}{branch} 分支")
return True
except Exception as e:
print(f"推送失败: {e}")
return False
def job():
"""
定时任务函数,执行文件检查和处理
"""
global last_check_time
# 从配置文件获取参数
source_dir = config['DEFAULT'].get('source_dir')
destination_dir = config['DEFAULT'].get('destination_dir')
# 替换为当前目录进行测试(如果默认目录不存在)
if not os.path.exists(source_dir):
source_dir = os.getcwd()
destination_dir = os.path.join(source_dir, 'recent_posts')
print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")
# 初始化或打开Git仓库
try:
# 检查目标目录是否已经是Git仓库
if os.path.exists(os.path.join(destination_dir, '.git')):
repo = Repo(destination_dir)
print(f"打开现有Git仓库: {destination_dir}")
else:
# 初始化新的Git仓库
repo = Repo.init(destination_dir)
print(f"初始化新Git仓库: {destination_dir}")
# 检查并处理文件
check_and_process_files(repo)
# 更新上一次检查时间
last_check_time = datetime.datetime.now()
# 保存配置,特别是更新上一次检查时间
save_config()
# 推送到远程仓库
push_to_remote(repo)
print("任务完成!")
except Exception as e:
print(f"Git操作失败: {e}")
print("任务失败!")
def main():
"""
主函数,启动定时任务
"""
# 加载配置文件
load_config()
# 立即执行一次检查
print(f"\n立即执行第一次检查")
job()
# 创建调度器
scheduler = BlockingScheduler()
# 添加定时任务每隔5分钟执行一次
scheduler.add_job(job, 'interval', minutes=5)
print(f"\n定时任务已启动每隔5分钟执行一次")
print(f"下次执行时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"按 Ctrl+C 停止任务")
try:
# 启动调度器
scheduler.start()
except KeyboardInterrupt:
print(f"\n定时任务已停止")
except Exception as e:
print(f"定时任务执行出错: {e}")
if __name__ == "__main__":
main()