Kkit.child_process
This module provides a simple, intuitive interface for controlling an object instance that lives in a child process.
The remote object in the child process is created through RemoteObjectProxy.
In sequential mode (paralle_execution=False), the instance's attributes and methods can be accessed and called from the parent process as if it were a normal local object, although it actually runs in a child process. If the instance raises an exception in the child process, that exception is re-raised in the parent process.
1. The instance's attributes can be accessed like those of a normal object.
2. The instance's methods can be called like those of a normal object.
3. An exception raised by the instance in the child process is re-raised in the parent process.
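The sequential behaviour described above can be sketched without the library. This is a minimal illustration, not Kkit's implementation: a thread stands in for the child process so the snippet runs on any platform, and the names `Counter`, `serve`, and `call` are hypothetical. It shows a request travelling over a `multiprocessing.Pipe` and an exception being sent back and re-raised on the calling side.

```python
import threading
from multiprocessing import Pipe


class Counter:
    """Toy object that lives on the worker side (hypothetical)."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        if n < 0:
            raise ValueError("n must be non-negative")
        self.value += n
        return self.value


def serve(conn, obj):
    """Answer (method_name, args) requests until "close" arrives."""
    while True:
        cmd = conn.recv()
        if cmd == "close":
            break
        name, args = cmd
        try:
            conn.send(getattr(obj, name)(*args))
        except Exception as exc:
            # Ship the exception back instead of letting the worker die.
            conn.send(exc)


def call(conn, name, *args):
    """Sequential call: send the request, block for the reply, re-raise errors."""
    conn.send((name, args))
    result = conn.recv()
    if isinstance(result, Exception):
        raise result
    return result


parent_conn, child_conn = Pipe()
worker = threading.Thread(target=serve, args=(child_conn, Counter()), daemon=True)
worker.start()

total = call(parent_conn, "add", 3)   # behaves like a normal method call
try:
    call(parent_conn, "add", -1)      # the worker-side error surfaces here
    caught = None
except ValueError as exc:
    caught = str(exc)

parent_conn.send("close")
worker.join()
```

Because every call blocks on `recv()`, calls made this way run strictly one after another, which is the trade-off sequential mode makes for its natural call syntax.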
In parallel mode (paralle_execution=True), the instance's attributes can still be accessed like those of a normal object in the parent process. Methods are called with the normal call syntax, but the call returns immediately with the proxy's parent-side `multiprocessing.Pipe` connection; call recv() on it to get the actual return value. An exception is returned to the parent process as the received value instead of being raised there.
1. The instance's attributes can be accessed like those of a normal object.
2. The instance's methods can be called like those of a normal object, but the return value is a `multiprocessing.Pipe` connection object.
3. An exception is returned to the parent process instead of being raised there.
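The parallel behaviour can be sketched the same way. This is an illustrative assumption-laden example rather than the library's code: threads stand in for child processes, and `serve_once`, `start_worker`, and `call_parallel` are hypothetical names. Each call only sends the request and hands back the connection; results, including exceptions, are picked up later with `recv()`.

```python
import threading
import time
from multiprocessing import Pipe


def serve_once(conn, delay):
    """Worker: wait for one request, do slow work, send the result or the error."""
    x = conn.recv()
    try:
        time.sleep(delay)
        conn.send(10 // x)
    except Exception as exc:
        conn.send(exc)  # returned as a value, not raised in the caller


def start_worker(delay):
    """Start one worker and return the caller's end of its pipe."""
    parent_conn, child_conn = Pipe()
    threading.Thread(target=serve_once, args=(child_conn, delay), daemon=True).start()
    return parent_conn


def call_parallel(conn, x):
    """Parallel-style call: send the request and return the connection at once."""
    conn.send(x)
    return conn


conns = [start_worker(0.1) for _ in range(3)]

# Dispatch all three requests first; none of these calls blocks on the work.
pending = [call_parallel(conn, x) for conn, x in zip(conns, (1, 2, 0))]

# Collect afterwards. The third worker divided by zero, so its "result"
# is an exception object that the caller has to check for explicitly.
results = [conn.recv() for conn in pending]
failed = [isinstance(r, Exception) for r in results]
```

Since errors arrive as ordinary values in this mode, callers need an `isinstance(result, Exception)` check wherever a failure matters.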
ParallelResultFetcher collects the return values of calls made in parallel mode.
Example
from Kkit.child_process import RemoteObjectProxy, ParallelResultFetcher
import time

class TestObj:
    def __init__(self, time):
        self.time = time

    def sleep(self, add_time):
        time.sleep(self.time+add_time)
        return f"sleep {self.time+add_time} seconds"

def obj_creator(time):
    return TestObj(time)

# Sequential execution

test_obj = RemoteObjectProxy(obj_creator, paralle_execution=False, time=5)
print(test_obj.time) # access an attribute like on a normal object
print(test_obj.sleep(2)) # call a method like on a normal object

# Parallel execution (paralle_execution defaults to True)

num_processes = 10
test_objs_parallel = [RemoteObjectProxy(obj_creator, time=i) for i in range(num_processes)]
result_fetcher = ParallelResultFetcher(num_processes)

start_time = time.time()
for i in range(num_processes):
    result_fetcher[i] = test_objs_parallel[i].sleep(2)
res = result_fetcher.wait_all_results()
print(res)
print(time.time()-start_time) # should be around 11 seconds (the last object sleeps longest: 9+2 seconds)
class EmptyResult:
    """
    EmptyResult is a placeholder for empty result in `ParallelResultFetcher.results` list.
    """
    def __init__(self):
        pass

    def __str__(self):
        return "Empty Result"

    def __repr__(self):
        return "Empty Result"
EmptyResult is a placeholder for a slot in the ParallelResultFetcher.results list that has not been filled yet.
import copy


class ParallelResultFetcher:
    """
    ParallelResultFetcher is used to fetch the return value of parallel execution
    """
    def __init__(self, num_of_processes:int):
        """
        This class is used to fetch the return value of parallel execution.

        Parameters
        ----------
        num_of_processes : int
            number of processes to fetch result.
        """
        self.num_of_processes = num_of_processes
        """@private"""
        self.results = [EmptyResult()] * num_of_processes
        """@private"""

    def __setitem__(self, index, value):
        if 0 <= index < self.num_of_processes:
            self.results[index] = value
        else:
            raise IndexError(f"Index {index} out of range (0~{self.num_of_processes-1})")

    def __getitem__(self, index):
        if 0 <= index < self.num_of_processes:
            return self.results[index]
        else:
            raise IndexError(f"Index {index} out of range (0~{self.num_of_processes-1})")

    def __len__(self):
        return len(self.results)

    def wait_all_results(self):
        """
        Wait for all results and return the results. After calling this function, the `ParallelResultFetcher.results` will be reset to `EmptyResult`.
        """
        # Every slot must hold an object with recv() (a pipe connection).
        for i in range(self.num_of_processes):
            self.results[i] = self.results[i].recv()

        res = copy.deepcopy(self.results)
        # Note: the slots are reset to None here, not to EmptyResult instances.
        self.results = [None] * self.num_of_processes
        return res
ParallelResultFetcher is used to fetch the return values of parallel execution.
def __init__(self, num_of_processes:int):
    """
    This class is used to fetch the return value of parallel execution.

    Parameters
    ----------
    num_of_processes : int
        number of processes to fetch result.
    """
    self.num_of_processes = num_of_processes
    """@private"""
    self.results = [EmptyResult()] * num_of_processes
    """@private"""
This class is used to fetch the return value of parallel execution.
Parameters
num_of_processes : int
    Number of processes whose results will be fetched.
def wait_all_results(self):
    """
    Wait for all results and return the results. After calling this function, the `ParallelResultFetcher.results` will be reset to `EmptyResult`.
    """
    for i in range(self.num_of_processes):
        self.results[i] = self.results[i].recv()

    res = copy.deepcopy(self.results)
    self.results = [None] * self.num_of_processes
    return res
Wait for all results and return them. After this call, ParallelResultFetcher.results is reset so the fetcher can be reused; note that the source assigns None to each slot rather than an EmptyResult instance.
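The slot-based collection pattern can be tried in isolation. The sketch below is a simplified stand-in, not the library class: `SlotFetcher` and `wait_all` are hypothetical names, and one-way pipes created in the same process replace real child processes. It shows that results come back in slot order regardless of the order in which they were produced.

```python
from multiprocessing import Pipe


class SlotFetcher:
    """Hypothetical stand-in: one slot per pending call."""

    def __init__(self, size):
        self.slots = [None] * size

    def __setitem__(self, index, conn):
        self.slots[index] = conn

    def wait_all(self):
        """Replace each stored connection with the value it receives."""
        results = [conn.recv() for conn in self.slots]
        self.slots = [None] * len(self.slots)  # ready for the next round
        return results


fetcher = SlotFetcher(3)
senders = []
for i in range(3):
    # duplex=False gives a (receive-only, send-only) pair of connections.
    receiver, sender = Pipe(duplex=False)
    fetcher[i] = receiver
    senders.append(sender)

# Produce the values in reverse order to show that slot order still wins.
for i in reversed(range(3)):
    senders[i].send(i * i)

collected = fetcher.wait_all()
```

In the source shown above, `wait_all_results` calls `recv()` on every slot, and `EmptyResult` defines no `recv()` method, so it appears that every index has to be assigned before waiting.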
import multiprocessing


class RemoteObjectProxy:
    """
    RemoteObjectProxy is a proxy class for remote object.
    """
    class IsCallable:
        def __init__(self):
            pass

    def __init__(self, remote_obj_creator, paralle_execution=True, *args, **kwargs):
        """
        This class is a proxy class for remote object.

        Parameters
        ----------
        remote_obj_creator : function
            A function to create the remote object, this function should return the object to be controlled in child process.

        paralle_execution : bool, optional
            Whether to run the function in parallel. Default is True.

        *args
            The arguments for `remote_obj_creator`.

        **kwargs
            The keyword arguments for `remote_obj_creator`
        """
        self.paralle_execution = paralle_execution
        """@private"""
        self.parent_conn, child_conn = multiprocessing.Pipe()
        """@private"""

        self.args = args
        """@private"""
        self.kwargs = kwargs
        """@private"""

        self.process = multiprocessing.Process(
            target=self._run_remote_object,
            args=(remote_obj_creator, child_conn),
            daemon=True
        )
        """@private"""
        self.process.start()
        self.parent_conn.recv() # wait for the remote object to be created

    def _run_remote_object(self, remote_obj_creator, conn):
        # Runs in the child process: build the object, then serve requests.
        remote_obj = remote_obj_creator(*self.args, **self.kwargs)
        conn.send("ready")
        while True:
            try:
                cmd = conn.recv()
                if cmd == "close":
                    break

                # not_test is False for a probe and True for a real call.
                func_name, not_test, args, kwargs = cmd
                try:
                    attr = getattr(remote_obj, func_name)
                    if not callable(attr):
                        result = getattr(remote_obj, func_name)
                        conn.send(result)
                    else:
                        if not_test:
                            result = getattr(remote_obj, func_name)(*args, **kwargs)
                            conn.send(result)
                        else:
                            is_callable = self.IsCallable()
                            conn.send(is_callable)
                except Exception as e:
                    conn.send(e)
            except (EOFError, ConnectionResetError):
                break

    def __getattr__(self, name):
        def dynamic_call(*args, **kwargs):
            # Sequential mode: block for the reply and re-raise exceptions.
            self.parent_conn.send((name, True, args, kwargs))

            result = self.parent_conn.recv()
            if isinstance(result, Exception):
                raise result
            return result

        def dynamic_call_parallel(*args, **kwargs):
            # Parallel mode: return the connection; the caller calls recv().
            self.parent_conn.send((name, True, args, kwargs))
            return self.parent_conn

        # Probe first: is the remote attribute a plain value or a callable?
        self.parent_conn.send((name, False, (), {}))
        res = self.parent_conn.recv()
        if isinstance(res, Exception):
            if self.paralle_execution:
                return res
            else:
                raise res
        if isinstance(res, self.IsCallable):
            if self.paralle_execution:
                return dynamic_call_parallel
            else:
                return dynamic_call
        else:
            return res

    def close(self):
        if self.process.is_alive():
            self.parent_conn.send("close")
            self.process.join()
        self.parent_conn.close()
RemoteObjectProxy is a proxy for an object that lives in a child process.
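Judging from the source listing above, attribute access works in two steps: the proxy first sends a probe for the name, and only issues a real call when the worker answers that the name is callable. The sketch below imitates that handshake under simplifying assumptions: a thread replaces the child process, a plain string marker replaces the `IsCallable` class, error handling is omitted, and `Robot`, `serve`, and `MiniProxy` are hypothetical names.

```python
import threading
from multiprocessing import Pipe

CALLABLE_MARKER = "<callable>"  # stand-in for the IsCallable reply


class Robot:
    """Toy object on the worker side (hypothetical)."""
    name = "r2"

    def greet(self, who):
        return f"hello {who}"


def serve(conn, obj):
    """Worker loop: answer probes and calls until "close" arrives."""
    while True:
        cmd = conn.recv()
        if cmd == "close":
            break
        attr_name, do_call, args, kwargs = cmd
        attr = getattr(obj, attr_name)
        if not callable(attr):
            conn.send(attr)                   # plain attribute: send its value
        elif do_call:
            conn.send(attr(*args, **kwargs))  # real call: send the result
        else:
            conn.send(CALLABLE_MARKER)        # probe of a method: send marker


class MiniProxy:
    def __init__(self, conn):
        self._conn = conn

    def __getattr__(self, attr_name):
        # Only reached for names that are not found on the proxy itself.
        self._conn.send((attr_name, False, (), {}))
        reply = self._conn.recv()
        if reply != CALLABLE_MARKER:
            return reply

        def remote_call(*args, **kwargs):
            self._conn.send((attr_name, True, args, kwargs))
            return self._conn.recv()

        return remote_call


parent_conn, child_conn = Pipe()
worker = threading.Thread(target=serve, args=(child_conn, Robot()), daemon=True)
worker.start()

proxy = MiniProxy(parent_conn)
robot_name = proxy.name          # one round trip: probe returns the value
greeting = proxy.greet("ada")    # two round trips: probe, then the call

parent_conn.send("close")
worker.join()
```

One consequence of this design is that every remote method call costs two messages in each direction, and all values crossing the pipe have to be picklable.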
def __init__(self, remote_obj_creator, paralle_execution=True, *args, **kwargs):
    """
    This class is a proxy class for remote object.

    Parameters
    ----------
    remote_obj_creator : function
        A function to create the remote object, this function should return the object to be controlled in child process.

    paralle_execution : bool, optional
        Whether to run the function in parallel. Default is True.

    *args
        The arguments for `remote_obj_creator`.

    **kwargs
        The keyword arguments for `remote_obj_creator`
    """
    self.paralle_execution = paralle_execution
    """@private"""
    self.parent_conn, child_conn = multiprocessing.Pipe()
    """@private"""

    self.args = args
    """@private"""
    self.kwargs = kwargs
    """@private"""

    self.process = multiprocessing.Process(
        target=self._run_remote_object,
        args=(remote_obj_creator, child_conn),
        daemon=True
    )
    """@private"""
    self.process.start()
    self.parent_conn.recv() # wait for the remote object to be created
This class is a proxy for a remote object.
Parameters
remote_obj_creator : function
    A function that creates the remote object. It should return the object to be controlled in the child process.
paralle_execution : bool, optional
    Whether method calls run in parallel mode, returning immediately with the pipe connection instead of blocking for the result. Default is True.
*args
    The positional arguments for remote_obj_creator.
**kwargs
    The keyword arguments for remote_obj_creator.
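Because `paralle_execution` comes before `*args` in the signature, the first extra positional argument binds to it rather than being forwarded to `remote_obj_creator`. The stand-in below copies only the documented parameter order and reports how the arguments bind; `make_proxy` and `creator` are hypothetical names and no process is started.

```python
def make_proxy(remote_obj_creator, paralle_execution=True, *args, **kwargs):
    """Stand-in with the documented parameter order; just reports the binding."""
    return {
        "paralle_execution": paralle_execution,
        "args": args,
        "kwargs": kwargs,
    }


def creator(delay):
    """Hypothetical creator function taking one argument."""
    return delay


# Keyword arguments are forwarded to the creator as expected.
by_keyword = make_proxy(creator, delay=5)

# A bare positional value lands in paralle_execution, not in *args.
by_position = make_proxy(creator, 5)

# To forward positional arguments, the mode has to be given first.
explicit = make_proxy(creator, False, 5)
```

Passing creator arguments by keyword, as the module example does with `time=5`, avoids the ambiguity.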