first commit

This commit is contained in:
cuianbing
2026-01-14 21:27:38 +08:00
commit 735abf2d32
5 changed files with 645 additions and 0 deletions

471
auto_check_files.py Normal file
View File

@@ -0,0 +1,471 @@
import os
from pickle import FALSE
import yaml
import shutil
import datetime
import configparser
from typing import Dict, List, Optional
from git import Repo
from apscheduler.schedulers.blocking import BlockingScheduler
# 配置文件路径,使用绝对路径确保能找到
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
# 全局变量,用于存储配置
config = configparser.ConfigParser()
# 全局变量,用于存储上一次检查的时间
last_check_time = None
def parse_markdown_frontmatter(file_path: str) -> Dict:
"""
读取Markdown文件并解析其frontmatter YAML属性
Args:
file_path (str): Markdown文件的路径
Returns:
dict: 解析后的frontmatter属性字典如果没有frontmatter则返回空字典
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查文件是否以frontmatter开头
if not content.startswith('---'):
return {}
# 找到frontmatter的结束位置
end_index = content.find('---', 3)
if end_index == -1:
return {}
# 提取frontmatter内容
frontmatter_content = content[3:end_index].strip()
# 解析YAML
frontmatter = yaml.safe_load(frontmatter_content)
# 确保返回的是字典类型
if frontmatter is None:
frontmatter = {}
return frontmatter
except FileNotFoundError:
print(f"文件 {file_path} 不存在")
return {}
except yaml.YAMLError as e:
print(f"解析文件 {file_path} 的frontmatter时出错: {e}")
return {}
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
return {}
def is_updated_since_last_check(frontmatter: Dict, file_path: str, last_check: datetime.datetime) -> bool:
"""
检查文件是否在上一次检查时间之后更新
Args:
frontmatter (dict): 文件的frontmatter属性
file_path (str): 文件路径
last_check (datetime.datetime): 上一次检查的时间
Returns:
bool: 如果文件在上一次检查时间之后更新则返回True否则返回False
"""
# 尝试从frontmatter获取updated时间
updated_value = frontmatter.get('updated')
# 如果frontmatter没有updated字段使用文件的修改时间
if not updated_value:
try:
mtime = os.path.getmtime(file_path)
updated_dt = datetime.datetime.fromtimestamp(mtime)
except OSError as e:
print(f"无法获取文件 {file_path} 的修改时间: {e}")
return False
else:
# 检查updated_value是否已经是datetime对象
if isinstance(updated_value, datetime.datetime):
updated_dt = updated_value
else:
# 解析updated字符串为datetime对象
try:
# 支持多种时间格式
date_formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d',
'%Y/%m/%d %H:%M:%S',
'%Y/%m/%d %H:%M',
'%Y/%m/%d'
]
updated_dt = None
for fmt in date_formats:
try:
updated_dt = datetime.datetime.strptime(updated_value, fmt)
break
except ValueError:
continue
if updated_dt is None:
print(f"文件 {file_path} 的updated时间格式不正确: {updated_value}")
return False
except Exception as e:
print(f"解析文件 {file_path} 的updated时间时出错: {e}")
return False
# 检查是否在上一次检查时间之后更新
return updated_dt >= last_check
def copy_file_to_destination(source_path: str, destination_dir: str, repo: Repo, frontmatter: Dict) -> bool:
"""
将文件复制到目标目录并提交Git变更
Args:
source_path (str): 源文件路径
destination_dir (str): 目标目录路径
repo (Repo): Git仓库对象
frontmatter (dict): 文件的frontmatter属性
Returns:
bool: 复制和提交成功返回True否则返回False
"""
try:
# 从配置文件获取同名文件处理策略
overwrite_strategy = config['DEFAULT'].get('overwrite_strategy', 'overwrite')
# 确保目标目录存在
if not os.path.exists(destination_dir):
os.makedirs(destination_dir, exist_ok=True)
print(f"创建目标目录: {destination_dir}")
# 获取文件名和目标路径
file_name = os.path.basename(source_path)
file_base, file_ext = os.path.splitext(file_name)
destination_path = os.path.join(destination_dir, file_name)
existsResult = FALSE
# 处理同名文件
if os.path.exists(destination_path):
existsResult = True
if overwrite_strategy == 'skip':
print(f"跳过已存在文件: {destination_path}")
return False
elif overwrite_strategy == 'rename':
# 生成带时间戳的新文件名
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
new_file_name = f"{file_base}_{timestamp}{file_ext}"
destination_path = os.path.join(destination_dir, new_file_name)
print(f"文件已存在,重命名为: {new_file_name}")
# 默认overwrite策略直接覆盖
# 复制文件,保留元数据
shutil.copy2(source_path, destination_path)
print(f"成功拷贝: {source_path} -> {destination_path}")
# 获取提交信息
title = frontmatter.get('title', file_name)
if existsResult:
commit_message = f"update: 📝 更新 《{title}》文章内容"
else:
commit_message = f"feat: 📝 新建《{title}》文章"
# 添加文件到Git暂存区
repo.git.add(".")
# 提交变更
repo.git.commit('-m', commit_message)
print(f"成功提交: {commit_message}")
return True
except Exception as e:
print(f"拷贝或提交失败 {source_path}: {e}")
return False
def check_and_process_files(repo: Repo) -> Dict:
"""
检查并处理指定目录下的所有Markdown文件将上一次检查后更新的文件复制到目标目录并提交Git变更
Args:
repo (Repo): Git仓库对象
Returns:
dict: 处理结果统计
"""
global last_check_time
# 从配置文件获取参数
source_dir = config['DEFAULT'].get('source_dir')
destination_dir = config['DEFAULT'].get('destination_dir')
overwrite_strategy = config['DEFAULT'].get('overwrite_strategy', 'overwrite')
# 替换为当前目录进行测试(如果默认目录不存在)
if not os.path.exists(source_dir):
source_dir = os.getcwd()
print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")
# 确保目标目录存在
if not os.path.exists(destination_dir):
os.makedirs(destination_dir, exist_ok=True)
print(f"创建目标目录: {destination_dir}")
stats = {
'total_files': 0,
'md_files': 0,
'with_frontmatter': 0,
'updated_since_last_check': 0,
'copied_successfully': 0,
'copy_failed': 0
}
current_time = datetime.datetime.now()
print(f"\n{'='*80}")
print(f"开始检查文件,时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"上一次检查时间: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"源目录: {source_dir}")
print(f"目标目录: {destination_dir}")
print(f"同名文件处理策略: {overwrite_strategy}")
print(f"{'='*80}")
# 遍历源目录下的所有文件,忽略以`.`开头的文件夹
for root, dirs, files in os.walk(source_dir):
# 过滤掉以`.`开头的文件夹
dirs[:] = [d for d in dirs if not d.startswith('.')]
for file in files:
# 过滤掉以`.`开头的文件
if file.startswith('.'):
continue
# 只处理Markdown文件
if not file.lower().endswith('.md'):
continue
stats['total_files'] += 1
stats['md_files'] += 1
file_path = os.path.join(root, file)
# 解析frontmatter
frontmatter = parse_markdown_frontmatter(file_path)
if frontmatter:
stats['with_frontmatter'] += 1
# 检查是否在上一次检查时间之后更新
if is_updated_since_last_check(frontmatter, file_path, last_check_time):
stats['updated_since_last_check'] += 1
# 复制到目标目录并提交
if copy_file_to_destination(file_path, destination_dir, repo, frontmatter):
stats['copied_successfully'] += 1
else:
stats['copy_failed'] += 1
else:
# 对于没有frontmatter的文件创建一个默认的frontmatter
default_frontmatter = {
'title': os.path.basename(file_path)
}
# 使用文件修改时间判断
try:
mtime = os.path.getmtime(file_path)
updated_dt = datetime.datetime.fromtimestamp(mtime)
if updated_dt >= last_check_time:
stats['updated_since_last_check'] += 1
if copy_file_to_destination(file_path, destination_dir, repo, default_frontmatter):
stats['copied_successfully'] += 1
else:
stats['copy_failed'] += 1
except Exception as e:
print(f"无法处理文件 {file_path}: {e}")
# 更新最后检查时间
last_check_time = current_time
# 打印统计结果
print(f"\n{'='*80}")
print("处理结果统计:")
print(f"总文件数: {stats['total_files']}")
print(f"Markdown文件数: {stats['md_files']}")
print(f"包含frontmatter的文件数: {stats['with_frontmatter']}")
print(f"上一次检查后更新的文件数: {stats['updated_since_last_check']}")
print(f"成功复制的文件数: {stats['copied_successfully']}")
print(f"复制失败的文件数: {stats['copy_failed']}")
print(f"{'='*80}")
return stats
def load_config():
"""
从配置文件中读取配置,并为缺失的参数设置默认值
"""
global config, last_check_time
# 读取配置文件
config.read(CONFIG_FILE)
# 确保DEFAULT section存在
if 'DEFAULT' not in config:
config['DEFAULT'] = {}
# 设置默认值
default_config = {
'source_dir': '/opt/src',
'destination_dir': '/opt/doc/_post',
'overwrite_strategy': 'overwrite',
'remote_name': 'origin',
'branch': 'master'
}
# 更新配置文件中的缺失参数
for key, value in default_config.items():
if key not in config['DEFAULT'] or not config['DEFAULT'][key]:
config['DEFAULT'][key] = value
# 获取上一次检查时间
last_check_time_str = config['DEFAULT'].get('last_check_time')
if last_check_time_str:
last_check_time = datetime.datetime.strptime(last_check_time_str, '%Y-%m-%d %H:%M:%S')
else:
last_check_time = datetime.datetime.now()
# 将默认的上一次检查时间写入配置文件
config['DEFAULT']['last_check_time'] = last_check_time.strftime('%Y-%m-%d %H:%M:%S')
print(f"从配置文件中读取配置成功")
print(f"上一次检查时间: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"源目录: {config['DEFAULT'].get('source_dir')}")
print(f"目标目录: {config['DEFAULT'].get('destination_dir')}")
print(f"同名文件处理策略: {config['DEFAULT'].get('overwrite_strategy')}")
print(f"远程仓库名称: {config['DEFAULT'].get('remote_name')}")
print(f"分支名称: {config['DEFAULT'].get('branch')}")
def save_config():
"""
将配置写入配置文件,特别是更新上一次检查时间
"""
global config, last_check_time
# 更新上一次检查时间
config['DEFAULT']['last_check_time'] = last_check_time.strftime('%Y-%m-%d %H:%M:%S')
# 写入配置文件
with open(CONFIG_FILE, 'w') as f:
config.write(f)
print(f"配置文件保存成功,上一次检查时间更新为: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
def push_to_remote(repo: Repo) -> bool:
"""
将本地提交推送到远程仓库
Args:
repo (Repo): Git仓库对象
Returns:
bool: 推送成功返回True否则返回False
"""
try:
# 从配置文件获取远程仓库名称和分支名称
remote_name = config['DEFAULT'].get('remote_name', 'origin')
branch = config['DEFAULT'].get('branch', 'master')
# 检查远程仓库是否存在
if remote_name not in repo.remotes:
print(f"远程仓库 {remote_name} 不存在,无法推送")
return False
# 推送到远程仓库
repo.git.push(remote_name, branch)
print(f"成功推送到远程仓库 {remote_name}{branch} 分支")
return True
except Exception as e:
print(f"推送失败: {e}")
return False
def job():
"""
定时任务函数,执行文件检查和处理
"""
global last_check_time
# 从配置文件获取参数
source_dir = config['DEFAULT'].get('source_dir')
destination_dir = config['DEFAULT'].get('destination_dir')
# 替换为当前目录进行测试(如果默认目录不存在)
if not os.path.exists(source_dir):
source_dir = os.getcwd()
destination_dir = os.path.join(source_dir, 'recent_posts')
print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")
# 初始化或打开Git仓库
try:
# 检查目标目录是否已经是Git仓库
if os.path.exists(os.path.join(destination_dir, '.git')):
repo = Repo(destination_dir)
print(f"打开现有Git仓库: {destination_dir}")
else:
# 初始化新的Git仓库
repo = Repo.init(destination_dir)
print(f"初始化新Git仓库: {destination_dir}")
# 检查并处理文件
check_and_process_files(repo)
# 更新上一次检查时间
last_check_time = datetime.datetime.now()
# 保存配置,特别是更新上一次检查时间
save_config()
# 推送到远程仓库
push_to_remote(repo)
print("任务完成!")
except Exception as e:
print(f"Git操作失败: {e}")
print("任务失败!")
def main():
"""
主函数,启动定时任务
"""
# 加载配置文件
load_config()
# 立即执行一次检查
print(f"\n立即执行第一次检查")
job()
# 创建调度器
scheduler = BlockingScheduler()
# 添加定时任务每隔5分钟执行一次
scheduler.add_job(job, 'interval', minutes=5)
print(f"\n定时任务已启动每隔5分钟执行一次")
print(f"下次执行时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"按 Ctrl+C 停止任务")
try:
# 启动调度器
scheduler.start()
except KeyboardInterrupt:
print(f"\n定时任务已停止")
except Exception as e:
print(f"定时任务执行出错: {e}")
if __name__ == "__main__":
main()