Kkit.utils

  1import time
  2import pickle
  3import os
  4from logging import Logger
  5from functools import reduce
  6
  7def print_list(Alist, num_of_columns=None, separator_in_line=" , ", separator_between_line="\n", prefix="", show_length=False, align=True):
  8    """
  9    Print a list in a pretty table format.
 10    
 11    You can specify the number of columns, the separator between elements in a line, the separator between lines, the prefix of the table, whether to show the length of the list, and whether to align the elements in the list. The str() of each element should be in one line for pretty.
 12
 13    Parameters
 14    ----------
 15    Alist : list
 16        the list to be printed
 17
 18    num_of_columns : int or None, default None
 19        the number of columns to be printed in one line. If None, the number of columns will be the length of the list
 20
 21    separator_in_line : str, default " , "
 22        the separator between elements in a line
 23
 24    separator_between_line : str, default "\\n"
 25        the separator between lines
 26
 27    prefix : str, default ""
 28        the prefix of the table
 29
 30    show_length : bool, default False
 31        whether to show the length of the list
 32
 33    align : bool, default True
 34        whether to align the elements in the list
 35
 36    Examples
 37    --------
 38    >>> a = [1,12,123,1234,12345,"123456",1234567,12345678,"123456789"]
 39    >>> print_list(a, num_of_columns=5, prefix="The example list is:")
 40    The example list is:        1 ,        12 ,       123 ,      1234 ,     12345
 41                           123456 ,   1234567 ,  12345678 , 123456789
 42    """
 43    align_length = 0
 44    if align:
 45        for i in Alist:
 46            if len(str(i))>align_length:
 47                align_length = len(str(i))
 48    length_perfix = len(prefix)+align_length
 49    length = len(Alist)
 50    if num_of_columns == None:
 51        num_of_columns = length
 52    print(prefix, end="")
 53    for i in range(length):
 54        line_index = i%num_of_columns
 55        if i == 0 or line_index != 0:
 56            alignL = align_length
 57        else:
 58            alignL = length_perfix
 59        if line_index == (num_of_columns-1) or i == (length-1):
 60            separator = separator_between_line
 61        else:
 62            separator = separator_in_line
 63        print(f"{Alist[i]:>{alignL}}", end=separator)
 64    if show_length:
 65        print("\nlength: %d"%length)
 66
 67def conclude_list(Alist):
 68    result = {}
 69    for i in Alist:
 70        if i in result:
 71            result[i] += 1
 72        else:
 73            result[i] = 1
 74    return result
 75
 76def conclude_list2(a_list):
 77    no_duplicate = []
 78    for i in a_list:
 79        if i not in no_duplicate:
 80            no_duplicate.append(i)
 81    count_list = []
 82    for i in no_duplicate:
 83        count_list.append(a_list.count(i))
 84    return no_duplicate, count_list
 85
 86def time_string():
 87    return time.strftime("%Y-%m-%d-%H%M%S", time.localtime())
 88
 89def load(path_name, encoding="b", lines=False, removeCL=True):
 90    """
 91    load the object from the file, binary or text.
 92
 93    Parameters
 94    ----------
 95    path_name : str
 96        The path of the file
 97
 98    encoding : str, default "b"
 99        The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text
100
101    lines : bool, default False
102        Whether to read the file as lines. If True, return a list of lines. If False, return the whole content.
103
104    removeCL : bool, default True
105        Whether to remove the line break character at the end of each line. Only valid when lines is True.
106    """
107    if encoding=="b":
108        with open(path_name, "rb") as f:
109            return pickle.load(f)
110    else:
111        with open(path_name, "r", encoding=encoding) as f:
112            if lines:
113                content = f.readlines()
114                if removeCL:
115                    content = [i.rstrip("\n") for i in content]
116            else:
117                content = f.read()
118            return content
119    
def store(path_name, Aobject=None, encoding="b"):
    """
    store the object to the file, binary or text.

    Missing parent directories of path_name are created first.

    Parameters
    ----------
    path_name : str or None
        The path of the file. If None, nothing is written and the call returns immediately.

    Aobject : Any, default None
        The object to be stored. Must be picklable if encoding is "b" (None is pickled like any other value); must be a str for text encodings.

    encoding : str, default "b"
        The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text

    Raises
    ------
    TypeError
        if a text encoding is given and Aobject is not a str. This is raised before the file is opened, so an existing file is not truncated.
    """
    if path_name is None:
        return
    binary = encoding == "b"
    if not binary and not isinstance(Aobject, str):
        # Opening with "w" truncates; reject bad input first so a failed call
        # cannot wipe an existing file.
        raise TypeError(f"write() argument must be str, not {type(Aobject).__name__}")
    directory = os.path.dirname(path_name)
    if directory:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(directory, exist_ok=True)
    if binary:
        with open(path_name, "wb") as f:
            pickle.dump(Aobject, f)
    else:
        with open(path_name, "w", encoding=encoding) as f:
            f.write(Aobject)
146
def sort_dic_by_value(dic, my_reverse=False):
    """
    Return a new dict with the items of dic ordered by value.

    Parameters
    ----------
    dic : dict
        the dictionary to sort; its values must be mutually comparable

    my_reverse : bool, default False
        sort in descending order when True

    Returns
    -------
    dict
        a new dict whose insertion order follows the sorted values
    """
    ordered_pairs = sorted(dic.items(), key=lambda pair: pair[1], reverse=my_reverse)
    return dict(ordered_pairs)
149
def sort_multi_list(*multi_list, by=1, my_reverse=False):
    """
    Sort several parallel lists together, ordered by one of them.

    The lists are zipped into rows, so the result is truncated to the
    shortest input list.

    Parameters
    ----------
    *multi_list : list
        two or more parallel lists

    by : int, default 1
        index (0-based) of the list whose values are the sort key

    my_reverse : bool, default False
        sort in descending order when True

    Returns
    -------
    list of tuple
        the zipped rows, sorted by the chosen column

    Raises
    ------
    ValueError
        if fewer than two lists are given, or by is not a valid list index
    """
    length = len(multi_list)
    if length < 2:
        raise ValueError(f"please input multiple lists, got {length}")
    if by not in range(length):
        # Report the real valid range: it depends on how many lists were passed.
        raise ValueError(f"the value of 'by' must be an index from 0 to {length - 1}, got {by!r}")
    return sorted(zip(*multi_list), key=lambda row: row[by], reverse=my_reverse)
161           
def find_key_by_value(value, dic):
    """
    Return the first key in dic whose value equals value.

    Parameters
    ----------
    value : Any
        the value to search for (compared with ==)

    dic : dict
        the dictionary to search, in insertion order

    Returns
    -------
    Any or None
        the first matching key, or None when no value matches
    """
    matches = (key for key, candidate in dic.items() if candidate == value)
    return next(matches, None)
166
def merge_list_to_dic(list1, list2):
    """
    Build a dict that pairs the elements of list1 (keys) with list2 (values).

    Pairing stops at the shorter list; for duplicate keys the last pairing wins.

    Parameters
    ----------
    list1 : list
        the keys; each must be hashable

    list2 : list
        the values

    Returns
    -------
    dict
    """
    return dict(zip(list1, list2))
170
def list_in_list(list1, list2, X=None):
    """
    Test which elements of list1 are contained in list2.

    Parameters
    ----------
    list1 : list
        the elements to look up

    list2 : list
        the container searched with the `in` operator

    X : None, "all" or "any", default None
        how to summarise the result

    Returns
    -------
    list of bool, bool or None
        X=None gives one bool per element of list1; "all" gives True when every element is found (True for an empty list1); "any" gives True when at least one is found. Any other X returns None.
    """
    found = [element in list2 for element in list1]
    if X is None:
        return found
    if X == "all":
        return all(found)
    if X == "any":
        return any(found)
    return None
181    
def klistdir(path, with_prefix=True):
    """
    List the entries of a directory.

    Parameters
    ----------
    path : str
        the directory to list

    with_prefix : bool, default True
        if True, each entry is joined onto path; if False, bare entry names are returned

    Returns
    -------
    list of str
        the entries in the order os.listdir returns them
    """
    names = os.listdir(path)
    if not with_prefix:
        return names
    return [os.path.join(path, name) for name in names]
187    
class PathJoin:
    """
    Join relative paths onto a fixed base path.

    Parameters
    ----------
    base_path : str
        the path that every later path() call is joined onto
    """

    def __init__(self, base_path):
        self.base_path = base_path

    # The constructor was previously misspelled with three trailing underscores,
    # so it never ran on instantiation. Keep the old name as an alias for any
    # caller that invoked it explicitly as a workaround.
    __init___ = __init__

    def path(self, path):
        """Return path joined onto the base path with os.path.join."""
        return os.path.join(self.base_path, path)
193
def kstrip(string, key):
    """
    Remove one occurrence of key from the start and from the end of string.

    Unlike str.strip, key is treated as a whole substring and is removed at
    most once per side.

    Parameters
    ----------
    string : str
        the text to trim

    key : str
        the substring to remove from both ends

    Returns
    -------
    str
        the trimmed text; string is returned unchanged when it is shorter than key
    """
    width = len(key)
    if len(string) < width:
        return string
    trimmed = string[width:] if string.startswith(key) else string
    if width == 0:
        # With an empty key the suffix slice is the whole string, so only an
        # empty string "matches"; either way the text is left as it is.
        return trimmed
    if trimmed.endswith(key):
        trimmed = trimmed[:-width]
    return trimmed
203
def retry(retry_times=3, raise_exception=False, record_retry=False):
    """
    Retry decorator: call the function up to retry_times times until it succeeds

    Parameters
    ----------
    retry_times : int
        the maximum number of attempts (including the first call). If it is 0 or less the function is never called and the wrapper returns None.

    raise_exception : bool or logging.Logger
        what to do when the last attempt also fails. If False, the exception is swallowed and the wrapper returns None. If True, the exception is re-raised. If a logger is provided, the exception is recorded in the logger and not raised.

    record_retry : bool or logging.Logger
        whether to record each failed attempt. If False, nothing is recorded. If True, the retry information is printed. If a logger is provided, the retry information is recorded in the logger.

    Examples
    --------
    ```python
    @retry(retry_times=3)
    def test():
        print("test")
        raise Exception("test")
    ```
    """
    # Imported here so this block does not depend on the module's import list.
    from functools import wraps

    def decorator(func):
        @wraps(func)  # keep the wrapped function's __name__ and __doc__
        def wrapper(*args, **kwargs):
            for attempt in range(1, retry_times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if isinstance(record_retry, Logger):
                        record_retry.exception(f"function {func.__name__} failed, retrying {attempt}/{retry_times}")
                    elif record_retry:
                        print(f"function {func.__name__} failed, retrying {attempt}/{retry_times}")
                    if attempt == retry_times:
                        if isinstance(raise_exception, Logger):
                            raise_exception.exception(f"function {func.__name__} failed after {retry_times} retries")
                        elif raise_exception:
                            # Bare raise re-raises the active exception with its traceback intact.
                            raise
            # Every attempt failed and the error was swallowed or only logged.
            return None
        return wrapper
    return decorator
245
def Pipeline(*funcs):
    """
    Combine multiple functions into a pipeline function.
    Execute the functions in order from left to right, feeding each result
    into the next function. With no functions the pipeline returns its input.
    """
    def run(x):
        for step in funcs:
            x = step(x)
        return x
    return run
254
255if __name__=="__main__":
256    kstrip("asd-asd","asd")
def conclude_list(Alist):
68def conclude_list(Alist):
69    result = {}
70    for i in Alist:
71        if i in result:
72            result[i] += 1
73        else:
74            result[i] = 1
75    return result
def conclude_list2(a_list):
77def conclude_list2(a_list):
78    no_duplicate = []
79    for i in a_list:
80        if i not in no_duplicate:
81            no_duplicate.append(i)
82    count_list = []
83    for i in no_duplicate:
84        count_list.append(a_list.count(i))
85    return no_duplicate, count_list
def time_string():
87def time_string():
88    return time.strftime("%Y-%m-%d-%H%M%S", time.localtime())
def load(path_name, encoding='b', lines=False, removeCL=True):
 90def load(path_name, encoding="b", lines=False, removeCL=True):
 91    """
 92    load the object from the file, binary or text.
 93
 94    Parameters
 95    ----------
 96    path_name : str
 97        The path of the file
 98
 99    encoding : str, default "b"
100        The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text
101
102    lines : bool, default False
103        Whether to read the file as lines. If True, return a list of lines. If False, return the whole content.
104
105    removeCL : bool, default True
106        Whether to remove the line break character at the end of each line. Only valid when lines is True.
107    """
108    if encoding=="b":
109        with open(path_name, "rb") as f:
110            return pickle.load(f)
111    else:
112        with open(path_name, "r", encoding=encoding) as f:
113            if lines:
114                content = f.readlines()
115                if removeCL:
116                    content = [i.rstrip("\n") for i in content]
117            else:
118                content = f.read()
119            return content

load the object from the file, binary or text.

Parameters

path_name : str The path of the file

encoding : str, default "b" The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text

lines : bool, default False Whether to read the file as lines. If True, return a list of lines. If False, return the whole content.

removeCL : bool, default True Whether to remove the line break character at the end of each line. Only valid when lines is True.

def store(path_name, Aobject=None, encoding='b'):
121def store(path_name, Aobject=None, encoding="b"):
122    """
123    store the object to the file, binary or text.
124
125    Parameters
126    ----------
127    path_name : str
128        The path of the file
129
130    Aobject : Any, default None
131        The object to be stored, if None, do nothing. Should be serializable if encoding is "b".
132
133    encoding : str, default "b"
134        The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text
135    """
136    if path_name==None:
137        return
138    path = os.path.dirname(path_name)
139    if path!="" and path!="./" and os.path.exists(path)==False:
140        os.makedirs(path)
141    if encoding=="b":
142        with open(path_name, "wb") as f:
143            pickle.dump(Aobject, f)
144    else:
145        with open(path_name, "w", encoding=encoding) as f:
146            f.write(Aobject)

store the object to the file, binary or text.

Parameters

path_name : str The path of the file

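Aobject : Any, default None The object to be stored. It should be picklable if encoding is "b", and a str for text encodings. Note that the call does nothing only when path_name is None; a None Aobject is still written (pickled) in binary mode.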

encoding : str, default "b" The encoding of the file, "b" for binary, text encoding ("utf-8", "gbk", etc.) for text

def sort_dic_by_value(dic, my_reverse=False):
148def sort_dic_by_value(dic,my_reverse=False):
149    return {k: v for k, v in sorted (dic.items(), key=lambda item: item[1], reverse=my_reverse)}
def sort_multi_list(*multi_list, by=1, my_reverse=False):
151def sort_multi_list(*multi_list,by = 1,my_reverse=False):
152    li = zip(*multi_list)
153    length = len(multi_list)
154    if length == 1:
155        raise Exception("please input multiple lists, not just 1")
156    else:
157        if by in range(0,length):
158            x = sorted(li, key=lambda i:i[by],reverse=my_reverse)
159            return x
160        else:
161            raise Exception("the value of 'by' can only be 0 or 1")
def find_key_by_value(value, dic):
163def find_key_by_value(value, dic):
164    for k,v in dic.items():
165        if v == value:
166            return k
def merge_list_to_dic(list1, list2):
168def merge_list_to_dic(list1,list2):
169    li = zip(list1,list2)
170    return {k:v for (k,v) in li}
def list_in_list(list1, list2, X=None):
172def list_in_list(list1, list2, X=None):
173    boolean_list = []
174    for i in list1:
175        boolean_list.append(i in list2)
176    if X==None:
177        return boolean_list
178    elif X=="all":
179        return False not in boolean_list
180    elif X=="any":
181        return True in boolean_list
def klistdir(path, with_prefix=True):
183def klistdir(path, with_prefix=True):
184    if with_prefix:
185        return [os.path.join(path, i) for i in os.listdir(path)]
186    else:
187        return os.listdir(path)
class PathJoin:
189class PathJoin:
190    def __init___(self, base_path):
191        self.base_path = base_path
192    def path(self, path):
193        return os.path.join(self.base_path ,path)
def path(self, path):
192    def path(self, path):
193        return os.path.join(self.base_path ,path)
def kstrip(string, key):
195def kstrip(string, key):
196    s = string
197    if len(s)<len(key):
198        return s
199    if s[0:len(key)] == key:
200        s = s[len(key):]
201    if s[-len(key):] == key:
202        s = s[0:-len(key)]
203    return s
def retry(retry_times=3, raise_exception=False, record_retry=False):
205def retry(retry_times=3, raise_exception=False, record_retry=False):
206    """
207    Retry decorator: retry the function for retry_times times if exception occurs
208
209    Parameters
210    ----------
211    retry_times : int
212        the number of times to retry the function
213
214    raise_exception : bool or logging.Logger
215        whether to raise the exception after retry_times retries, or the logger to record the exception. If False, the exception will not be raised, If True, the exception will be raised. If a logger is provided, the exception will be recorded in the logger.
216        
217    record_retry : bool or logging.Logger
218        whether to record the retry information, or the logger to record the retry information. If False, the retry information will not be recorded, If True, the retry information will be printed. If a logger is provided, the retry information will be recorded in the logger.
219
220    Examples
221    --------
222    ```python
223    @retry(retry_times=3)
224    def test():
225        print("test")
226        raise Exception("test")
227    ```
228    """
229    def decorator(func):
230        def wrapper(*args, **kwargs):
231            for i in range(retry_times):
232                try:
233                    return func(*args, **kwargs)
234                except Exception as e:
235                    if isinstance(record_retry, Logger):
236                        record_retry.exception(f"function {func.__name__} failed, retrying {i+1}/{retry_times}")
237                    elif record_retry:
238                        print(f"function {func.__name__} failed, retrying {i+1}/{retry_times}")
239                    if i == retry_times-1:
240                        if isinstance(raise_exception, Logger):
241                            raise_exception.exception(f"function {func.__name__} failed after {retry_times} retries")
242                        elif raise_exception:
243                            raise e
244        return wrapper
245    return decorator

Retry decorator: retry the function for retry_times times if exception occurs

Parameters

retry_times : int the maximum number of times the function is called (the first call counts as an attempt)

raise_exception : bool or logging.Logger Whether to raise the exception after retry_times failed attempts, or the logger to record the exception. If False, the exception is not raised and the call returns None. If True, the exception is raised. If a logger is provided, the exception is recorded in the logger instead of being raised.

record_retry : bool or logging.Logger Whether to record the retry information, or the logger to record it. If False, the retry information is not recorded. If True, the retry information is printed. If a logger is provided, the retry information is recorded in the logger.

Examples

@retry(retry_times=3)
def test():
    print("test")
    raise Exception("test")
def Pipeline(*funcs):
247def Pipeline(*funcs):
248    """
249    Combine multiple functions into a pipeline function.
250    Execute the functions in order from left to right.
251    """
252    def pipeline_two(f, g):
253        return lambda x: g(f(x))
254    return reduce(pipeline_two, funcs, lambda x: x)

Combine multiple functions into a pipeline function. Execute the functions in order from left to right.