diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7c8032650dd3fd4af3bd1fd2480d40a1e30fa3d7..6810898e3e2cf635b0b7b4df1f984770297240e2 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -138,7 +138,7 @@ stages: .merge-request-comparison-check: &merge-request-comparison-check - if [ $zero_errors != 1 ]; then echo "Run errors encountered!"; exit $EXIT_CODE_FAIL; fi - if [ $exit_code -eq 1 ] && [ $non_be_flag == 0 ]; then echo "Non-bitexact cases without non-BE tag encountered!"; exit $EXIT_CODE_FAIL; fi - - if [ $exit_code -eq 1 ] && [ $non_be_flag != 0 ]; then echo "Non-bitexact cases with non-BE tag encountered"; exit $EXIT_CODE_NON_BE; fi + - if [ $exit_code -eq 1 ] && [ $non_be_flag != 0 ]; then echo "Non-bitexact cases with non-BE tag encountered"; exit $EXIT_CODE_NON_BE; fi - exit 0 .update-ltv-repo: &update-ltv-repo @@ -732,7 +732,7 @@ ivas-pytest-on-merge-request: - non_be_flag=$(grep -c --ignore-case "\[non[ -]*be\]" tmp.txt) || true - ref_using_main=$(grep -c --ignore-case "\[ref[ -]*using[ -]*main\]" tmp.txt) || true - ### If ref_using_main is not set, checkoug the source branch to use scripts and input from there + ### If ref_using_main is not set, checkout the source branch to use scripts and input from there - if [ $ref_using_main == 0 ]; then git checkout $source_branch_commit_sha; fi ### prepare pytest @@ -767,6 +767,62 @@ ivas-pytest-on-merge-request: junit: - report-junit.xml + +# Check interop IVAS_cod_test -> IVAS_dec_ref +ivas-interop-on-merge-request: + extends: + - .test-job-linux + - .rules-merge-request + stage: test + needs: ["build-codec-linux-cmake"] + timeout: "10 minutes" + script: + - *print-common-info + - *get-commits-behind-count + - *check-commits-behind-count-in-compare-jobs + - *merge-request-comparison-setup-codec + # the next line is dependent on ref-using-main flag in the other tests, but here the flag does not make sense + - git checkout $source_branch_commit_sha + + # some helper variables - "|| true" to prevent failures from grep not finding anything + # write to temporary file as workaround for failures observed with piping echo + - echo $CI_MERGE_REQUEST_TITLE > tmp.txt + - non_interop_flag=$(grep -c --ignore-case "\[non[ -]*io\]" tmp.txt) || true + + ### prepare pytest + # create short test vectors + - python3 tests/create_short_testvectors.py + + # Run reference creation, using source branch encoder and main decoder (see .merge-request-comparison-setup-codec) + - exit_code=0 + - exit_code2=0 + # set timeout for individual testcase runs to 60 seconds + - testcase_timeout=60 + - python3 -m pytest $TESTS_DIR_CODEC_BE_ON_MR -v --html=report.html --self-contained-html --junit-xml=report-junit.xml --update_ref 1 -m create_ref --ref_encoder_path ./IVAS_cod --ref_decoder_path ./IVAS_dec_ref --testcase_timeout=$testcase_timeout || exit_code=$? + - python3 -m pytest $TESTS_DIR_CODEC_BE_ON_MR -v --html=report2.html --self-contained-html --junit-xml=report2-junit.xml --update_ref 1 -m create_ref_part2 --ref_encoder_path ./IVAS_cod --ref_decoder_path ./IVAS_dec_ref --testcase_timeout=$testcase_timeout || exit_code2=$? + - zero_failures=$(cat report-junit.xml report2-junit.xml | grep -c 'failures="0"') || true + + - if [ $zero_failures != 2 ] && [ $non_interop_flag == 0 ]; then echo "Non-interop cases without non-interop flag encountered!"; exit $EXIT_CODE_FAIL; fi + - if [ $zero_failures != 2 ] && [ $non_interop_flag == 1 ]; then echo "Non-interop cases with non-interop flag encountered"; exit $EXIT_CODE_NON_BE; fi + - exit 0 + + allow_failure: + exit_codes: + - 123 + artifacts: + name: "mr-$CI_MERGE_REQUEST_IID--sha-$CI_COMMIT_SHORT_SHA--stage-$CI_JOB_STAGE--results" + expire_in: 1 week + when: always + paths: + - report-junit.xml + - report.html + - report2-junit.xml + - report2.html + expose_as: "interop test results" + reports: + junit: + - report*-junit.xml + evs-pytest-on-merge-request: extends: - .test-job-linux diff --git a/tests/codec_be_on_mr_nonselection/test_masa_enc_dec.py b/tests/codec_be_on_mr_nonselection/test_masa_enc_dec.py index 134611ae046c82de39df2d9b56038e41fec06ba1..31dee0c233cf8c258a7796b35bc183dfa239bb1a 100644 --- a/tests/codec_be_on_mr_nonselection/test_masa_enc_dec.py +++ b/tests/codec_be_on_mr_nonselection/test_masa_enc_dec.py @@ -1,5 +1,4 @@ -__copyright__ = \ - """ +__copyright__ = """ (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, @@ -29,8 +28,7 @@ __copyright__ = \ the United Nations Convention on Contracts on the International Sales of Goods. """ -__doc__ = \ - """ +__doc__ = """ Test file to run C encoder and decoder code. The outputs are compared with C generated references. """ @@ -45,21 +43,37 @@ from tests.conftest import EncoderFrontend, DecoderFrontend from tests.cmp_pcm import cmp_pcm # params -#output_mode_list = ['MONO', 'STEREO', '5_1', '7_1', '5_1_2', '5_1_4', '7_1_4', 'FOA', 'HOA2', 'HOA3', 'BINAURAL', 'BINAURAL_ROOM', 'EXT'] -output_mode_list = ['BINAURAL', 'EXT'] -ivas_br_masa = [13200, 16400, 24400, 32000, 48000, 64000, 80000, 96000, 128000, 160000, 192000, 256000, 384000, 512000] +# output_mode_list = ['MONO', 'STEREO', '5_1', '7_1', '5_1_2', '5_1_4', '7_1_4', 'FOA', 'HOA2', 'HOA3', 'BINAURAL', 'BINAURAL_ROOM', 'EXT'] +output_mode_list = ["BINAURAL", "EXT"] +ivas_br_masa = [ + 13200, + 16400, + 24400, + 32000, + 48000, + 64000, + 80000, + 96000, + 128000, + 160000, + 192000, + 256000, + 384000, + 512000, +] # Write file-based parameter sets here (metafile, pcm/wave, numDir, numTC, DTX_toggle) -masa_metadata_audio_ndir_ntransportch_dtx_list = [#('stv1MASA1TC48c.met', 'stv1MASA1TC48c.wav', 1, 1, False), - #('stv1MASA2TC48c.met', 'stv1MASA2TC48c.wav', 1, 2, False), - ('stv2MASA1TC48c.met', 'stv2MASA1TC48c.wav', 2, 1, False), - #('stv2MASA2TC48c.met', 'stv2MASA2TC48c.wav', 2, 2, False), - #('stv1MASA1TC48n.met', 'stv1MASA1TC48n.wav', 1, 1, True), - ('stv1MASA2TC48n.met', 'stv1MASA2TC48n.wav', 1, 2, True)] +masa_metadata_audio_ndir_ntransportch_dtx_list = [ # ('stv1MASA1TC48c.met', 'stv1MASA1TC48c.wav', 1, 1, False), + # ('stv1MASA2TC48c.met', 'stv1MASA2TC48c.wav', 1, 2, False), + ("stv2MASA1TC48c.met", "stv2MASA1TC48c.wav", 2, 1, False), + # ('stv2MASA2TC48c.met', 'stv2MASA2TC48c.wav', 2, 2, False), + # ('stv1MASA1TC48n.met', 'stv1MASA1TC48n.wav', 1, 1, True), + ("stv1MASA2TC48n.met", "stv1MASA2TC48n.wav", 1, 2, True), +] # Used to not test every combination -test_split_br = [13200, 24400, 48000, 80000, 128000, 256000, 512000]; -AbsTol = '0' +test_split_br = [13200, 24400, 48000, 80000, 128000, 256000, 512000] +AbsTol = "0" def check_and_makedir(dir_path): @@ -74,20 +88,23 @@ def check_and_makedir(dir_path): @pytest.mark.create_ref @pytest.mark.parametrize("output_mode", output_mode_list) @pytest.mark.parametrize("ivas_br", ivas_br_masa) -@pytest.mark.parametrize("masa_metadata_audio_ndir_ntransportch_dtx", masa_metadata_audio_ndir_ntransportch_dtx_list) +@pytest.mark.parametrize( + "masa_metadata_audio_ndir_ntransportch_dtx", + masa_metadata_audio_ndir_ntransportch_dtx_list, +) def test_masa_enc_dec( - dut_encoder_frontend: EncoderFrontend, - dut_decoder_frontend: DecoderFrontend, - ref_encoder_path, - ref_decoder_path, - reference_path, - dut_base_path, - update_ref, - keep_files, - ivas_br, - masa_metadata_audio_ndir_ntransportch_dtx, - test_vector_path, - output_mode, + dut_encoder_frontend: EncoderFrontend, + dut_decoder_frontend: DecoderFrontend, + ref_encoder_frontend: EncoderFrontend, + ref_decoder_frontend: DecoderFrontend, + reference_path, + dut_base_path, + update_ref, + keep_files, + ivas_br, + masa_metadata_audio_ndir_ntransportch_dtx, + test_vector_path, + output_mode, ): # Input parameters in_fs = 48 @@ -102,13 +119,13 @@ def test_masa_enc_dec( # Apply test skipping here if dtx: - if output_mode != 'EXT': + if output_mode != "EXT": if ivas_br not in test_split_br: - pytest.skip("Skipping some DTX bitrates for other than EXT output to save time") + pytest.skip( + "Skipping some DTX bitrates for other than EXT output to save time" + ) # Set reference encoder and decoder - ref_encoder_frontend = EncoderFrontend(ref_encoder_path, "REF") - ref_decoder_frontend = DecoderFrontend(ref_decoder_path, "REF") # Set output paths out_dir_bs_ref = f"{reference_path}/masa_test/bitstreams" @@ -125,7 +142,7 @@ def test_masa_enc_dec( output_bitstream_dut = f"{out_dir_bs_dut}/masa{masa_channel_count}_ndirs{n_directions}_outputmode{output_mode}_ivasbr{ivas_br}k_DTX{dtx}.bts" dec_output_ref = f"{out_dir_dec_output_ref}/masa{masa_channel_count}_ndirs{n_directions}_outputmode{output_mode}_ivasbr{ivas_br}k_DTX{dtx}.wav" dec_output_dut = f"{out_dir_dec_output_dut}/masa{masa_channel_count}_ndirs{n_directions}_outputmode{output_mode}_ivasbr{ivas_br}k_DTX{dtx}.wav" - if output_mode == 'EXT': + if output_mode == "EXT": dec_met_output_ref = f"{dec_output_ref}.met" dec_met_output_dut = f"{dec_output_dut}.met" else: @@ -177,12 +194,14 @@ def test_masa_enc_dec( ) # Compare outputs. For EXT output, also compare metadata. - if output_mode == 'EXT': - # Compare metadata as binary blob + if output_mode == "EXT": + # Compare metadata as binary blob metacmp_res = cmp(dec_met_output_ref, dec_met_output_dut) - + # Compare audio outputs - pcmcmp_res, reason = cmp_pcm(dec_output_dut, dec_output_ref, output_mode, int(out_fs*1000)) + pcmcmp_res, reason = cmp_pcm( + dec_output_dut, dec_output_ref, output_mode, int(out_fs * 1000) + ) # Fail if compare fails compare result if metacmp_res == False and pcmcmp_res != 0: @@ -192,17 +211,19 @@ def test_masa_enc_dec( elif pcmcmp_res != 0: pytest.fail("Transport output difference detected") else: - print("Comparison bit exact") + print("Comparison bit exact") else: # Compare audio outputs filecmp_res = cmp(dec_output_ref, dec_output_dut) if filecmp_res == False: - cmp_result, reason = cmp_pcm(dec_output_dut, dec_output_ref, output_mode, int(out_fs*1000)) + cmp_result, reason = cmp_pcm( + dec_output_dut, dec_output_ref, output_mode, int(out_fs * 1000) + ) # Report compare result assert cmp_result == 0, reason else: - print("Comparison bit exact") + print("Comparison bit exact") # remove_output( # keep_files, @@ -218,18 +239,17 @@ def test_masa_enc_dec( ######################################################### # -------------------- test function -------------------- def ivas_enc( - encoder_frontend, - masa_channel_count, - masa_path, - ivas_br, - sampling_rate, - input_audio_path, - output_bitstream, - dtx: Optional[bool] = False, + encoder_frontend, + masa_channel_count, + masa_path, + ivas_br, + sampling_rate, + input_audio_path, + output_bitstream, + dtx: Optional[bool] = False, ): - # ------------ run cmd ------------ - options = ['-masa', f"{masa_channel_count}", f"{masa_path}"] + options = ["-masa", f"{masa_channel_count}", f"{masa_path}"] # call encoder encoder_frontend.run( @@ -243,13 +263,12 @@ def ivas_enc( def ivas_dec( - decoder_frontend, - output_mode, - sampling_rate, - input_bitstream, - output_path, + decoder_frontend, + output_mode, + sampling_rate, + input_bitstream, + output_path, ): - # -------- run cmd ------------ # call decoder @@ -262,13 +281,13 @@ def ivas_dec( def remove_output( - keep_files, - output_bitstream_ref, - output_bitstream_dut, - dec_output_ref, - dec_output_dut, - dec_met_output_ref: Optional[str] = None, - dec_met_output_dut: Optional[str] = None, + keep_files, + output_bitstream_ref, + output_bitstream_dut, + dec_output_ref, + dec_output_dut, + dec_met_output_ref: Optional[str] = None, + dec_met_output_dut: Optional[str] = None, ): if not keep_files: os.remove(output_bitstream_ref) diff --git a/tests/codec_be_on_mr_nonselection/test_param_file.py b/tests/codec_be_on_mr_nonselection/test_param_file.py index e09ec73c5ff4cd1271f23e04de2944a12e93e142..8c3ed5ccaedc820606535d1ec11df4d85c23c3c3 100644 --- a/tests/codec_be_on_mr_nonselection/test_param_file.py +++ b/tests/codec_be_on_mr_nonselection/test_param_file.py @@ -124,8 +124,8 @@ def convert_test_string_to_tag(test_string): def test_param_file_tests( dut_encoder_frontend: EncoderFrontend, dut_decoder_frontend: DecoderFrontend, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend: EncoderFrontend, + ref_decoder_frontend: DecoderFrontend, reference_path, dut_base_path, test_vector_path, @@ -180,7 +180,7 @@ def test_param_file_tests( encode( dut_encoder_frontend, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, bitrate, @@ -272,7 +272,7 @@ def test_param_file_tests( stdout = decode( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, output_config, @@ -291,7 +291,6 @@ def test_param_file_tests( output_differs, reason = cmp_pcm( dut_output_file, ref_output_file, output_config, fs ) - md_out_files = get_expected_md_files(ref_output_file, enc_split, output_config) metadata_differs = False @@ -331,8 +330,8 @@ def test_param_file_tests( def encode( - encoder_frontend, - ref_encoder_path, + dut_encoder_frontend, + ref_encoder_frontend, reference_path, dut_base_path, bitrate, @@ -355,9 +354,7 @@ def encode( if update_ref == 1 or update_ref == 2 and not os.path.exists(ref_out_file): check_and_makedir(ref_out_dir) # call REF encoder - assert ref_encoder_path - ref_encoder = EncoderFrontend(ref_encoder_path, "REF") - ref_encoder.run( + ref_encoder_frontend.run( bitrate, sampling_rate, testv_file, @@ -368,7 +365,7 @@ def encode( if update_ref in [0, 2]: check_and_makedir(dut_out_dir) # call DUT encoder - encoder_frontend.run( + dut_encoder_frontend.run( bitrate, sampling_rate, testv_file, @@ -447,7 +444,7 @@ def simulate( def decode( decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, output_config, @@ -479,9 +476,7 @@ def decode( for x in dec_opts_list ] # call REF decoder - assert ref_decoder_path - ref_decoder = DecoderFrontend(ref_decoder_path, "REF") - ref_decoder.run( + ref_decoder_frontend.run( output_config, sampling_rate, ref_in_file, @@ -489,7 +484,7 @@ def decode( add_option_list=add_option_list, ) - stdout = ref_decoder.stdout + stdout = ref_decoder_frontend.stdout if update_ref in [0, 2]: check_and_makedir(dut_out_dir) diff --git a/tests/codec_be_on_mr_nonselection/test_sba_bs_enc.py b/tests/codec_be_on_mr_nonselection/test_sba_bs_enc.py index 3b05399784a3d7ca8a323a6b1d3f2ec05d712f34..d175e1583d4f5bfb2318ef484d1ff93b4c319e3c 100644 --- a/tests/codec_be_on_mr_nonselection/test_sba_bs_enc.py +++ b/tests/codec_be_on_mr_nonselection/test_sba_bs_enc.py @@ -1,5 +1,4 @@ -__copyright__ = \ - """ +__copyright__ = """ (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, @@ -29,8 +28,7 @@ __copyright__ = \ the United Nations Convention on Contracts on the International Sales of Goods. """ -__doc__ = \ - """ +__doc__ = """ Test file to run C encoder and decoder code. The outputs are compared with C generated references. """ @@ -43,28 +41,40 @@ from tests.cmp_pcm import cmp_pcm from tests.cut_pcm import cut_samples from tests.conftest import EncoderFrontend, DecoderFrontend from cut_bs import cut_from_start -# params - -tag_list = ['stvFOA'] -tag_list_HOA2 = ['stv2OA'] -tag_list_HOA3 = ['stv3OA'] -tag_list_bw_force = ['stvFOA'] -dtx_set = ['0', '1'] -dict_fsample_bw = {'48': '3', '32': '2', '16': '1'} -dict_bw_idx = {'FB': '3', 'SWB': '2', 'WB': '1'} -dict_bw_tag = {'SWB': '_ForceSWB', 'WB': '_ForceWB'} -ivas_br_FOA = ['13200','16400','32000','64000', '96000', '160000', '256000', '384000', '512000','sw_24k4_256k.bin'] -ivas_br_HOA2 = ['256000', '384000', '512000'] -ivas_br_HOA3 = ['256000', '384000', '512000'] +# params -sample_rate_list = ['48', '32', '16'] +tag_list = ["stvFOA"] +tag_list_HOA2 = ["stv2OA"] +tag_list_HOA3 = ["stv3OA"] + +tag_list_bw_force = ["stvFOA"] +dtx_set = ["0", "1"] +dict_fsample_bw = {"48": "3", "32": "2", "16": "1"} +dict_bw_idx = {"FB": "3", "SWB": "2", "WB": "1"} +dict_bw_tag = {"SWB": "_ForceSWB", "WB": "_ForceWB"} +ivas_br_FOA = [ + "13200", + "16400", + "32000", + "64000", + "96000", + "160000", + "256000", + "384000", + "512000", + "sw_24k4_256k.bin", +] +ivas_br_HOA2 = ["256000", "384000", "512000"] +ivas_br_HOA3 = ["256000", "384000", "512000"] + +sample_rate_list = ["48", "32", "16"] bypass_list = [1, 2] gain_list = [0, 1] -sample_rate_bw_idx_list = [('48', 'SWB'), ('48', 'WB'), ('32', 'WB')] +sample_rate_bw_idx_list = [("48", "SWB"), ("48", "WB"), ("32", "WB")] -AbsTol = '0' +AbsTol = "0" def check_and_makedir(dir_path): @@ -86,8 +96,8 @@ def test_bypass_enc( test_vector_path, reference_path, dut_base_path, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend, + ref_decoder_frontend, update_ref, keep_files, tag, @@ -97,9 +107,9 @@ def test_bypass_enc( if update_ref == 1 and bypass == 1: pytest.skip() - tag = tag + fs + 'c' - ivas_br = '256000' - dtx = '0' + tag = tag + fs + "c" + ivas_br = "256000" + dtx = "0" max_bw = "FB" gain_flag = -1 sba_order = "+1" @@ -109,10 +119,10 @@ def test_bypass_enc( sba_enc( dut_encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, - None, + None, tag, fs, ivas_br, @@ -123,13 +133,13 @@ def test_bypass_enc( sba_order, update_ref, gain_flag, - cut_testv=True + cut_testv=True, ) # dec sba_dec( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -158,8 +168,8 @@ def test_sba_enc_system( test_vector_path, reference_path, dut_base_path, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend, + ref_decoder_frontend, br_switch_file_path, update_ref, keep_files, @@ -170,26 +180,26 @@ def test_sba_enc_system( gain_flag, ): SID = 0 - if dtx == '1' and ivas_br not in ['13200','16400','24400','32000','64000']: + if dtx == "1" and ivas_br not in ["13200", "16400", "24400", "32000", "64000"]: # skip high bitrates for DTX until DTX issue is resolved pytest.skip() - if ivas_br == 'sw_24k4_256k.bin' and gain_flag != 1: + if ivas_br == "sw_24k4_256k.bin" and gain_flag != 1: pytest.skip() - if ivas_br == '13200' or ivas_br == '16400': - if dtx == '1' and gain_flag == 0 and fs != '16': + if ivas_br == "13200" or ivas_br == "16400": + if dtx == "1" and gain_flag == 0 and fs != "16": SID = 1 else: pytest.skip() - if gain_flag == 1 and ivas_br not in ['13200','16400','24400','32000']: + if gain_flag == 1 and ivas_br not in ["13200", "16400", "24400", "32000"]: pytest.skip() - tag = tag + fs + 'c' + tag = tag + fs + "c" max_bw = "FB" bypass = -1 sba_order = "+1" output_config = "FOA" if gain_flag == 1: cut_gain = "16.0" - elif dtx == '1': + elif dtx == "1": cut_gain = ".004" else: cut_gain = "1.0" @@ -197,7 +207,7 @@ def test_sba_enc_system( sba_enc( dut_encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, br_switch_file_path, @@ -213,13 +223,13 @@ def test_sba_enc_system( gain_flag, cut_gain=cut_gain, create_dutenc=True, - cut_testv=True + cut_testv=True, ) # dec sba_dec( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -245,18 +255,18 @@ def test_spar_hoa2_enc_system( test_vector_path, reference_path, dut_base_path, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend, + ref_decoder_frontend, update_ref, keep_files, ivas_br, tag, ): - fs = '48' - dtx = '0' + fs = "48" + dtx = "0" gain_flag = -1 - tag = tag + fs + 'c' + tag = tag + fs + "c" max_bw = "FB" bypass = -1 sba_order = "+2" @@ -266,7 +276,7 @@ def test_spar_hoa2_enc_system( sba_enc( dut_encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, None, @@ -285,7 +295,7 @@ def test_spar_hoa2_enc_system( # dec sba_dec( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -311,18 +321,18 @@ def test_spar_hoa3_enc_system( test_vector_path, reference_path, dut_base_path, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend, + ref_decoder_frontend, update_ref, keep_files, ivas_br, tag, ): - fs = '48' - dtx = '0' + fs = "48" + dtx = "0" gain_flag = -1 - tag = tag + fs + 'c' + tag = tag + fs + "c" max_bw = "FB" bypass = -1 sba_order = "+3" @@ -332,7 +342,7 @@ def test_spar_hoa3_enc_system( sba_enc( dut_encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, None, @@ -351,7 +361,7 @@ def test_spar_hoa3_enc_system( # dec sba_dec( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -379,8 +389,8 @@ def test_sba_enc_BWforce_system( test_vector_path, reference_path, dut_base_path, - ref_encoder_path, - ref_decoder_path, + ref_encoder_frontend, + ref_decoder_frontend, update_ref, keep_files, ivas_br, @@ -388,16 +398,16 @@ def test_sba_enc_BWforce_system( tag, sample_rate_bw_idx, ): - if dtx == '1' and ivas_br not in ['32000','64000']: + if dtx == "1" and ivas_br not in ["32000", "64000"]: # skip high bitrates for DTX until DTX issue is resolved pytest.skip() - if ivas_br == '13200' or ivas_br == '16400': + if ivas_br == "13200" or ivas_br == "16400": pytest.skip() - if ivas_br == 'sw_24k4_256k.bin': + if ivas_br == "sw_24k4_256k.bin": pytest.skip() fs = sample_rate_bw_idx[0] bw = sample_rate_bw_idx[1] - tag = tag + fs + 'c' + tag = tag + fs + "c" bypass = -1 gain_flag = -1 sba_order = "+1" @@ -407,7 +417,7 @@ def test_sba_enc_BWforce_system( sba_enc( dut_encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, None, @@ -421,13 +431,13 @@ def test_sba_enc_BWforce_system( sba_order, update_ref, gain_flag, - cut_testv=True + cut_testv=True, ) # dec sba_dec( dut_decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -449,7 +459,7 @@ def test_sba_enc_BWforce_system( def sba_enc( encoder_frontend, test_vector_path, - ref_encoder_path, + ref_encoder_frontend, reference_path, dut_base_path, br_switch_file_path, @@ -463,11 +473,10 @@ def sba_enc( sba_order, update_ref, gain_flag, - cut_gain='1.0', + cut_gain="1.0", create_dutenc=False, - cut_testv=False + cut_testv=False, ): - # ------------ run cmd ------------ dut_out_dir = f"{dut_base_path}/sba_bs/pkt" ref_out_dir = f"{reference_path}/sba_bs/pkt" @@ -484,13 +493,13 @@ def sba_enc( tag = tag + dict_bw_tag[ivas_max_bw] tag_out = f"{tag}_ivasbr{ivas_br[:-3]}k_DTX{dtx}" - if ivas_br == 'sw_24k4_256k.bin': - ivas_br = f"{br_switch_file_path}/sw_24k4_256k.bin" + if ivas_br == "sw_24k4_256k.bin": + ivas_br = f"{br_switch_file_path}/sw_24k4_256k.bin" short_tag_ext = "" if gain_flag == 1: - short_tag_ext += f'_Gain{gain_flag}' + short_tag_ext += f"_Gain{gain_flag}" if SID == 1: - short_tag_ext += f'_SID' + short_tag_ext += f"_SID" # we update only bypass = 0/2 (bypass 1 is the same as the baseline) if bypass in [0, 2]: short_tag_ext += f"_pca{bypass}" @@ -507,10 +516,12 @@ def sba_enc( if SID == 1: dut_pkt_file_cut = f"{dut_out_dir}/{tag_out}{long_tag_ext}_cut.pkt" ref_pkt_file_cut = f"{ref_out_dir}/{tag_out}{short_tag_ext}_cut.pkt" - ref_pkt_file_dutenc_cut = f"{ref_out_dir}/{tag_out}{short_tag_ext}_dutenc_cut.pkt" + ref_pkt_file_dutenc_cut = ( + f"{ref_out_dir}/{tag_out}{short_tag_ext}_dutenc_cut.pkt" + ) input_path = f"{test_vector_path}/{tag_in}{in_extension}" bypass_mode = bypass if bypass >= 0 else None - dtx_mode = dtx == '1' + dtx_mode = dtx == "1" if cut_testv: # use shortened and potentially gain adjusted input PCM file - create if not present @@ -523,13 +534,20 @@ def sba_enc( else: cut_file = f"{test_vector_path}/{tag_in}_cut_{cut_gain}{in_extension}" if not os.path.exists(cut_file): - cut_samples(input_path, cut_file, num_channels, sampling_rate + "000", cut_from, cut_len, cut_gain) + cut_samples( + input_path, + cut_file, + num_channels, + sampling_rate + "000", + cut_from, + cut_len, + cut_gain, + ) input_path = cut_file - if ref_encoder_path: - ref_encoder = EncoderFrontend(ref_encoder_path, "REF") + if ref_encoder_frontend: # call REF encoder - ref_encoder.run( + ref_encoder_frontend.run( ivas_br, sampling_rate, input_path, @@ -566,10 +584,10 @@ def sba_enc( ) if SID == 1: - if ref_encoder_path: + if ref_encoder_frontend: with open(ref_pkt_file, "rb") as fp_in: - with open(ref_pkt_file_cut, "wb") as fp_out: - fr_cnt, cut_cnt = cut_from_start(fp_in, fp_out, 0, True) + with open(ref_pkt_file_cut, "wb") as fp_out: + fr_cnt, cut_cnt = cut_from_start(fp_in, fp_out, 0, True) with open(ref_pkt_file_dutenc, "rb") as fp_in: with open(ref_pkt_file_dutenc_cut, "wb") as fp_out: fr_cnt, cut_cnt = cut_from_start(fp_in, fp_out, 0, True) @@ -577,12 +595,14 @@ def sba_enc( os.remove(ref_pkt_file_dutenc) if update_ref == 0: with open(dut_pkt_file, "rb") as fp_in: - with open(dut_pkt_file_cut, "wb") as fp_out: - fr_cnt, cut_cnt = cut_from_start(fp_in, fp_out, 0, True) - os.remove(dut_pkt_file) + with open(dut_pkt_file_cut, "wb") as fp_out: + fr_cnt, cut_cnt = cut_from_start(fp_in, fp_out, 0, True) + os.remove(dut_pkt_file) + + def sba_dec( decoder_frontend, - ref_decoder_path, + ref_decoder_frontend, reference_path, dut_base_path, tag, @@ -597,7 +617,6 @@ def sba_dec( gain_flag, keep_files, ): - # -------- run cmd ------------ # sampling rate to BW mapping bw_idx = dict_fsample_bw[sampling_rate] @@ -608,12 +627,12 @@ def sba_dec( short_tag_ext = "" if gain_flag == 1: - short_tag_ext += f'_Gain{gain_flag}' + short_tag_ext += f"_Gain{gain_flag}" # we update only bypass = 0/2 (bypass 1 is the same as the baseline) if bypass in [0, 2]: short_tag_ext += f"_pca{bypass}" if SID == 1: - short_tag_ext += f'_SID_cut' + short_tag_ext += f"_SID_cut" # to avoid conflicting names in case of parallel test execution, differentiate all cases if gain_flag == 1: long_tag_ext = f"_Gain{gain_flag}" @@ -633,11 +652,9 @@ def sba_dec( check_and_makedir(dut_out_dir) check_and_makedir(ref_out_dir) - if ref_decoder_path: - ref_decoder = DecoderFrontend(ref_decoder_path, "REF") - + if ref_decoder_frontend: # call REF decoder - ref_decoder.run( + ref_decoder_frontend.run( output_config, sampling_rate, ref_in_pkt, diff --git a/tests/conftest.py b/tests/conftest.py index ac1b37121af50d3ba529766cc0ee4eb75b4b9227..3c6d18ceb339e677897d33aaedcf26ad5df29f13 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,4 @@ -__copyright__ = \ -""" +__copyright__ = """ (C) 2022-2023 IVAS codec Public Collaboration with portions copyright Dolby International AB, Ericsson AB, Fraunhofer-Gesellschaft zur Foerderung der angewandten Forschung e.V., Huawei Technologies Co. LTD., Koninklijke Philips N.V., Nippon Telegraph and Telephone Corporation, Nokia Technologies Oy, Orange, @@ -29,17 +28,16 @@ accordance with the laws of the Federal Republic of Germany excluding its confli the United Nations Convention on Contracts on the International Sales of Goods. """ -__doc__ = \ -""" +__doc__ = """ Pytest customization (configuration and fixtures) for the IVAS codec test suite. """ import logging from pathlib import Path import platform -from subprocess import run +from subprocess import run, TimeoutExpired import textwrap -from typing import Optional +from typing import Optional, Union import os from tests import testconfig import pytest @@ -132,19 +130,25 @@ def pytest_addoption(parser): help="By default, the DUT output files of successful tests are deleted." " Use --keep_files to prevent these deletions.", ) - + parser.addoption( "--compare_bitstream", action="store_true", help="By default, the IVAS encoded bitstream is not compared with ref." " Use --compare_bitstream to compare IVAS encoded bitstream.", ) - parser.addoption( "--selection_be_md5_file", type=Path, - help="Path to file with md5 sums for the reference signals of the selection-BE test" + help="Path to file with md5 sums for the reference signals of the selection-BE test", + ) + + parser.addoption( + "--testcase_timeout", + type=int, + default=None, + help="Timeout in seconds for each individual testcase. Default is no timeout.", ) @@ -166,6 +170,7 @@ def keep_files(request) -> bool: """ return request.config.option.keep_files + @pytest.fixture(scope="session") def compare_bitstream(request) -> bool: """ @@ -173,6 +178,7 @@ def compare_bitstream(request) -> bool: """ return request.config.option.compare_bitstream + @pytest.fixture(scope="session") def dut_encoder_path(request) -> str: """ @@ -200,12 +206,13 @@ def dut_encoder_path(request) -> str: class EncoderFrontend: - def __init__(self, path, enc_type) -> None: + def __init__(self, path, enc_type, timeout=None) -> None: self._path = path self._type = enc_type self.returncode = None self.stdout = None self.stderr = None + self.timeout = timeout def run( self, @@ -252,7 +259,11 @@ class EncoderFrontend: cmd_str = textwrap.indent(" ".join(command), prefix="\t") log_dbg_msg(f"{self._type} encoder command:\n{cmd_str}") - result = run(command, capture_output=True, check=False) + try: + result = run(command, capture_output=True, check=True, timeout=self.timeout) + except TimeoutExpired: + pytest.fail(f"{self._type} encoder run timed out after {self.timeout}s.") + self.returncode = result.returncode self.stderr = result.stderr.decode("ascii") self.stdout = result.stdout.decode("ascii") @@ -263,12 +274,16 @@ class EncoderFrontend: stderr_str = textwrap.indent(self.stderr, prefix="\t") log_dbg_msg(f"{self._type} encoder stderr:\n{stderr_str}") if self.returncode: - pytest.fail(f"{self._type} encoder terminated with a non-0 return code: {self.returncode}") + pytest.fail( + f"{self._type} encoder terminated with a non-0 return code: {self.returncode}" + ) def _check_run(self): if self.returncode is not None: if self.returncode: - pytest.fail(f"{self._type} encoder terminated with a non-0 return code: {self.returncode}") + pytest.fail( + f"{self._type} encoder terminated with a non-0 return code: {self.returncode}" + ) else: logger.warning("%s encoder was set-up, but not run", self._type) # next assert is not OK since stderr contains messages even when encoding was successful @@ -276,11 +291,12 @@ class EncoderFrontend: @pytest.fixture(scope="function") -def dut_encoder_frontend(dut_encoder_path) -> EncoderFrontend: +def dut_encoder_frontend(dut_encoder_path, request) -> EncoderFrontend: """ Return a :class:`conftest.EncoderFrontend` instance as DUT for the test session. """ - encoder = EncoderFrontend(dut_encoder_path, "DUT") + timeout = request.config.getoption("--testcase_timeout") + encoder = EncoderFrontend(dut_encoder_path, "DUT", timeout=timeout) yield encoder # Fixture teardown @@ -317,6 +333,24 @@ def ref_encoder_path(request) -> str: return path +@pytest.fixture(scope="function") +def ref_encoder_frontend(ref_encoder_path, request) -> Union[None, EncoderFrontend]: + """ + Return a :class:`conftest.EncoderFrontend` instance as REF for the test session. + """ + encoder = None + + if ref_encoder_path: + timeout = request.config.getoption("--testcase_timeout") + encoder = EncoderFrontend(ref_encoder_path, "REF", timeout=timeout) + + yield encoder + + if encoder is not None: + # Fixture teardown + encoder._check_run() + + @pytest.fixture(scope="session") def dut_decoder_path(request) -> str: """ @@ -344,12 +378,13 @@ def dut_decoder_path(request) -> str: class DecoderFrontend: - def __init__(self, path, dec_type) -> None: + def __init__(self, path, dec_type, timeout=None) -> None: self._path = path self._type = dec_type self.returncode = None self.stdout = None self.stderr = None + self.timeout = timeout def run( self, @@ -386,7 +421,11 @@ class DecoderFrontend: cmd_str = textwrap.indent(" ".join(command), prefix="\t") log_dbg_msg(f"{self._type} decoder command:\n{cmd_str}") - result = run(command, capture_output=True, check=False) + try: + result = run(command, capture_output=True, check=False, timeout=self.timeout) + except TimeoutExpired: + pytest.fail(f"{self._type} decoder run timed out after {self.timeout}s.") + self.returncode = result.returncode self.stderr = result.stderr.decode("ascii") self.stdout = result.stdout.decode("ascii") @@ -397,12 +436,16 @@ class DecoderFrontend: stderr_str = textwrap.indent(self.stderr, prefix="\t") log_dbg_msg(f"{self._type} decoder stderr:\n{stderr_str}") if self.returncode: - pytest.fail(f"{self._type} decoder terminated with a non-0 return code: {self.returncode}") + pytest.fail( + f"{self._type} decoder terminated with a non-0 return code: {self.returncode}" + ) def _check_run(self): if self.returncode is not None: if self.returncode: - pytest.fail(f"{self._type} decoder terminated with a non-0 return code: {self.returncode}") + pytest.fail( + f"{self._type} decoder terminated with a non-0 return code: {self.returncode}" + ) else: logger.warning("%s decoder was set-up, but not run", self._type) # next assert is not OK since stderr contains messages even when decoding was successful @@ -410,11 +453,13 @@ class DecoderFrontend: @pytest.fixture(scope="function") -def dut_decoder_frontend(dut_decoder_path) -> DecoderFrontend: +def dut_decoder_frontend(dut_decoder_path, request) -> DecoderFrontend: """ Return a :class:`conftest.DecoderFrontend` instance as DUT for the test session. """ - decoder = DecoderFrontend(dut_decoder_path, "DUT") + decoder = DecoderFrontend( + dut_decoder_path, "DUT", timeout=request.config.getoption("--testcase_timeout") + ) yield decoder # Fixture teardown @@ -451,6 +496,24 @@ def ref_decoder_path(request) -> str: return path +@pytest.fixture(scope="function") +def ref_decoder_frontend(ref_decoder_path, request) -> Union[None, DecoderFrontend]: + """ + Return a :class:`conftest.DecoderFrontend` instance as DUT for the test session. + """ + decoder = None + + if ref_decoder_path: + timeout = request.config.getoption("--testcase_timeout") + decoder = DecoderFrontend(ref_decoder_path, "REF", timeout=timeout) + + yield decoder + + if decoder is not None: + # Fixture teardown + decoder._check_run() + + @pytest.fixture(scope="session") def test_vector_path(request) -> str: """ @@ -466,7 +529,8 @@ def test_vector_path(request) -> str: path = str(path.resolve()) return path - + + @pytest.fixture(scope="session") def br_switch_file_path(request) -> str: """ @@ -483,6 +547,7 @@ def br_switch_file_path(request) -> str: return path + @pytest.fixture(scope="session") def reference_path(request) -> str: """ @@ -499,7 +564,9 @@ def reference_path(request) -> str: if request.config.option.update_ref == "0": if not os.path.isdir(path): - raise FileNotFoundError(f"REF path {path} not found!\nPlease generate the references, first!\n!") + raise FileNotFoundError( + f"REF path {path} not found!\nPlease generate the references, first!\n!" + ) return path @@ -522,20 +589,23 @@ def dut_base_path(request) -> str: def pytest_configure(config): - config.addinivalue_line( - "markers", "serial: mark test to run only in serial" - ) + config.addinivalue_line("markers", "serial: mark test to run only in serial") config.addinivalue_line( "markers", "create_ref: mark test capable of producing references" ) config.addinivalue_line( - "markers", "create_ref_part2: reference creation test that depends on create_ref references" + "markers", + "create_ref_part2: reference creation test that depends on create_ref references", ) if config.option.param_file: testconfig.PARAM_FILE = config.option.param_file if config.option.selection_be_md5_file: md5_file_path = config.option.selection_be_md5_file if not platform.system() == "Windows": - raise NotImplementedError("MD5 comparison is currently hardcoded for windows") + raise NotImplementedError( + "MD5 comparison is currently hardcoded for windows" + ) with open(md5_file_path) as f: - testconfig.MD5_REF_DICT = {line.split()[0]: line.split()[1] for line in f.readlines()} + testconfig.MD5_REF_DICT = { + line.split()[0]: line.split()[1] for line in f.readlines() + }