|
@@ -0,0 +1,137 @@
|
|
|
|
|
+import os
|
|
|
|
|
+import glob
|
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
|
+
|
|
|
|
|
+def parse_time(time_str):
|
|
|
|
|
+ time_str = time_str.strip()
|
|
|
|
|
+ if '_' in time_str:
|
|
|
|
|
+ return datetime.strptime(time_str, '%Y-%m-%d_%H-%M-%S')
|
|
|
|
|
+ elif ' ' in time_str:
|
|
|
|
|
+ return datetime.strptime(time_str.replace(' ', ' '), '%Y-%m-%d %H:%M:%S')
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+def parse_data_status(hex_str):
|
|
|
|
|
+ try:
|
|
|
|
|
+ hex_str = hex_str.strip().upper()
|
|
|
|
|
+ if hex_str.startswith('0X'):
|
|
|
|
|
+ hex_str = hex_str[2:]
|
|
|
|
|
+ value = int(hex_str, 16)
|
|
|
|
|
+ except:
|
|
|
|
|
+ return hex_str
|
|
|
|
|
+
|
|
|
|
|
+ time_flag = '时间有效' if (value & 0x0001) else '时间无效'
|
|
|
|
|
+
|
|
|
|
|
+ loc_type = (value >> 3) & 0x03
|
|
|
|
|
+ loc_types = {
|
|
|
|
|
+ 0: '记忆定位',
|
|
|
|
|
+ 1: '卫星定位',
|
|
|
|
|
+ 2: '惯性定位',
|
|
|
|
|
+ 3: '组合定位'
|
|
|
|
|
+ }
|
|
|
|
|
+ loc_flag = loc_types.get(loc_type, f'未知({loc_type})')
|
|
|
|
|
+
|
|
|
|
|
+ return f"{time_flag},{loc_flag}"
|
|
|
|
|
+
|
|
|
|
|
+def find_time_gaps(folder_path):
|
|
|
|
|
+ txt_files = glob.glob(os.path.join(folder_path, '*.txt'))
|
|
|
|
|
+ txt_files.sort()
|
|
|
|
|
+
|
|
|
|
|
+ results = []
|
|
|
|
|
+
|
|
|
|
|
+ for file_path in txt_files:
|
|
|
|
|
+ filename = os.path.basename(file_path)
|
|
|
|
|
+ print(f"\n正在分析文件: {filename}")
|
|
|
|
|
+
|
|
|
|
|
+ with open(file_path, 'r', encoding='utf-8') as f:
|
|
|
|
|
+ lines = f.readlines()
|
|
|
|
|
+
|
|
|
|
|
+ if len(lines) < 2:
|
|
|
|
|
+ print(f" 文件数据行数不足,跳过")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ header = lines[0].strip().split(',')
|
|
|
|
|
+ time_col = header[0]
|
|
|
|
|
+ status_col = None
|
|
|
|
|
+ data_status_col = None
|
|
|
|
|
+
|
|
|
|
|
+ for i, col in enumerate(header):
|
|
|
|
|
+ if col == '设备状态':
|
|
|
|
|
+ status_col = i
|
|
|
|
|
+ elif col == '数据状态':
|
|
|
|
|
+ data_status_col = i
|
|
|
|
|
+
|
|
|
|
|
+ gap_count = 0
|
|
|
|
|
+ data_lines = lines[1:]
|
|
|
|
|
+ total_rows = len(data_lines)
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(0, total_rows - 4, 4):
|
|
|
|
|
+ curr_parts = data_lines[i].strip().split(',')
|
|
|
|
|
+ next_parts = data_lines[i + 4].strip().split(',')
|
|
|
|
|
+
|
|
|
|
|
+ if len(curr_parts) < 2 or len(next_parts) < 2:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ curr_time = parse_time(curr_parts[0])
|
|
|
|
|
+ next_time = parse_time(next_parts[0])
|
|
|
|
|
+
|
|
|
|
|
+ if curr_time is None or next_time is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ curr_status = curr_parts[status_col] if status_col is not None and status_col < len(curr_parts) else None
|
|
|
|
|
+ curr_data_status = curr_parts[data_status_col] if data_status_col is not None and data_status_col < len(curr_parts) else None
|
|
|
|
|
+ next_status = next_parts[status_col] if status_col is not None and status_col < len(next_parts) else None
|
|
|
|
|
+ next_data_status = next_parts[data_status_col] if data_status_col is not None and data_status_col < len(next_parts) else None
|
|
|
|
|
+
|
|
|
|
|
+ time_diff = (next_time - curr_time).total_seconds()
|
|
|
|
|
+
|
|
|
|
|
+ if time_diff != 1:
|
|
|
|
|
+ gap_count += 1
|
|
|
|
|
+ gap_line = i + 4
|
|
|
|
|
+ gap_info = {
|
|
|
|
|
+ 'file': filename,
|
|
|
|
|
+ 'line': gap_line,
|
|
|
|
|
+ 'prev_time': curr_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
|
|
|
+ 'curr_time': next_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
|
|
|
+ 'time_diff': time_diff,
|
|
|
|
|
+ 'prev_status': curr_status,
|
|
|
|
|
+ 'curr_status': next_status,
|
|
|
|
|
+ 'prev_data_status': f"{curr_data_status}({parse_data_status(curr_data_status)})",
|
|
|
|
|
+ 'curr_data_status': f"{next_data_status}({parse_data_status(next_data_status)})"
|
|
|
|
|
+ }
|
|
|
|
|
+ results.append(gap_info)
|
|
|
|
|
+ print(f" [发现时间不连续] 第{gap_line}行: {gap_info['prev_time']} -> {gap_info['curr_time']}, 差值={time_diff}秒 (预期差值=1秒)")
|
|
|
|
|
+ print(f" 前一组最后一行: 第{i+4}行, 时间={curr_parts[0]}, 状态={curr_data_status}({parse_data_status(curr_data_status)})")
|
|
|
|
|
+ print(f" 后一组第一行: 第{i+5}行, 时间={next_parts[0]}, 状态={next_data_status}({parse_data_status(next_data_status)})")
|
|
|
|
|
+ print(f" 前一行状态: 设备状态={curr_status}, 数据状态={curr_data_status}({parse_data_status(curr_data_status)})")
|
|
|
|
|
+ print(f" 当前行状态: 设备状态={next_status}, 数据状态={next_data_status}({parse_data_status(next_data_status)})")
|
|
|
|
|
+
|
|
|
|
|
+ print(f" 文件分析完成,共发现 {gap_count} 处时间不连续")
|
|
|
|
|
+
|
|
|
|
|
+ return results
|
|
|
|
|
+
|
|
|
|
|
+def main():
|
|
|
|
|
+ folder_path = r"C:\Users\Administrator\Desktop\temp\wangchao\数据\原始\F3-daohang"
|
|
|
|
|
+
|
|
|
|
|
+ print("=" * 60)
|
|
|
|
|
+ print("日志文件时间连续性分析工具")
|
|
|
|
|
+ print("=" * 60)
|
|
|
|
|
+
|
|
|
|
|
+ results = find_time_gaps(folder_path)
|
|
|
|
|
+
|
|
|
|
|
+ print("\n" + "=" * 60)
|
|
|
|
|
+ print("分析结果汇总")
|
|
|
|
|
+ print("=" * 60)
|
|
|
|
|
+
|
|
|
|
|
+ if results:
|
|
|
|
|
+ print(f"\n共发现 {len(results)} 处时间不连续:\n")
|
|
|
|
|
+ for i, r in enumerate(results, 1):
|
|
|
|
|
+ print(f"{i}. 文件: {r['file']}, 行号: {r['line']}")
|
|
|
|
|
+ print(f" 时间: {r['prev_time']} -> {r['curr_time']} (差值: {r['time_diff']}秒)")
|
|
|
|
|
+ print(f" 前状态: 设备状态={r['prev_status']}, 数据状态={r['prev_data_status']}")
|
|
|
|
|
+ print(f" 当前状态: 设备状态={r['curr_status']}, 数据状态={r['curr_data_status']}")
|
|
|
|
|
+ print()
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("\n未发现时间不连续的情况,所有数据时间连续。")
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
|
+ main()
|