diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 624322e87badd6af4ec23fe3ba2fbc84971ddf52..5497e5e362338ddf78039fb0ce1a8616558f651c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -342,6 +342,26 @@ self-test-on-merge-request: junit: report-junit.xml +measure-delay: + extends: + - .rules-basis + - .rules-merge-request + stage: test + tags: + - delay-test + needs: [ "build-codec-linux-cmake" ] + script: + - *print-common-info + + - make -j + + - mkdir out_delay_cmp + - mkdir out_no_delay_cmp + - python3 ci/delay_measurement_test.py + artifacts: + paths: + - delay_test_results.csv + # --------------------------------------------------------------- # Test jobs for main branch # --------------------------------------------------------------- diff --git a/ci/delay_measurement_test.py b/ci/delay_measurement_test.py new file mode 100644 index 0000000000000000000000000000000000000000..c9245c159f48d28e6b231b28faa77bb4d1263cc2 --- /dev/null +++ b/ci/delay_measurement_test.py @@ -0,0 +1,226 @@ +# steps: +# 1 define expected delays for all modes/bitrates combination +# 2 generate the test signals +# 3 run all modes with delay compensation +# 4 run all modes without delay compenstaion +# 5 measure delay for all channels and report/compare to expected delay +import subprocess +import sys +import re +import numpy as np +import pandas as pd +import shutil + +PYAUDIO3DTOOLS_PATH = "./scripts/" +sys.path.append(PYAUDIO3DTOOLS_PATH) +from pyaudio3dtools import audioarray, audiofile +import pathlib + +OC_TO_NCHANNELS = { + "MONO": 1, + "STEREO": 2, + "BINAURAL": 2, + "BINAURAL_ROOM": 2, + "5_1": 6, + "7_1": 8, + "5_1_2": 8, + "5_1_4": 10, + "7_1_4": 12, + "FOA": 4, + "HOA2": 9, + "HOA3": 16, + "EXT": 1, + "ISM1": 1, + "ISM2": 2, + "ISM3": 3, + "ISM4": 4, + "MASA1TC": 1, + "MASA2TC": 2, +} +FORMATS = [ + "mono", + "stereo", + *[f"ISM{x}" for x in range(1, 5)], + "5_1", + "5_1_2", + "5_1_4", + "7_1", + "7_1_4", + "SBA", + "MASA_1TC_1DIR", + "MASA_1TC_2DIR", + "MASA_2TC_1DIR", + "MASA_2TC_2DIR", +] +TEST_FILE = "./delay_test_file_{}.wav" +CFG = "ci/delay_test_linux.json" + + +def get_modes(format: str) -> list: + modes_cmd = ["python3", "./scripts/runIvasCodec.py", "-l"] + modes_proc = subprocess.run(modes_cmd, capture_output=True) + + search_str = format + # TODO: exclude dtx, rate switching + if format in ["5_1", "5_1_2", "5_1_4", "7_1", "7_1_4"]: + search_str = "MC_" + format + "_b" + modes_list = [ + m + for m in modes_proc.stdout.decode("utf8").split("\n") + if m.startswith(search_str) + and not "_ball_" in m + and not "b_rs" in m + and not "dtx" in m + and not "amr" in m + ] + + return modes_list + + +SIGNAL_SOURCE = "scripts/testv/stv48c.pcm" +CUT_LEN_SECS = 1 +FS_MEASUREMENT_SIGNAL = 48000 + +def gen_signal(channels:int) -> np.ndarray: + # get the measurement signal from the source file (mono file) + signal = np.fromfile(SIGNAL_SOURCE, dtype=np.int16) + # shorten to one second + signal = signal[:FS_MEASUREMENT_SIGNAL * CUT_LEN_SECS] + # create other channels with same signal + signal = np.repeat(signal.reshape((-1, 1)), channels, axis=1) + + return signal + + +def main(): + for form in FORMATS: + + # generate test signal + test_file = TEST_FILE.format(form) + gen_form = form + if form == "SBA": + gen_form = "HOA3" + if "MASA" in form: + # copy over files + masa_source_files = ["scripts/testv/stv_IVASMASA_1dir1TC.pcm"] + for tcs in [1, 2]: + for dir in [1, 2]: + source_signal = f"scripts/testv/stv_IVASMASA_{dir}dir{tcs}TC.pcm" + source_metadata = f"scripts/testv/stv_IVASMASA_{dir}dir{tcs}TC.met" + target_signal = f"delay_test_file_MASA_{tcs}TC_{dir}DIR.wav" + target_metadata = f"delay_test_file_MASA_{tcs}TC_{dir}DIR.met" + s, fs = audiofile.readfile(source_signal, nchannels=tcs) + audiofile.writefile(target_signal, s) + shutil.copy(source_metadata, target_metadata) + else: + sig = gen_signal(OC_TO_NCHANNELS[gen_form.upper()]) + audiofile.writefile(test_file, sig) + + modes_list = get_modes(form) + + # run format + outfolder_delaycmp = "out_delay_cmp" + + run_cmd = [ + "python3", + "./scripts/runIvasCodec.py", + "-m", + *modes_list, + "-p", + CFG, + "-I", + test_file, + "-o", + outfolder_delaycmp, + ] + subprocess.call(run_cmd) + + outfolder_nodelaycmp = "out_no_delay_cmp" + run_cmd_nodelaycmp = list(run_cmd) + run_cmd_nodelaycmp[-1] = outfolder_nodelaycmp + run_cmd_nodelaycmp.extend(["-D=-NO_DELAY_CMP", "-E=-NO_DELAY_CMP"]) + subprocess.call(run_cmd_nodelaycmp) + + delays = get_delay_for_folders(outfolder_delaycmp, outfolder_nodelaycmp) + + with pd.option_context('display.max_rows', None, 'display.max_columns', None): + print(delays) + + delays.to_csv("delay_test_results.csv") + + +def get_delay_for_folders(folder_delay_cmp: str, folder_no_delay_cmp: str): + + folder_path_delay_cmp = pathlib.Path(folder_delay_cmp) + files_delay_cmp = sorted( + [ + f.name + for f in folder_path_delay_cmp.joinpath("dec").iterdir() + if f.name.endswith(".wav") + ] + ) + + folder_path_no_delay_cmp = pathlib.Path(folder_no_delay_cmp) + files_no_delay_cmp = sorted( + [ + f.name + for f in folder_path_no_delay_cmp.joinpath("dec").iterdir() + if f.name.endswith(".wav") + ] + ) + + assert files_delay_cmp == files_no_delay_cmp + + output = { + "format": list(), + "out_format": list(), + "fs": list(), + "bitrate": list(), + "bandwidth": list(), + "delay": list(), + } + + for f in files_delay_cmp: + + f_cmp = folder_path_delay_cmp.joinpath("dec", f) + f_no_cmp = folder_path_no_delay_cmp.joinpath("dec", f) + + s_cmp, fs = audiofile.readfile(f_cmp) + s_no_cmp, fs = audiofile.readfile(f_no_cmp) + + delay = audioarray.getdelay(s_cmp, s_no_cmp) / fs + + search_prefix = "delay_test_file_(([0-9_]+)|[a-zA-Z0-9]+_)" + op_name = re.sub(search_prefix, "", f).split(".wav")[0] + + out_format = op_name.split(".dec.")[-1] + + op_name_split = op_name.split("_b") + + search_br = "b[0-9]+(_[0-9])*" + br = float(re.search(search_br, op_name).group()[1:].replace("_", ".")) + + bw = "fb" + if "_swb_" in op_name: + bw = "swb" + elif "_wb_" in op_name: + bw = "wb" + elif "_nb_" in op_name: + bw = "nb" + + form = op_name_split[0] + if "MASA" in form: + form = form[form.index("MASA"):] + + output["format"].append(form) + output["out_format"].append(out_format) + output["fs"].append(fs) + output["bitrate"].append(br) + output["bandwidth"].append(bw) + output["delay"].append(delay) + + return pd.DataFrame(output) + + +if __name__ == "__main__": + main() diff --git a/ci/delay_test_linux.json b/ci/delay_test_linux.json new file mode 100644 index 0000000000000000000000000000000000000000..c71ddd543aa4d4465da165414eb420420857076f --- /dev/null +++ b/ci/delay_test_linux.json @@ -0,0 +1,25 @@ +{ + "afspPath": "not_needed", + "utilPath": "/tools", + "inpaths": { + "MONO": "delay_test_file_MONO.wav", + "STEREO": "delay_test_file_STEREO.wav", + "FOA": "delay_test_file_FOA.wav", + "HOA2": "delay_test_file_HOA2.wav", + "HOA3": "delay_test_file_HOA3.wav", + "SBA": "delay_test_file_SBA.wav", + "MASA1TC1DIR": "delay_test_file_MASA_1TC_1DIR.wav", + "MASA1TC2DIR": "delay_test_file_MASA_1TC_2DIR.wav", + "MASA2TC1DIR": "delay_test_file_MASA_2TC_1DIR.wav", + "MASA2TC2DIR": "delay_test_file_MASA_2TC_2DIR.wav", + "5_1": "delay_test_file_5_1.wav", + "5_1_2": "delay_test_file_5_1_2.wav", + "5_1_4": "delay_test_file_5_1_4.wav", + "7_1": "delay_test_file_7_1.wav", + "7_1_4": "delay_test_file_7_1_4.wav", + "ISM1": "delay_test_file_ISM1.wav", + "ISM2": "delay_test_file_ISM2.wav", + "ISM3": "delay_test_file_ISM3.wav", + "ISM4": "delay_test_file_ISM4.wav" + } +} diff --git a/ci/generate_delay_measurement_signal.py b/ci/generate_delay_measurement_signal.py new file mode 100644 index 0000000000000000000000000000000000000000..4e2fda196828cde99d82f02a3161ad5b689a9dca --- /dev/null +++ b/ci/generate_delay_measurement_signal.py @@ -0,0 +1,58 @@ +import argparse +import numpy as np + +OC_TO_NCHANNELS = { + "MONO": 1, + "STEREO": 2, + "BINAURAL": 2, + "BINAURAL_ROOM": 2, + "5_1": 6, + "7_1": 8, + "5_1_2": 8, + "5_1_4": 10, + "7_1_4": 12, + "FOA": 4, + "HOA2": 9, + "HOA3": 16, + "EXT": 1, + "ISM1": 1, + "ISM2": 2, + "ISM3": 3, + "ISM4": 4, + "MASA1TC": 1, + "MASA2TC": 2, +} + +SIGNAL_SOURCE = "scripts/testv/stv48c.pcm" +CUT_LEN_SECS = 1 +FS = 48000 + + +def main(args): + channels = OC_TO_NCHANNELS[ args.outformat ] + outfile = args.outfile + + signal = gen_signal(channels) + + # write out interleaved + signal.reshape(-1, 1).tofile(outfile) + + +def gen_signal(channels:int) -> np.ndarray: + # get the measurement signal from the source file (mono file) + signal = np.fromfile(SIGNAL_SOURCE, dtype=np.int16) + # shorten to one second + signal = signal[:FS * CUT_LEN_SECS] + # create other channels with same signal + signal = np.repeat(signal.reshape((-1, 1)), channels, axis=1) + + return signal + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("outformat", type=str.upper, choices=list(OC_TO_NCHANNELS.keys()), help="output format to generate for") + parser.add_argument("outfile", help="output .pcm file with measurement signal") + + args = parser.parse_args() + main(args) \ No newline at end of file