from typing import *
from collections import defaultdict
from enum import Enum
import inspect
import json
import os
from pathlib import Path
import pickle as pkl
import pydoc
import recordclass
import shutil
import subprocess
import typing_inspect
import yaml
def is_obj_record_class(obj: Any) -> bool:
    """
    Checks whether an object is an instance of a recordclass-based class.

    :param obj: the object to check; None is never considered a record class instance.
    :return: True if obj is an instance of recordclass.mutabletuple or recordclass.dataobject.
    """
    # Explicit guard: in the former "A and B or C" expression the None check only
    # covered the mutabletuple test, because "and" binds tighter than "or".
    if obj is None:
        return False
    # end if
    return isinstance(obj, recordclass.mutabletuple) or isinstance(obj, recordclass.dataobject)
def is_clz_record_class(clz: Type) -> bool:
    """
    Checks whether the argument is a class derived from a recordclass base.

    :param clz: the candidate; None and non-class values yield False.
    :return: True if clz is a subclass of recordclass.mutabletuple or recordclass.dataobject.
    """
    if clz is None or not inspect.isclass(clz):
        return False
    # end if
    if issubclass(clz, recordclass.mutabletuple):
        return True
    # end if
    return issubclass(clz, recordclass.dataobject)
[docs]class IOUtils:
"""
Utility functions for I/O.
"""
# ----------
# Directory operations
class cd:
    """
    Context manager that changes the working directory for the body of a with statement. Usage:

        with IOUtils.cd(path):
            <statements>
        # end with

    The working directory that was current when the block was entered is restored when
    the block exits; exceptions raised inside the block are not suppressed.

    Using a string path is supported for backward compatibility.
    Using pathlib.Path should be preferred.
    """

    def __init__(self, path: Union[str, Path]):
        """
        :param path: the directory to switch to when the with block is entered.
        """
        if isinstance(path, str):
            path = Path(path)
        # end if
        self.path = path  # Path
        # Initialized here so the attribute always exists; refreshed in __enter__.
        self.old_path = Path.cwd()  # Path
        return

    def __enter__(self):
        # Capture the directory at entry time rather than at construction time, so that
        # an instance created earlier (before some other chdir) restores the right place.
        self.old_path = Path.cwd()
        os.chdir(self.path)
        return

    def __exit__(self, exc_type, exc_value, tb):
        # Returns None, so any exception from the with body keeps propagating.
        os.chdir(self.old_path)
        return
# Deprecated
# Use pathlib.Path.is_dir() instead
[docs] @classmethod
def has_dir(cls, dirname) -> bool:
return os.path.isdir(dirname)
# Deprecated
# Use pathlib.Path.mkdir() instead
[docs] @classmethod
def mk_dir(cls, dirname, mode=0o777,
is_remove_if_exists: bool = False,
is_make_parent: bool = True):
"""
Makes the directory.
:param dirname: the name of the directory.
:param mode: mode of the directory.
:param is_remove_if_exists: if the directory with name already exists, whether to remove.
:param is_make_parent: if make parent directory if not exists.
"""
if cls.has_dir(dirname):
if is_remove_if_exists:
rm_cmd = "rm {} -rf".format(dirname)
subprocess.run(["bash", "-c", rm_cmd])
else:
return
# end if
parent_dir = os.path.dirname(dirname)
if not cls.has_dir(parent_dir):
if is_make_parent:
cls.mk_dir(parent_dir, mode, is_remove_if_exists=False, is_make_parent=True)
else:
raise FileNotFoundError("Path not found: {}".format(parent_dir))
# end if
os.mkdir(dirname, mode)
return
[docs] @classmethod
def rm_dir(
cls,
path: Path,
ignore_non_exist: bool = True,
force: bool = True,
):
"""
Removes the directory.
:param path: the name of the directory.
:param ignore_non_exist: ignores error if the directory does not exist.
:param force: force remove the directory even it's non-empty.
"""
if path.is_dir():
if force:
shutil.rmtree(path, ignore_errors=True)
else:
path.rmdir()
# end if
else:
if ignore_non_exist:
return
else:
raise FileNotFoundError("Trying to remove non-exist directory {}".format(path))
# end if
# end if
return
[docs] @classmethod
def rm(
cls,
path: Path,
ignore_non_exist: bool = True,
force: bool = True,
):
"""
Removes the file/dir.
:param path: the path to the file/dir to remove.
:param ignore_non_exist: ignores error if the file/dir does not exist.
:param force: force remove the file even it's protected / dir even it's non-empty.
"""
if path.is_dir():
cls.rm_dir(path, ignore_non_exist=ignore_non_exist, force=force)
else:
if path.exists():
path.unlink()
else:
if not ignore_non_exist:
raise IOError(f"{path} does not exist")
# ----------
# File operations
# Registry of per-format I/O settings, keyed by Format. Each entry is a dict with:
#   "mode":  suffix appended to the open() mode by dump/load ("t" for text, "b" for binary);
#   "dumpf": callable (obj, f) that writes obj to the opened file f;
#   "loadf": callable (f) that reads an object from the opened file f and returns it.
# The defaultdict factory builds a fresh entry the first time a key is accessed, so a format that
# is not customized below writes obj with f.write and reads the whole file with f.read in text mode,
# and each assignment below only overrides the named field of such a default entry.
IO_FORMATS: Dict[Format, Dict] = defaultdict(lambda: {
"mode": "t",
"dumpf": (lambda obj, f: f.write(obj)),
"loadf": (lambda f: f.read())
})
# Pickle is the only format registered here with binary mode.
# NOTE(review): unpickling can execute arbitrary code; only load files from trusted sources.
IO_FORMATS[Format.pkl]["mode"] = "b"
IO_FORMATS[Format.pkl]["dumpf"] = lambda obj, f: pkl.dump(obj, f, protocol=pkl.HIGHEST_PROTOCOL)
IO_FORMATS[Format.pkl]["loadf"] = lambda f: pkl.load(f)
# Pretty-printed json (4-space indent); jsonPretty sorts keys, jsonNoSort keeps insertion order.
# Both are read back with the yaml loader rather than json.load.
# NOTE(review): yaml.FullLoader is more permissive than yaml.SafeLoader; confirm inputs are trusted.
IO_FORMATS[Format.jsonPretty]["dumpf"] = lambda obj, f: json.dump(obj, f, indent=4, sort_keys=True)
IO_FORMATS[Format.jsonPretty]["loadf"] = lambda f: yaml.load(f, Loader=yaml.FullLoader) # allows some format errors (e.g., trailing commas)
IO_FORMATS[Format.jsonNoSort]["dumpf"] = lambda obj, f: json.dump(obj, f, indent=4)
IO_FORMATS[Format.jsonNoSort]["loadf"] = lambda f: yaml.load(f, Loader=yaml.FullLoader) # allows some format errors (e.g., trailing commas)
# Compact json with sorted keys, read back with the strict json parser.
IO_FORMATS[Format.json]["dumpf"] = lambda obj, f: json.dump(obj, f, sort_keys=True)
IO_FORMATS[Format.json]["loadf"] = lambda f: json.load(f)
# Yaml, written with yaml.dump and read with the FullLoader.
IO_FORMATS[Format.yaml]["dumpf"] = lambda obj, f: yaml.dump(obj, f)
IO_FORMATS[Format.yaml]["loadf"] = lambda f: yaml.load(f, Loader=yaml.FullLoader)
[docs] @classmethod
def dumpf_json_list(cls, obj, f):
for item in obj:
f.write(json.dumps(item) + "\n")
[docs] @classmethod
def loadf_json_list(cls, f) -> List:
obj = []
for line in f.readlines():
obj.append(json.loads(line))
return obj
# Register the jsonList format: one json document per line, via dumpf_json_list / loadf_json_list.
# The lambdas resolve the name IOUtils when they are called, not while this class body is executing.
IO_FORMATS[Format.jsonList]["dumpf"] = lambda obj, f: IOUtils.dumpf_json_list(obj, f)
IO_FORMATS[Format.jsonList]["loadf"] = lambda f: IOUtils.loadf_json_list(f)
[docs] @classmethod
def dumpf_txt_list(cls, obj, f):
for item in obj:
f.write(str(item) + "\n")
[docs] @classmethod
def loadf_txt_list(cls, f) -> List:
return f.read().splitlines()
# Register the txtList format: one str()-converted item per line, via dumpf_txt_list / loadf_txt_list.
# The lambdas resolve the name IOUtils when they are called, not while this class body is executing.
IO_FORMATS[Format.txtList]["dumpf"] = lambda obj, f: IOUtils.dumpf_txt_list(obj, f)
IO_FORMATS[Format.txtList]["loadf"] = lambda f: IOUtils.loadf_txt_list(f)
@classmethod
def dump(
        cls,
        file_path: Union[str, Path],
        obj: object,
        fmt: Union[Format, str] = Format.jsonPretty,
        append: bool = False,
) -> None:
    """
    Writes an object to a file using the chosen format.
    Unless told otherwise, the object is written as pretty-printed json and replaces whatever the file held.

    :param file_path: the destination file; a string is converted to pathlib.Path.
    :param obj: the object to write.
    :param fmt: the format, one of IOUtils.Format, or its name as a string (resolved with Format.from_str).
    :param append: when true, the output is added after the existing content instead of replacing it.
    """
    target = Path(file_path) if isinstance(file_path, str) else file_path
    target.touch(exist_ok=True)

    if isinstance(fmt, str):
        fmt = cls.Format.from_str(fmt)
    # end if
    io_conf = cls.IO_FORMATS[fmt]

    # "a"/"w" followed by the format's text/binary suffix
    open_mode = ("a" if append else "w") + io_conf["mode"]
    with open(target, open_mode) as out_file:
        io_conf["dumpf"](obj, out_file)
    # end with
    return
@classmethod
def load(cls, file_path: Union[str, Path], fmt: Union[Format, str] = Format.jsonPretty) -> Any:
    """
    Reads an object from a file using the chosen format.

    :param file_path: the file to read; a string is converted to pathlib.Path.
    :param fmt: the format, one of IOUtils.Format, or its name as a string (resolved with Format.from_str).
    :return: whatever the format's load function produces from the file.
    :raises FileNotFoundError: if the file cannot be found; the message is extended with the current
        working directory.
    """
    source = Path(file_path) if isinstance(file_path, str) else file_path

    if isinstance(fmt, str):
        fmt = cls.Format.from_str(fmt)
    # end if
    io_conf = cls.IO_FORMATS[fmt]
    open_mode = "r" + io_conf["mode"]

    try:
        with open(source, open_mode) as in_file:
            return io_conf["loadf"](in_file)
        # end with
    except FileNotFoundError as e:
        raise FileNotFoundError(str(e) + " at {}".format(Path.cwd()))
    # end try
@classmethod
def load_json_stream(cls, file_path: Union[str, Path], fmt: Union[Format, str] = Format.jsonPretty):
    """
    Iterates over a large json file containing a list of data, one element at a time.

    This is a generator: nothing is opened or parsed until iteration starts, so a missing file is
    only reported when the first element is requested.

    :param file_path: the file to read; a string is converted to pathlib.Path.
    :param fmt: the format whose "mode" setting selects text or binary reading; a string is resolved
        with Format.from_str.
    :raises FileNotFoundError: if the file cannot be found; the message is extended with the current
        working directory.
    """
    import ijson

    source = Path(file_path) if isinstance(file_path, str) else file_path

    if isinstance(fmt, str):
        fmt = cls.Format.from_str(fmt)
    # end if
    open_mode = "r" + cls.IO_FORMATS[fmt]["mode"]

    try:
        with open(source, open_mode) as in_file:
            # the "item" prefix asks ijson for the elements of the top-level list
            for element in ijson.items(in_file, "item"):
                yield element
            # end for
        # end with
    except FileNotFoundError as e:
        raise FileNotFoundError(str(e) + " at {}".format(Path.cwd()))
    # end try
@classmethod
def update_json(cls, file_name, data):
    """
    Updates the json data file. The data should be dict like (support update).

    The current content of the file is loaded, updated with data, and written back. If the file
    cannot be loaded (e.g., it does not exist yet) or is empty, the update starts from an empty dict.

    :param file_name: the json file to update.
    :param data: the entries to merge into the file's content.
    :return: the merged data that was written to the file.
    """
    try:
        orig_data = cls.load(file_name)
    except Exception:
        # Best effort: any load failure means starting over from an empty dict.
        # Exception (rather than a bare except) lets KeyboardInterrupt/SystemExit through.
        orig_data = dict()
    # end try

    if orig_data is None:
        # An empty file is loaded as None by the default (yaml-based) loader.
        orig_data = dict()
    # end if

    orig_data.update(data)
    cls.dump(file_name, orig_data)
    return orig_data
@classmethod
def extend_json(cls, file_name, data):
    """
    Updates the json data file. The data should be list like (support extend).

    The current content of the file is loaded, extended with data, and written back. If the file
    cannot be loaded (e.g., it does not exist yet) or is empty, the extension starts from an empty list.

    :param file_name: the json file to update.
    :param data: the items to append to the file's content.
    :return: the extended data that was written to the file.
    """
    try:
        orig_data = cls.load(file_name)
    except Exception:
        # Best effort: any load failure means starting over from an empty list.
        # Exception (rather than a bare except) lets KeyboardInterrupt/SystemExit through.
        orig_data = list()
    # end try

    if orig_data is None:
        # An empty file is loaded as None by the default (yaml-based) loader.
        orig_data = list()
    # end if

    orig_data.extend(data)
    cls.dump(file_name, orig_data)
    return orig_data
# Name of the method that jsonfy looks up on an object and calls without arguments.
JSONFY_FUNC_NAME = "jsonfy"
# Name of the attribute that dejsonfy checks for on the target class.
DEJSONFY_FUNC_NAME = "dejsonfy"
# Name of the attribute holding a dict of attribute name -> type, read by both jsonfy and dejsonfy.
JSONFY_ATTR_FIELD_NAME = "jsonfy_attr"
[docs] @classmethod
def jsonfy(cls, obj):
"""
Turns an object to a json-compatible data structure.
A json-compatible data can only have list, dict (with str keys), str, int and float.
Any object of other classes will be casted through (try each option in order, if applicable):
1. JSONFY function, which takes no argument and returns a json-compatible data;
should have the name {@link IOUtils#JSONFY_FUNC_NAME};
2. JSONFY_ATTR field, which is a dict of attribute name-type pairs, that will be extracted from the object to a dict;
should have the name {@link IOUtils#JSONFY_ATTR_FIELD_NAME};
3. cast to a string.
"""
if obj is None:
return None
elif isinstance(obj, (int, float, str, bool)):
# primitive types
return obj
elif isinstance(obj, (list, set, tuple)):
# array
return [cls.jsonfy(item) for item in obj]
elif isinstance(obj, dict):
# dict
return {k: cls.jsonfy(v) for k, v in obj.items()}
elif isinstance(obj, Enum):
# Enum
return obj.value
elif hasattr(obj, cls.JSONFY_FUNC_NAME):
# with jsonfy function
return getattr(obj, cls.JSONFY_FUNC_NAME)()
elif hasattr(obj, cls.JSONFY_ATTR_FIELD_NAME):
# with jsonfy_attr annotations
return {attr: cls.jsonfy(getattr(obj, attr)) for attr in getattr(obj, cls.JSONFY_ATTR_FIELD_NAME).keys()}
elif is_obj_record_class(obj):
# RecordClass
if hasattr(obj, "__dict__"):
# Older versions of recordclass
return {k: cls.jsonfy(v) for k, v in obj.__dict__.items()}
else:
# Newer versions of recordclass
return {k: cls.jsonfy(getattr(obj, k)) for k in obj.__fields__}
else:
# Last effort: toString
return repr(obj)
[docs] @classmethod
def dejsonfy(cls, data, clz: Optional[Union[Type, str]] = None):
"""
Turns a json-compatible data structure to an object of class {@code clz}.
If {@code clz} is not assigned, the data will be casted to dict or list if possible.
Otherwise the data will be casted to the object through (try each option in order, if applicable):
1. DEJSONFY function, which takes the data as argument and returns a object;
should have the name {@link IOUtils#DEJSONFY_FUNC_NAME};
2. JSONFY_ATTR field, which is a dict of attribute name-type pairs, that will be extracted from the object to a dict;
should have the name {@link IOUtils#JSONFY_ATTR_FIELD_NAME};
"""
if isinstance(clz, str):
clz = pydoc.locate(clz)
if data is None:
# None value
return None
elif clz is not None and typing_inspect.get_origin(clz) == list:
# List[XXX]
return [cls.dejsonfy(item, clz.__args__[0]) for item in data]
elif clz is not None and typing_inspect.get_origin(clz) == tuple:
# Tuple[XXX]
return tuple([cls.dejsonfy(item, clz.__args__[min(i, len(clz.__args__)-1)]) for i, item in enumerate(data)])
elif clz is not None and typing_inspect.get_origin(clz) == set:
# Set[XXX]
return set([cls.dejsonfy(item, clz.__args__[0]) for item in data])
elif clz is not None and hasattr(clz, cls.DEJSONFY_FUNC_NAME):
# with dejsonfy function
return clz.dejsonfy(data)
elif isinstance(data, list):
# array
return [cls.dejsonfy(item, clz) for item in data]
elif clz is not None and hasattr(clz, cls.JSONFY_ATTR_FIELD_NAME):
# with jsonfy_attr annotations
obj = clz()
for attr, attr_clz in getattr(clz, cls.JSONFY_ATTR_FIELD_NAME).items():
if attr in data:
setattr(obj, attr, cls.dejsonfy(data[attr], attr_clz))
return obj
elif clz is not None and is_clz_record_class(clz):
# RecordClass
field_values = dict()
for f, t in get_type_hints(clz).items():
if f in data:
field_values[f] = cls.dejsonfy(data.get(f), t)
return clz(**field_values)
elif clz is not None and inspect.isclass(clz) and issubclass(clz, Enum):
# Enum
return clz(data)
elif isinstance(data, dict):
# dict
return {k: cls.dejsonfy(v, clz) for k, v in data.items()}
else:
# primitive types / unresolvable things
if clz is not None:
try:
return clz(data)
except:
pass
return data