log_analyzer.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import os
  2. import glob
  3. from datetime import datetime, timedelta
  4. def parse_time(time_str):
  5. time_str = time_str.strip()
  6. if '_' in time_str:
  7. return datetime.strptime(time_str, '%Y-%m-%d_%H-%M-%S')
  8. elif ' ' in time_str:
  9. return datetime.strptime(time_str.replace(' ', ' '), '%Y-%m-%d %H:%M:%S')
  10. return None
  11. def parse_data_status(hex_str):
  12. try:
  13. hex_str = hex_str.strip().upper()
  14. if hex_str.startswith('0X'):
  15. hex_str = hex_str[2:]
  16. value = int(hex_str, 16)
  17. except:
  18. return hex_str
  19. time_flag = '时间有效' if (value & 0x0001) else '时间无效'
  20. loc_type = (value >> 3) & 0x03
  21. loc_types = {
  22. 0: '记忆定位',
  23. 1: '卫星定位',
  24. 2: '惯性定位',
  25. 3: '组合定位'
  26. }
  27. loc_flag = loc_types.get(loc_type, f'未知({loc_type})')
  28. return f"{time_flag},{loc_flag}"
  29. def find_time_gaps(folder_path):
  30. txt_files = glob.glob(os.path.join(folder_path, '*.txt'))
  31. txt_files.sort()
  32. results = []
  33. for file_path in txt_files:
  34. filename = os.path.basename(file_path)
  35. print(f"\n正在分析文件: {filename}")
  36. with open(file_path, 'r', encoding='utf-8') as f:
  37. lines = f.readlines()
  38. if len(lines) < 2:
  39. print(f" 文件数据行数不足,跳过")
  40. continue
  41. header = lines[0].strip().split(',')
  42. time_col = header[0]
  43. status_col = None
  44. data_status_col = None
  45. for i, col in enumerate(header):
  46. if col == '设备状态':
  47. status_col = i
  48. elif col == '数据状态':
  49. data_status_col = i
  50. gap_count = 0
  51. data_lines = lines[1:]
  52. total_rows = len(data_lines)
  53. for i in range(0, total_rows - 4, 4):
  54. curr_parts = data_lines[i].strip().split(',')
  55. next_parts = data_lines[i + 4].strip().split(',')
  56. if len(curr_parts) < 2 or len(next_parts) < 2:
  57. continue
  58. curr_time = parse_time(curr_parts[0])
  59. next_time = parse_time(next_parts[0])
  60. if curr_time is None or next_time is None:
  61. continue
  62. curr_status = curr_parts[status_col] if status_col is not None and status_col < len(curr_parts) else None
  63. curr_data_status = curr_parts[data_status_col] if data_status_col is not None and data_status_col < len(curr_parts) else None
  64. next_status = next_parts[status_col] if status_col is not None and status_col < len(next_parts) else None
  65. next_data_status = next_parts[data_status_col] if data_status_col is not None and data_status_col < len(next_parts) else None
  66. time_diff = (next_time - curr_time).total_seconds()
  67. if time_diff != 1:
  68. gap_count += 1
  69. gap_line = i + 4
  70. gap_info = {
  71. 'file': filename,
  72. 'line': gap_line,
  73. 'prev_time': curr_time.strftime('%Y-%m-%d %H:%M:%S'),
  74. 'curr_time': next_time.strftime('%Y-%m-%d %H:%M:%S'),
  75. 'time_diff': time_diff,
  76. 'prev_status': curr_status,
  77. 'curr_status': next_status,
  78. 'prev_data_status': f"{curr_data_status}({parse_data_status(curr_data_status)})",
  79. 'curr_data_status': f"{next_data_status}({parse_data_status(next_data_status)})"
  80. }
  81. results.append(gap_info)
  82. print(f" [发现时间不连续] 第{gap_line}行: {gap_info['prev_time']} -> {gap_info['curr_time']}, 差值={time_diff}秒 (预期差值=1秒)")
  83. print(f" 前一组最后一行: 第{i+4}行, 时间={curr_parts[0]}, 状态={curr_data_status}({parse_data_status(curr_data_status)})")
  84. print(f" 后一组第一行: 第{i+5}行, 时间={next_parts[0]}, 状态={next_data_status}({parse_data_status(next_data_status)})")
  85. print(f" 前一行状态: 设备状态={curr_status}, 数据状态={curr_data_status}({parse_data_status(curr_data_status)})")
  86. print(f" 当前行状态: 设备状态={next_status}, 数据状态={next_data_status}({parse_data_status(next_data_status)})")
  87. print(f" 文件分析完成,共发现 {gap_count} 处时间不连续")
  88. return results
  89. def main():
  90. folder_path = r"C:\Users\Administrator\Desktop\temp\wangchao\数据\原始\F3-daohang"
  91. print("=" * 60)
  92. print("日志文件时间连续性分析工具")
  93. print("=" * 60)
  94. results = find_time_gaps(folder_path)
  95. print("\n" + "=" * 60)
  96. print("分析结果汇总")
  97. print("=" * 60)
  98. if results:
  99. print(f"\n共发现 {len(results)} 处时间不连续:\n")
  100. for i, r in enumerate(results, 1):
  101. print(f"{i}. 文件: {r['file']}, 行号: {r['line']}")
  102. print(f" 时间: {r['prev_time']} -> {r['curr_time']} (差值: {r['time_diff']}秒)")
  103. print(f" 前状态: 设备状态={r['prev_status']}, 数据状态={r['prev_data_status']}")
  104. print(f" 当前状态: 设备状态={r['curr_status']}, 数据状态={r['curr_data_status']}")
  105. print()
  106. else:
  107. print("\n未发现时间不连续的情况,所有数据时间连续。")
  108. if __name__ == '__main__':
  109. main()