472 lines
16 KiB
Python
472 lines
16 KiB
Python
import os
|
||
from pickle import FALSE
|
||
import yaml
|
||
import shutil
|
||
import datetime
|
||
import configparser
|
||
from typing import Dict, List, Optional
|
||
from git import Repo
|
||
from apscheduler.schedulers.blocking import BlockingScheduler
|
||
|
||
# Absolute path of config.ini, resolved relative to this script so the file
# is found regardless of the process's working directory.
CONFIG_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'config.ini',
)

# Module-wide parser instance that holds the loaded settings.
config = configparser.ConfigParser()

# Module-wide timestamp of the previous check; None until it is assigned.
last_check_time = None
|
||
|
||
|
||
def parse_markdown_frontmatter(file_path: str) -> Dict:
    """
    Read a Markdown file and parse its YAML frontmatter.

    The frontmatter is the text between the leading ``---`` and the next line
    that consists solely of ``---``.

    Args:
        file_path (str): Path of the Markdown file.

    Returns:
        dict: The parsed frontmatter mapping. An empty dict is returned when
        the file has no frontmatter, the closing delimiter is missing, the
        YAML document is not a mapping, or the file cannot be read or parsed.
    """
    try:
        # 'utf-8-sig' decodes plain UTF-8 as well, but drops a leading BOM
        # that would otherwise hide the opening delimiter.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        # The file must open with the frontmatter delimiter.
        if not content.startswith('---'):
            return {}

        # The closing delimiter has to sit on a line of its own. A plain
        # substring search would also match a '---' embedded in a value
        # (e.g. "title: a --- b") and cut the YAML short.
        lines = content[3:].split('\n')
        closing = next(
            (i for i, line in enumerate(lines) if i > 0 and line.rstrip() == '---'),
            None,
        )
        if closing is None:
            return {}

        frontmatter_content = '\n'.join(lines[:closing]).strip()
        frontmatter = yaml.safe_load(frontmatter_content)

        # Callers use dict methods on the result, so an empty document (None)
        # or a scalar/list document is reported as "no frontmatter".
        if not isinstance(frontmatter, dict):
            return {}

        return frontmatter
    except FileNotFoundError:
        print(f"文件 {file_path} 不存在")
        return {}
    except yaml.YAMLError as e:
        print(f"解析文件 {file_path} 的frontmatter时出错: {e}")
        return {}
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        return {}
|
||
|
||
|
||
def is_updated_since_last_check(frontmatter: Dict, file_path: str, last_check: datetime.datetime) -> bool:
    """
    Tell whether a file was updated at or after the previous check time.

    The ``updated`` frontmatter field is used when present; otherwise the
    file's modification time is used. ``updated`` may be a ``datetime``, a
    ``date`` (treated as midnight of that day) or a string in one of the
    supported formats. Timezone-aware values are converted to naive local
    time before the comparison.

    Args:
        frontmatter (dict): Frontmatter attributes of the file.
        file_path (str): Path of the file (used for mtime and in messages).
        last_check (datetime.datetime): Time of the previous check.

    Returns:
        bool: True if the update time is >= ``last_check``; False if it is
        older, cannot be parsed, or the modification time cannot be read.
    """
    # String layouts accepted for the 'updated' field.
    date_formats = (
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%Y-%m-%d',
        '%Y/%m/%d %H:%M:%S',
        '%Y/%m/%d %H:%M',
        '%Y/%m/%d',
    )

    def _naive_local(value: datetime.datetime) -> datetime.datetime:
        # Naive and aware datetimes cannot be compared with each other, so
        # aware values are converted to local wall-clock time.
        if value.tzinfo is None:
            return value
        return value.astimezone().replace(tzinfo=None)

    updated_value = frontmatter.get('updated')

    if not updated_value:
        # No usable 'updated' field: fall back to the file's mtime.
        try:
            updated_dt = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
        except OSError as e:
            print(f"无法获取文件 {file_path} 的修改时间: {e}")
            return False
    elif isinstance(updated_value, datetime.datetime):
        updated_dt = updated_value
    elif isinstance(updated_value, datetime.date):
        # A bare date (what a YAML loader may produce for an unquoted
        # 'YYYY-MM-DD') has no time part; use the start of that day.
        updated_dt = datetime.datetime.combine(updated_value, datetime.time.min)
    else:
        text = str(updated_value).strip()
        updated_dt = None
        for fmt in date_formats:
            try:
                updated_dt = datetime.datetime.strptime(text, fmt)
                break
            except ValueError:
                continue

        if updated_dt is None:
            print(f"文件 {file_path} 的updated时间格式不正确: {updated_value}")
            return False

    return _naive_local(updated_dt) >= _naive_local(last_check)
|
||
|
||
|
||
def copy_file_to_destination(source_path: str, destination_dir: str, repo: Repo, frontmatter: Dict) -> bool:
    """
    Copy a file into the destination directory and commit the change to Git.

    The handling of an already existing file with the same name is read from
    the ``overwrite_strategy`` option of the ``DEFAULT`` config section:
    ``skip`` leaves the existing file alone, ``rename`` copies to a new name
    carrying a timestamp suffix, anything else overwrites.

    Args:
        source_path (str): Path of the source file.
        destination_dir (str): Destination directory.
        repo (Repo): Git repository object used for staging and committing.
        frontmatter (dict): Frontmatter attributes; ``title`` is used in the
            commit message, falling back to the file name.

    Returns:
        bool: True when the copy and the commit both succeeded; False when
        the file was skipped or any step raised an exception.
    """
    try:
        overwrite_strategy = config['DEFAULT'].get('overwrite_strategy', 'overwrite')

        # Make sure the destination directory exists.
        if not os.path.exists(destination_dir):
            os.makedirs(destination_dir, exist_ok=True)
            print(f"创建目标目录: {destination_dir}")

        file_name = os.path.basename(source_path)
        file_base, file_ext = os.path.splitext(file_name)
        destination_path = os.path.join(destination_dir, file_name)

        # Must be the real boolean: the previously used pickle.FALSE constant
        # is a non-empty bytes opcode and therefore truthy, which made every
        # commit look like an update of an existing article.
        already_existed = os.path.exists(destination_path)

        if already_existed:
            if overwrite_strategy == 'skip':
                print(f"跳过已存在文件: {destination_path}")
                return False
            if overwrite_strategy == 'rename':
                # Build a new file name carrying a timestamp suffix.
                timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
                new_file_name = f"{file_base}_{timestamp}{file_ext}"
                destination_path = os.path.join(destination_dir, new_file_name)
                print(f"文件已存在,重命名为: {new_file_name}")
            # Any other strategy overwrites the existing file.

        # copy2 also carries over file metadata such as the modification time.
        shutil.copy2(source_path, destination_path)
        print(f"成功拷贝: {source_path} -> {destination_path}")

        title = frontmatter.get('title', file_name)
        if already_existed:
            commit_message = f"update: 📝 更新 《{title}》文章内容"
        else:
            commit_message = f"feat: 📝 新建《{title}》文章"

        # Stage and commit.
        repo.git.add(".")
        repo.git.commit('-m', commit_message)
        print(f"成功提交: {commit_message}")

        return True
    except Exception as e:
        print(f"拷贝或提交失败 {source_path}: {e}")
        return False
|
||
|
||
|
||
def check_and_process_files(repo: Repo) -> Dict:
    """
    Scan the source directory for Markdown files updated since the previous
    check, copy each of them to the destination directory and commit it.

    Directories and files whose name starts with ``.`` are ignored, and the
    destination directory is not descended into when it lies inside the
    source tree. On completion the global ``last_check_time`` is set to the
    time at which this scan started.

    Args:
        repo (Repo): Git repository object passed on to the copy step.

    Returns:
        dict: Counters with the keys ``total_files`` (non-hidden files seen),
        ``md_files``, ``with_frontmatter``, ``updated_since_last_check``,
        ``copied_successfully`` and ``copy_failed``.
    """
    global last_check_time

    settings = config['DEFAULT']
    source_dir = settings.get('source_dir')
    destination_dir = settings.get('destination_dir')
    overwrite_strategy = settings.get('overwrite_strategy', 'overwrite')

    # Fall back to the current directory (for testing) when the configured
    # source directory does not exist.
    if not os.path.exists(source_dir):
        source_dir = os.getcwd()
        print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")

    # Make sure the destination directory exists.
    if not os.path.exists(destination_dir):
        os.makedirs(destination_dir, exist_ok=True)
        print(f"创建目标目录: {destination_dir}")

    stats = {
        'total_files': 0,
        'md_files': 0,
        'with_frontmatter': 0,
        'updated_since_last_check': 0,
        'copied_successfully': 0,
        'copy_failed': 0,
    }

    current_time = datetime.datetime.now()
    print(f"\n{'='*80}")
    print(f"开始检查文件,时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"上一次检查时间: {last_check_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"源目录: {source_dir}")
    print(f"目标目录: {destination_dir}")
    print(f"同名文件处理策略: {overwrite_strategy}")
    print(f"{'='*80}")

    destination_abs = os.path.abspath(destination_dir)

    for root, dirs, files in os.walk(source_dir):
        # Prune in place so os.walk does not descend into hidden directories
        # or into the destination itself (its files are earlier copies and
        # would otherwise be copied onto themselves).
        dirs[:] = [
            d for d in dirs
            if not d.startswith('.')
            and os.path.abspath(os.path.join(root, d)) != destination_abs
        ]

        for file in files:
            # Skip hidden files.
            if file.startswith('.'):
                continue

            stats['total_files'] += 1

            # Only Markdown files are processed further.
            if not file.lower().endswith('.md'):
                continue

            stats['md_files'] += 1
            file_path = os.path.join(root, file)

            frontmatter = parse_markdown_frontmatter(file_path)
            if frontmatter and isinstance(frontmatter, dict):
                stats['with_frontmatter'] += 1
            else:
                # Without usable frontmatter the file name serves as title;
                # lacking an 'updated' field, the update check below falls
                # back to the file's modification time.
                frontmatter = {'title': os.path.basename(file_path)}

            if not is_updated_since_last_check(frontmatter, file_path, last_check_time):
                continue

            stats['updated_since_last_check'] += 1
            if copy_file_to_destination(file_path, destination_dir, repo, frontmatter):
                stats['copied_successfully'] += 1
            else:
                stats['copy_failed'] += 1

    # Remember when this scan started as the new reference point.
    last_check_time = current_time

    # Print the summary.
    print(f"\n{'='*80}")
    print("处理结果统计:")
    print(f"总文件数: {stats['total_files']}")
    print(f"Markdown文件数: {stats['md_files']}")
    print(f"包含frontmatter的文件数: {stats['with_frontmatter']}")
    print(f"上一次检查后更新的文件数: {stats['updated_since_last_check']}")
    print(f"成功复制的文件数: {stats['copied_successfully']}")
    print(f"复制失败的文件数: {stats['copy_failed']}")
    print(f"{'='*80}")

    return stats
|
||
|
||
|
||
def load_config():
    """
    Load settings from the config file and fill in defaults for any option
    that is missing or empty.

    Also initialises the global ``last_check_time`` from the
    ``last_check_time`` option. When that option is absent, empty or not in
    ``%Y-%m-%d %H:%M:%S`` format, the current time is used instead and is
    stored back into the in-memory configuration.
    """
    global last_check_time

    timestamp_format = '%Y-%m-%d %H:%M:%S'

    # A missing file is silently ignored by ConfigParser.read; the defaults
    # below then apply. The DEFAULT section always exists on a ConfigParser.
    config.read(CONFIG_FILE)
    defaults = config['DEFAULT']

    default_config = {
        'source_dir': '/opt/src',
        'destination_dir': '/opt/doc/_post',
        'overwrite_strategy': 'overwrite',
        'remote_name': 'origin',
        'branch': 'master',
    }

    # Fill in options that are missing or empty.
    for key, value in default_config.items():
        if not defaults.get(key):
            defaults[key] = value

    # Restore the previous check time; a hand-edited or corrupted value must
    # not abort start-up, so it is replaced by the current time.
    last_check_time_str = defaults.get('last_check_time')
    parsed_time = None
    if last_check_time_str:
        try:
            parsed_time = datetime.datetime.strptime(last_check_time_str.strip(), timestamp_format)
        except ValueError:
            print(f"配置文件中的上一次检查时间格式不正确: {last_check_time_str},将使用当前时间")

    if parsed_time is None:
        parsed_time = datetime.datetime.now()
        defaults['last_check_time'] = parsed_time.strftime(timestamp_format)

    last_check_time = parsed_time

    print("从配置文件中读取配置成功")
    print(f"上一次检查时间: {last_check_time.strftime(timestamp_format)}")
    print(f"源目录: {defaults.get('source_dir')}")
    print(f"目标目录: {defaults.get('destination_dir')}")
    print(f"同名文件处理策略: {defaults.get('overwrite_strategy')}")
    print(f"远程仓库名称: {defaults.get('remote_name')}")
    print(f"分支名称: {defaults.get('branch')}")
|
||
|
||
|
||
def save_config():
    """
    Write the in-memory configuration back to the config file, storing the
    current value of the global ``last_check_time`` in the DEFAULT section.
    """
    global config, last_check_time

    # Format once; the same text is stored and reported.
    stamp = last_check_time.strftime('%Y-%m-%d %H:%M:%S')
    config['DEFAULT']['last_check_time'] = stamp

    with open(CONFIG_FILE, 'w') as handle:
        config.write(handle)

    print(f"配置文件保存成功,上一次检查时间更新为: {stamp}")
|
||
|
||
|
||
def push_to_remote(repo: Repo) -> bool:
    """
    Push local commits to the configured remote.

    The remote name and branch come from the ``remote_name`` and ``branch``
    options of the DEFAULT config section (defaults: ``origin``, ``master``).

    Args:
        repo (Repo): Git repository object.

    Returns:
        bool: True when the push succeeded; False when the remote is not
        configured on the repository or the push raised an exception.
    """
    try:
        defaults = config['DEFAULT']
        remote_name = defaults.get('remote_name', 'origin')
        branch = defaults.get('branch', 'master')

        if remote_name in repo.remotes:
            repo.git.push(remote_name, branch)
            print(f"成功推送到远程仓库 {remote_name} 的 {branch} 分支")
            return True

        # Nothing to push to.
        print(f"远程仓库 {remote_name} 不存在,无法推送")
        return False
    except Exception as e:
        print(f"推送失败: {e}")
        return False
|
||
|
||
|
||
def job():
    """
    Scheduled task: open (or initialise) the destination Git repository,
    process updated files, persist the configuration and push to the remote.

    Any exception raised along the way is reported and ends this run without
    propagating to the caller.
    """
    source_dir = config['DEFAULT'].get('source_dir')
    destination_dir = config['DEFAULT'].get('destination_dir')

    # Fall back to the current directory (for testing) when the configured
    # source directory does not exist.
    if not os.path.exists(source_dir):
        source_dir = os.getcwd()
        destination_dir = os.path.join(source_dir, 'recent_posts')
        print(f"默认源目录不存在,使用当前目录进行测试: {source_dir}")

    try:
        # Reuse the destination directory's repository when it has one,
        # otherwise initialise a new repository there.
        if os.path.exists(os.path.join(destination_dir, '.git')):
            repo = Repo(destination_dir)
            print(f"打开现有Git仓库: {destination_dir}")
        else:
            repo = Repo.init(destination_dir)
            print(f"初始化新Git仓库: {destination_dir}")

        # check_and_process_files sets last_check_time to the moment its scan
        # started. It is deliberately not moved forward to "now" here: doing
        # so would make the next run skip files modified while the scan was
        # in progress.
        check_and_process_files(repo)

        # Persist the configuration, including the new last check time.
        save_config()

        push_to_remote(repo)

        print("任务完成!")

    except Exception as e:
        print(f"Git操作失败: {e}")
        print("任务失败!")
|
||
|
||
|
||
def main():
    """
    Entry point: load the configuration, run one check immediately, then
    keep running the check on a fixed interval until interrupted.
    """
    interval_minutes = 5

    load_config()

    # Run the first check right away.
    print(f"\n立即执行第一次检查")
    job()

    scheduler = BlockingScheduler()
    scheduler.add_job(job, 'interval', minutes=interval_minutes)

    # The first scheduled run happens one interval from now, so report that
    # time rather than the current time.
    next_run = datetime.datetime.now() + datetime.timedelta(minutes=interval_minutes)

    print(f"\n定时任务已启动,每隔{interval_minutes}分钟执行一次")
    print(f"下次执行时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"按 Ctrl+C 停止任务")

    try:
        # Blocks until the scheduler is stopped.
        scheduler.start()
    except KeyboardInterrupt:
        print(f"\n定时任务已停止")
    except Exception as e:
        print(f"定时任务执行出错: {e}")
|
||
|
||
|
||
# Start the scheduler only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|