| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import os
- import glob
- from datetime import datetime, timedelta
def parse_time(time_str):
    """Parse a timestamp string in one of the two supported layouts.

    Supported layouts (after stripping surrounding whitespace):
      * ``YYYY-MM-DD_HH-MM-SS`` (chosen when the text contains ``_``)
      * ``YYYY-MM-DD HH:MM:SS`` (chosen when the text contains a space)

    Args:
        time_str: Raw timestamp text, e.g. the first field of a log row.

    Returns:
        A naive ``datetime`` on success, or ``None`` when the text matches
        neither layout or is not a string. Callers skip rows for which
        ``None`` is returned.
    """
    try:
        text = time_str.strip()
    except AttributeError:
        # Non-string input (e.g. None) cannot be a timestamp.
        return None

    if '_' in text:
        fmt = '%Y-%m-%d_%H-%M-%S'
    elif ' ' in text:
        fmt = '%Y-%m-%d %H:%M:%S'
    else:
        return None

    try:
        return datetime.strptime(text, fmt)
    except ValueError:
        # Malformed value: report "unparseable" instead of aborting the
        # whole analysis, matching the None contract callers check for.
        return None
def parse_data_status(hex_str):
    """Decode a hexadecimal data-status word into a readable description.

    The text is stripped, upper-cased and an optional ``0X`` prefix is
    removed before it is parsed as base-16. From the resulting integer:
      * bit 0 selects the time-validity label;
      * bits 3-4 select the positioning-type label.

    Args:
        hex_str: Status text such as ``"0x0009"`` or ``"19"``.

    Returns:
        ``"<time label>,<positioning label>"`` on success. If the value
        cannot be parsed, the input is returned as-is (``None`` stays
        ``None``; a string is returned in its normalised form), so the
        caller can still display it.
    """
    try:
        hex_str = hex_str.strip().upper()
        if hex_str.startswith('0X'):
            hex_str = hex_str[2:]
        value = int(hex_str, 16)
    except (AttributeError, TypeError, ValueError):
        # Best-effort decoding: only the expected "not a hex string"
        # failures are absorbed, not KeyboardInterrupt/SystemExit.
        return hex_str

    time_flag = '时间有效' if (value & 0x0001) else '时间无效'

    # Two-bit field, so the index is always within 0..3.
    loc_types = ('记忆定位', '卫星定位', '惯性定位', '组合定位')
    loc_flag = loc_types[(value >> 3) & 0x03]

    return f"{time_flag},{loc_flag}"
def _column_value(parts, col):
    """Return ``parts[col]``, or None when the column is absent or out of range."""
    if col is None or col >= len(parts):
        return None
    return parts[col]


def find_time_gaps(folder_path, group_size=4, expected_interval=1):
    """Scan every ``*.txt`` log in a folder for timestamp discontinuities.

    Each file is read as comma-separated text whose first line is a header.
    Data rows are sampled every ``group_size`` rows; the timestamp in the
    first field of row ``i`` is compared with that of row
    ``i + group_size``. Whenever the difference is not exactly
    ``expected_interval`` seconds a gap is reported on stdout and recorded.

    Args:
        folder_path: Directory containing the ``*.txt`` files.
        group_size: Number of rows between the two compared rows
            (default 4, the original fixed stride).
        expected_interval: Expected difference in seconds between the two
            compared rows (default 1).

    Returns:
        A list of dicts, one per gap, with the keys ``file``, ``line``,
        ``prev_time``, ``curr_time``, ``time_diff``, ``prev_status``,
        ``curr_status``, ``prev_data_status`` and ``curr_data_status``.
        ``line`` is the 0-based index of the later row within the data rows.

    Raises:
        ValueError: If ``group_size`` is smaller than 1.
    """
    if group_size < 1:
        raise ValueError("group_size must be at least 1")

    txt_files = sorted(glob.glob(os.path.join(folder_path, '*.txt')))
    results = []

    for file_path in txt_files:
        filename = os.path.basename(file_path)
        print(f"\n正在分析文件: {filename}")

        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        if len(lines) < 2:
            print(" 文件数据行数不足,跳过")
            continue

        # Locate the two optional status columns by header name; header
        # cells are stripped so padded names still match. If a name occurs
        # more than once, the last occurrence wins.
        header = [cell.strip() for cell in lines[0].strip().split(',')]
        status_col = None
        data_status_col = None
        for idx, col in enumerate(header):
            if col == '设备状态':
                status_col = idx
            elif col == '数据状态':
                data_status_col = idx

        gap_count = 0
        data_lines = lines[1:]
        total_rows = len(data_lines)

        for i in range(0, total_rows - group_size, group_size):
            curr_parts = data_lines[i].strip().split(',')
            next_parts = data_lines[i + group_size].strip().split(',')
            if len(curr_parts) < 2 or len(next_parts) < 2:
                continue

            curr_time = parse_time(curr_parts[0])
            next_time = parse_time(next_parts[0])
            if curr_time is None or next_time is None:
                continue

            time_diff = (next_time - curr_time).total_seconds()
            if time_diff == expected_interval:
                continue

            gap_count += 1
            gap_line = i + group_size

            curr_status = _column_value(curr_parts, status_col)
            next_status = _column_value(next_parts, status_col)
            curr_data_status = _column_value(curr_parts, data_status_col)
            next_data_status = _column_value(next_parts, data_status_col)
            # Decode each status once and reuse the rendered text below.
            curr_data_text = f"{curr_data_status}({parse_data_status(curr_data_status)})"
            next_data_text = f"{next_data_status}({parse_data_status(next_data_status)})"

            gap_info = {
                'file': filename,
                'line': gap_line,
                'prev_time': curr_time.strftime('%Y-%m-%d %H:%M:%S'),
                'curr_time': next_time.strftime('%Y-%m-%d %H:%M:%S'),
                'time_diff': time_diff,
                'prev_status': curr_status,
                'curr_status': next_status,
                'prev_data_status': curr_data_text,
                'curr_data_status': next_data_text,
            }
            results.append(gap_info)

            print(f" [发现时间不连续] 第{gap_line}行: {gap_info['prev_time']} -> {gap_info['curr_time']}, 差值={time_diff}秒 (预期差值={expected_interval}秒)")
            # NOTE(review): the two detail lines below label rows as
            # i+group_size and i+group_size+1, while the values shown come
            # from data rows i and i+group_size. Numbering kept unchanged;
            # confirm the intended convention.
            print(f" 前一组最后一行: 第{i + group_size}行, 时间={curr_parts[0]}, 状态={curr_data_text}")
            print(f" 后一组第一行: 第{i + group_size + 1}行, 时间={next_parts[0]}, 状态={next_data_text}")
            print(f" 前一行状态: 设备状态={curr_status}, 数据状态={curr_data_text}")
            print(f" 当前行状态: 设备状态={next_status}, 数据状态={next_data_text}")

        print(f" 文件分析完成,共发现 {gap_count} 处时间不连续")

    return results
def main(folder_path=None):
    """Run the time-continuity analysis and print a summary report.

    Args:
        folder_path: Folder holding the ``*.txt`` logs. When omitted, the
            first command-line argument is used if present; otherwise the
            original built-in default folder is used.
    """
    import sys

    if folder_path is None:
        if len(sys.argv) > 1:
            folder_path = sys.argv[1]
        else:
            folder_path = r"C:\Users\Administrator\Desktop\temp\wangchao\数据\原始\F3-daohang"

    banner = "=" * 60
    print(banner)
    print("日志文件时间连续性分析工具")
    print(banner)

    results = find_time_gaps(folder_path)

    print("\n" + banner)
    print("分析结果汇总")
    print(banner)

    if not results:
        print("\n未发现时间不连续的情况,所有数据时间连续。")
        return

    print(f"\n共发现 {len(results)} 处时间不连续:\n")
    for i, r in enumerate(results, 1):
        print(f"{i}. 文件: {r['file']}, 行号: {r['line']}")
        print(f" 时间: {r['prev_time']} -> {r['curr_time']} (差值: {r['time_diff']}秒)")
        print(f" 前状态: 设备状态={r['prev_status']}, 数据状态={r['prev_data_status']}")
        print(f" 当前状态: 设备状态={r['curr_status']}, 数据状态={r['curr_data_status']}")
        print()
# Script entry point: run the analysis only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|