본문 바로가기

Work

오랜만의 코딩: BLF 변환

어제 퇴근 시각 즈음하여 BLF 로그 파일 무료 변환 방법을 찾아보다가,

python-can 모듈에서 관련 함수를 제공하는 것 같아서 CSV 변환을 목표로 간단하게 짜보기 시작했다.

오늘은 금요일이라 회의도 많지 않았고, 피카 시간과 특허 아이디어 회의 등 사이사이 조금씩 작성했다.

오랜만에 API 검색하며 짜니까 재미있었고, 예전엔 기대할 수 없던 ChatGPT가 있어서 상당히 수월하다.

다만, 예전의 기분 생각하며 주로 공식 문서를 읽었고 예제는 stackoverflow를 참고하기도 했는데, ChatGPT가 라이브러리나 무료 툴 소개할 땐 조금 부정확한 이야길 할 때가 있기 때문이었다.

 

generator에 대해 iterator를 쓰려다 보니 문제가 생겨 막혔다. 첫 라인에서 필드 이름으로 헤더 텍스트를 만들려 했는데, next()를 사용하면 더이상 for문으로 접근이 불가능하다. 직관적이고 간단한 방법은 그냥 몽땅 리스트로 만드는 거지만 그렇게 구현하고 싶진 않고, 첫번째 enumerator에 대해서만 텍스트 추출하는 조건문을 넣는 건 못생겨서 싫다.

 

샤워하면서 생각하면 기발한 방법이 떠오를지도.

 

다음 날 추가) iterator를 여러개 연결하는 방법을 찾았지만 효율적이지 않다는 조언에 따라, 그냥 '끝날 때까지 읽어오는' 방식을 쓰기로 했다.

예상치 못한 문제: object_count-1개를 읽어오면 될 거라 생각했는데, 어느 불특정 count까지 읽은 뒤 StopIteration exception이 발생했다. 이 예외의 의미를 몰라서, 파일이 너무 커진 탓으로 오해했다. 그래서 그 전 특정 count에 이르면 파일을 분할 생성하도록 구현을 했는데... 알고보니 이 exception은 iterator가 끝에 도달해 발생하는 것이었다.

왜 예제 blf 파일이 count보다 적은 object를 갖고 있는지는 아직 모른다. 다만 StopIteration은 '정상 종료'라고 봐도 무방할 것 같아 그리 처리함.

import csv
import can
import os
from termcolor import cprint

### https://pypi.org/project/termcolor/
printError = lambda txt: cprint(txt, "red", "underline")
printInfo = lambda txt: cprint(txt, "yellow")
printFail = lambda txt: cprint(txt, "red", "on_black")
printSuccess = lambda txt: cprint(txt, "green", "on_black")

def canMsg2list(msg:can.Message, withKey=False) -> list:
    canMessageDict = {
        "timestamp":	msg.timestamp,	###	timestamp
        "channel":	msg.channel,	###	channel
        "bitrate_switch":	msg.bitrate_switch,	###	bitrate_switch
        "dlc":	msg.dlc,	###	dlc
        "data":	'-'.join([f"{d:02X}" for d in msg.data]),	###	data
        "error_state_indicator":	msg.error_state_indicator,	###	error_state_indicator
        "is_error_frame":	msg.is_error_frame,	###	is_error_frame
        "is_extended_id":	msg.is_extended_id,	###	is_extended_id
        "is_rx":	msg.is_rx,	###	is_rx
        "is_fd":	msg.is_fd,	###	is_fd
        "is_remote_frame":	msg.is_remote_frame,	###	is_remote_frame
        "arbitration_id":	msg.arbitration_id,	###	arbitration_id
    }
    if withKey is True:
        return list(canMessageDict.keys()), list(canMessageDict.values())
    return list(canMessageDict.values())

def convertToCSVstring(valueList:list) -> str:
    return ','.join([str(val) for val in valueList])

def blf2csv(blf_filepath = None, csv_filepath = None):
    ### https://python-can.readthedocs.io/en/stable/file_io.html#blf-binary-logging-format
    if blf_filepath is None:
        blf_filepath = r"C:\Users\a461269\OneDrive - Volvo Group\100.ESS Diagnostics\Supplier Project_CATL\B sample\Commissioning Arendal\EP_3_Logging_2.blf"
    if not os.path.exists(blf_filepath):
        raise FileNotFoundError("No file found: " + blf_filepath)
    blf_object = can.BLFReader(blf_filepath)
    printSuccess("Reading BLF done: " + blf_filepath)

    ### View log in list
    if False:
        blf_log = [msg for msg in blf_object]
        text_log = [canMsg2list(blf_log[0], True)[0]] ### Add header text line
        for log in blf_log:
            l = canMsg2list(log)
            print(l, sep='\t')
            text_log.append(l)
        print("Log extracted: %d lines"%(len(text_log)))

    ### Write to CSV file
    conversion_completed = False
    file_split_cnt = 0
    blf_obj_itr = iter(blf_object)
    global header

    if csv_filepath is None:
        csv_filepath_base = blf_filepath.removesuffix(".blf")
    else:
        csv_filepath_base = csv_filepath.removesuffix(".csv")

    while conversion_completed is False:
        csv_filepath = csv_filepath_base
        csv_filepath += '_' + str(file_split_cnt) if file_split_cnt > 0 else '' + ".csv"
        try:
            csv_file = open(csv_filepath, 'w')
            #csv_writer = csv.writer(csv_file)
            file_split_cnt += 1

            header, msg_fields = canMsg2list(next(blf_obj_itr), withKey=True) ### the initial msg is used to make header text in CSV.
            csv_file.write(convertToCSVstring(header)+'\n')
            csv_file.write(convertToCSVstring(msg_fields)+'\n')
            for msg_cnt in range(blf_object.object_count-1):
                msg = next(blf_obj_itr)
                msg_fields = canMsg2list(msg)
                string = convertToCSVstring(canMsg2list(msg))
                csv_file.write(convertToCSVstring(canMsg2list(msg))+'\n')
                #csv_writer.writerow("hello\n") ### this method splits all letters with comma separator.
        except PermissionError as e:
            printFail("Error to create CSV file: Close the file and retry.")
        except StopIteration as e:
            if msg_cnt < blf_object.object_count-1:
                printInfo("Reading lines exhausted before BLF size.")
            printSuccess("Writing CSV done: " + csv_filepath)
            conversion_completed = True
        except Exception as e:
            printError("Error in writing CSV file: " + str(e))
        finally:
            printInfo("CSV lines written: %d out of %d lines"%(msg_cnt+1, blf_object.object_count))
            csv_file.close()