"""Check timestamp continuity of comma-separated ``.txt`` log files.

Each log file is expected to have a header row followed by data rows whose
first column is a timestamp.  Rows are compared in strides of
``ROWS_PER_GROUP``; consecutive strides must be exactly
``EXPECTED_STEP_SECONDS`` apart, otherwise a gap is reported.
"""

import os
import glob
from datetime import datetime, timedelta

# Folder analysed when ``main`` is called without an argument.
DEFAULT_FOLDER_PATH = r"C:\Users\Administrator\Desktop\temp\wangchao\数据\原始\F3-daohang"

# Row i is compared against row i + ROWS_PER_GROUP.
ROWS_PER_GROUP = 4
# Required difference, in seconds, between the two compared rows.
EXPECTED_STEP_SECONDS = 1

# Header names of the optional status columns.
_DEVICE_STATUS_HEADER = '设备状态'
_DATA_STATUS_HEADER = '数据状态'

# Labels for the 2-bit positioning-type field (bits 3-4 of the status word).
_LOC_TYPES = ('记忆定位', '卫星定位', '惯性定位', '组合定位')


def parse_time(time_str):
    """Parse a log timestamp into a ``datetime``.

    Two layouts are recognised:

    * ``YYYY-mm-dd_HH-MM-SS`` (chosen when the text contains ``_``)
    * ``YYYY-mm-dd HH:MM:SS`` (chosen when the text contains whitespace)

    Args:
        time_str: Raw text of the timestamp column.

    Returns:
        The parsed ``datetime``, or ``None`` when the text matches neither
        layout or is malformed.
    """
    # Trim the ends and collapse every run of whitespace (including
    # non-breaking / full-width spaces) to a single ASCII space.  The
    # previous ``replace(' ', ' ')`` call was a no-op as written; presumably
    # it was meant to normalise such a separator.
    normalized = ' '.join(time_str.split())

    if '_' in normalized:
        layout = '%Y-%m-%d_%H-%M-%S'
    elif ' ' in normalized:
        layout = '%Y-%m-%d %H:%M:%S'
    else:
        return None

    try:
        return datetime.strptime(normalized, layout)
    except ValueError:
        # A malformed timestamp is reported as "unparseable" so the caller
        # can skip the row instead of aborting the whole analysis.
        return None


def parse_data_status(hex_str):
    """Decode a hexadecimal data-status word into a readable description.

    Bit 0 selects the time-validity label and bits 3-4 select the
    positioning-type label.

    Args:
        hex_str: Hex text, with or without a ``0x`` prefix.

    Returns:
        ``"<time label>,<positioning label>"`` on success.  When the input
        is not a string it is returned unchanged; when it is a string that
        is not valid hex, the stripped, upper-cased text (without any
        ``0X`` prefix) is returned.
    """
    try:
        hex_str = hex_str.strip().upper()
        if hex_str.startswith('0X'):
            hex_str = hex_str[2:]
        value = int(hex_str, 16)
    except (AttributeError, ValueError):
        # AttributeError: non-string input such as None (missing column).
        # ValueError: text that is not a hexadecimal number.
        return hex_str

    time_flag = '时间有效' if (value & 0x0001) else '时间无效'
    # The mask limits the index to 0..3, so every value has a label.
    loc_flag = _LOC_TYPES[(value >> 3) & 0x03]
    return f"{time_flag},{loc_flag}"


def _field(parts, index):
    """Return ``parts[index]``, or ``None`` if the column is absent."""
    if index is not None and index < len(parts):
        return parts[index]
    return None


def _describe_data_status(raw):
    """Format a raw data-status value together with its decoded meaning."""
    return f"{raw}({parse_data_status(raw)})"


def find_time_gaps(folder_path):
    """Scan every ``*.txt`` file in a folder for timestamp discontinuities.

    Files are processed in sorted path order and read as UTF-8.  The first
    line is treated as a comma-separated header; the remaining lines are
    data rows whose first column is the timestamp.  Progress and findings
    are printed as the scan runs.

    Args:
        folder_path: Directory containing the log files.

    Returns:
        A list of dicts, one per detected gap, with the keys ``file``,
        ``line``, ``prev_time``, ``curr_time``, ``time_diff``,
        ``prev_status``, ``curr_status``, ``prev_data_status`` and
        ``curr_data_status``.
    """
    txt_files = sorted(glob.glob(os.path.join(folder_path, '*.txt')))
    results = []

    for file_path in txt_files:
        filename = os.path.basename(file_path)
        print(f"\n正在分析文件: {filename}")

        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        if len(lines) < 2:
            print(f" 文件数据行数不足,跳过")
            continue

        # Locate the optional status columns by header name.  If a name is
        # repeated, the last occurrence wins.
        header = lines[0].strip().split(',')
        status_col = None
        data_status_col = None
        for col_index, col_name in enumerate(header):
            if col_name == _DEVICE_STATUS_HEADER:
                status_col = col_index
            elif col_name == _DATA_STATUS_HEADER:
                data_status_col = col_index

        gap_count = 0
        data_lines = lines[1:]
        total_rows = len(data_lines)

        for i in range(0, total_rows - ROWS_PER_GROUP, ROWS_PER_GROUP):
            curr_parts = data_lines[i].strip().split(',')
            next_parts = data_lines[i + ROWS_PER_GROUP].strip().split(',')
            if len(curr_parts) < 2 or len(next_parts) < 2:
                continue

            curr_time = parse_time(curr_parts[0])
            next_time = parse_time(next_parts[0])
            if curr_time is None or next_time is None:
                continue

            time_diff = (next_time - curr_time).total_seconds()
            if time_diff == EXPECTED_STEP_SECONDS:
                continue

            curr_status = _field(curr_parts, status_col)
            next_status = _field(next_parts, status_col)
            curr_data_status = _field(curr_parts, data_status_col)
            next_data_status = _field(next_parts, data_status_col)
            # Decode once; the same text is stored and printed several times.
            curr_data_desc = _describe_data_status(curr_data_status)
            next_data_desc = _describe_data_status(next_data_status)

            gap_count += 1
            # Row numbers below are 1-based positions among the data rows
            # (the header is not counted).
            gap_line = i + ROWS_PER_GROUP
            gap_info = {
                'file': filename,
                'line': gap_line,
                'prev_time': curr_time.strftime('%Y-%m-%d %H:%M:%S'),
                'curr_time': next_time.strftime('%Y-%m-%d %H:%M:%S'),
                'time_diff': time_diff,
                'prev_status': curr_status,
                'curr_status': next_status,
                'prev_data_status': curr_data_desc,
                'curr_data_status': next_data_desc,
            }
            results.append(gap_info)

            print(f" [发现时间不连续] 第{gap_line}行: {gap_info['prev_time']} -> {gap_info['curr_time']}, 差值={time_diff}秒 (预期差值=1秒)")
            # NOTE(review): the values printed for the "previous group's
            # last row" come from data_lines[i], i.e. the first row of that
            # group, while the row number refers to its last row.  Kept
            # as-is; confirm whether data_lines[i + 3] was intended.
            print(f" 前一组最后一行: 第{i + ROWS_PER_GROUP}行, 时间={curr_parts[0]}, 状态={curr_data_desc}")
            print(f" 后一组第一行: 第{i + ROWS_PER_GROUP + 1}行, 时间={next_parts[0]}, 状态={next_data_desc}")
            print(f" 前一行状态: 设备状态={curr_status}, 数据状态={curr_data_desc}")
            print(f" 当前行状态: 设备状态={next_status}, 数据状态={next_data_desc}")

        print(f" 文件分析完成,共发现 {gap_count} 处时间不连续")

    return results


def main(folder_path=None):
    """Run the continuity analysis and print a summary of all gaps.

    Args:
        folder_path: Directory to analyse.  Defaults to
            ``DEFAULT_FOLDER_PATH`` when omitted.
    """
    if folder_path is None:
        folder_path = DEFAULT_FOLDER_PATH

    print("=" * 60)
    print("日志文件时间连续性分析工具")
    print("=" * 60)

    results = find_time_gaps(folder_path)

    print("\n" + "=" * 60)
    print("分析结果汇总")
    print("=" * 60)

    if results:
        print(f"\n共发现 {len(results)} 处时间不连续:\n")
        for i, r in enumerate(results, 1):
            print(f"{i}. 文件: {r['file']}, 行号: {r['line']}")
            print(f" 时间: {r['prev_time']} -> {r['curr_time']} (差值: {r['time_diff']}秒)")
            print(f" 前状态: 设备状态={r['prev_status']}, 数据状态={r['prev_data_status']}")
            print(f" 当前状态: 设备状态={r['curr_status']}, 数据状态={r['curr_data_status']}")
            print()
    else:
        print("\n未发现时间不连续的情况,所有数据时间连续。")


if __name__ == '__main__':
    main()