clean up sfcf input types

This commit is contained in:
Justus Kuhlmann 2025-03-29 11:10:26 +00:00
parent 7a3a28dad0
commit f44b19c9d1

View file

@ -7,7 +7,7 @@ from ..obs import Obs
from .utils import sort_names, check_idl from .utils import sort_names, check_idl
import itertools import itertools
from numpy import ndarray from numpy import ndarray
from typing import Any, Union, Optional from typing import Any, Union, Optional, Literal
sep = "/" sep = "/"
@ -143,14 +143,14 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
dict[name][quarks][offset][wf][wf2] = list[Obs] dict[name][quarks][offset][wf][wf2] = list[Obs]
""" """
if kwargs.get('im'): im: Literal[1, 0] = 0
part: str = 'real'
if kwargs.get('im', False):
im = 1 im = 1
part = 'imaginary' part = 'imaginary'
else:
im = 0
part = 'real'
known_versions = ["0.0", "1.0", "2.0", "1.0c", "2.0c", "1.0a", "2.0a"] known_versions: list = ["0.0", "1.0", "2.0", "1.0c", "2.0c", "1.0a", "2.0a"]
if version not in known_versions: if version not in known_versions:
raise Exception("This version is not known!") raise Exception("This version is not known!")
@ -165,9 +165,9 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
else: else:
compact = False compact = False
appended = False appended = False
ls = kwargs.get("replica") ls: list = kwargs.get("replica", [])
if ls is None: if ls == []:
ls = []
for (dirpath, dirnames, filenames) in os.walk(path): for (dirpath, dirnames, filenames) in os.walk(path):
if not appended: if not appended:
ls.extend(dirnames) ls.extend(dirnames)
@ -180,7 +180,7 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
for exc in ls: for exc in ls:
if not fnmatch.fnmatch(exc, prefix + '*'): if not fnmatch.fnmatch(exc, prefix + '*'):
ls = list(set(ls) - set([exc])) ls = list(set(ls) - set([exc]))
replica: int = 0
if not appended: if not appended:
ls = sort_names(ls) ls = sort_names(ls)
replica = len(ls) replica = len(ls)
@ -246,6 +246,7 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
for key in needed_keys: for key in needed_keys:
internal_ret_dict[key] = [] internal_ret_dict[key] = []
rep_idl: list = []
if not appended: if not appended:
for i, item in enumerate(ls): for i, item in enumerate(ls):
rep_path = path + '/' + item rep_path = path + '/' + item
@ -318,7 +319,7 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
internal_ret_dict[key][t].append(rep_deltas[key][t]) internal_ret_dict[key][t].append(rep_deltas[key][t])
else: else:
for key in needed_keys: for key in needed_keys:
rep_data = [] rep_data: list = []
name = _key2specs(key)[0] name = _key2specs(key)[0]
for subitem in sub_ls: for subitem in sub_ls:
cfg_path = path + '/' + item + '/' + subitem cfg_path = path + '/' + item + '/' + subitem
@ -350,6 +351,7 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
deltas = [] deltas = []
for rep, file in enumerate(name_ls): for rep, file in enumerate(name_ls):
rep_idl = [] rep_idl = []
rep_data = []
filename = path + '/' + file filename = path + '/' + file
T, rep_idl, rep_data = _read_append_rep(filename, pattern, intern[name]['b2b'], cfg_separator, im, intern[name]['single']) T, rep_idl, rep_data = _read_append_rep(filename, pattern, intern[name]['b2b'], cfg_separator, im, intern[name]['single'])
if rep == 0: if rep == 0:
@ -362,12 +364,12 @@ def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: l
if name == name_list[0]: if name == name_list[0]:
idl.append(rep_idl) idl.append(rep_idl)
if kwargs.get("check_configs") is True: che: Union[list[list[int]], None] = kwargs.get("check_configs", None)
if che is not None:
if not silent: if not silent:
print("Checking for missing configs...") print("Checking for missing configs...")
che = kwargs.get("check_configs")
if not (len(che) == len(idl)): if not (len(che) == len(idl)):
raise Exception("check_configs has to be the same length as replica!") raise Exception("check_configs has to have an entry for each replicum!")
for r in range(len(idl)): for r in range(len(idl)):
if not silent: if not silent:
print("checking " + new_names[r]) print("checking " + new_names[r])