Kkit.child_process

This module provides a simple, intuitive interface for controlling an object that lives in a child process.

The remote object is created in the child process through RemoteObjectProxy.

In sequential mode (paralle_execution=False), the instance's attributes and methods can be accessed and called as if it were a normal object in the parent process, although it actually runs in a child process. If the instance raises an exception in the child process, that exception is re-raised in the parent process.

1. Attributes can be accessed like those of a normal object.
2. Methods can be called like those of a normal object; each call blocks until the child process replies.
3. An exception raised by the instance in the child process is re-raised in the parent process.
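
The re-raise in point 3 can be sketched as follows, assuming (as the module source does) that the child sends the exception object itself through the pipe. Both pipe ends live in one process here only to keep the sketch self-contained, and `fetch` is a hypothetical helper, not part of Kkit.

```python
import multiprocessing


def fetch(conn):
    """Receive one reply and re-raise it if it is an exception object."""
    reply = conn.recv()
    if isinstance(reply, Exception):
        raise reply
    return reply


# Pipe() returns two connected Connection objects.
parent_conn, child_conn = multiprocessing.Pipe()

# Stand-in for the child process: one normal reply, one failure.
child_conn.send(42)
child_conn.send(ValueError("bad input"))

first = fetch(parent_conn)          # a normal value is returned unchanged
try:
    fetch(parent_conn)              # the exception is raised on the receiving side
except ValueError as exc:
    caught = str(exc)

print(first, caught)
```

Anything sent this way has to be picklable, which holds for the built-in exception types used here.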

In parallel mode (paralle_execution=True, the default), attributes can still be accessed like those of a normal object in the parent process. Methods can also be called as usual, but the call returns immediately with the parent end of the pipe (one of the connection objects created by `multiprocessing.Pipe`) instead of the result; call recv() on it to get the return value. Exceptions are returned to the parent process as values instead of being raised there.

1. Attributes can be accessed like those of a normal object.
2. Methods can be called like those of a normal object, but the return value is a connection object created by `multiprocessing.Pipe`; call recv() on it to get the result.
3. Exceptions are returned to the parent process as values instead of being raised there.
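
The "call now, recv() later" pattern from point 2 might look like the sketch below. It is only an illustration: a thread stands in for the child process so the block runs on its own, and `start_call` is a hypothetical helper, not a Kkit function.

```python
import multiprocessing
import threading
import time


def start_call(delay):
    """Start background work and return the connection its result will arrive on."""
    parent_conn, child_conn = multiprocessing.Pipe()

    def work():
        time.sleep(delay)
        child_conn.send(f"slept {delay} seconds")

    # Assumption: a thread replaces the child process for this sketch.
    threading.Thread(target=work, daemon=True).start()
    return parent_conn


# Both calls return at once; the workers sleep concurrently.
pending = [start_call(0.2), start_call(0.1)]

# recv() blocks until the corresponding result has been sent.
results = [conn.recv() for conn in pending]
print(results)
```

Because the waiting happens in recv(), the total time is governed by the slowest call rather than by the sum of all calls.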

ParallelResultFetcher collects the return values of parallel calls.

Example

from Kkit.child_process import RemoteObjectProxy, ParallelResultFetcher
import time

class TestObj:
    def __init__(self, time):
        self.time = time

    def sleep(self, add_time):
        time.sleep(self.time+add_time)
        return f"sleep {self.time+add_time} seconds"

def obj_creator(time):
    return TestObj(time)

# Sequential execution

test_obj = RemoteObjectProxy(obj_creator, paralle_execution=False, time=5)
print(test_obj.time)   # access an attribute like on a normal object
print(test_obj.sleep(2))  # call a method like on a normal object; blocks for 5+2 seconds

# Parallel execution

num_processes = 10
test_objs_parallel = [RemoteObjectProxy(obj_creator, time=i) for i in range(num_processes)]
result_fetcher = ParallelResultFetcher(num_processes)

start_time = time.time()
for i in range(num_processes):
    result_fetcher[i] = test_objs_parallel[i].sleep(2)
res = result_fetcher.wait_all_results()
print(res)
print(time.time()-start_time)  # should be around 11 seconds (the last object sleeps longest: 9+2 seconds)

import copy
import multiprocessing


class EmptyResult:
    """
    EmptyResult is a placeholder for an unfilled slot in the `ParallelResultFetcher.results` list.
    """
    def __init__(self):
        pass

    def __str__(self):
        return "Empty Result"

    def __repr__(self):
        return "Empty Result"

class ParallelResultFetcher:
    """
    ParallelResultFetcher is used to fetch the return values of parallel calls.
    """
    def __init__(self, num_of_processes:int):
        """
        This class is used to fetch the return values of parallel calls.

        Parameters
        ----------
        num_of_processes : int
            Number of processes whose results will be fetched.
        """
        self.num_of_processes = num_of_processes
        """@private"""
        self.results = [EmptyResult()] * num_of_processes
        """@private"""

    def __setitem__(self, index, value):
        if 0 <= index < self.num_of_processes:
            self.results[index] = value
        else:
            raise IndexError(f"Index {index} out of range (0~{self.num_of_processes-1})")

    def __getitem__(self, index):
        if 0 <= index < self.num_of_processes:
            return self.results[index]
        else:
            raise IndexError(f"Index {index} out of range (0~{self.num_of_processes-1})")

    def __len__(self):
        return len(self.results)

    def wait_all_results(self):
        """
        Wait for all results and return them. After calling this function, `ParallelResultFetcher.results` is reset to a list of `None` values.
        """
        # Each slot must hold the connection returned by a parallel call.
        for i in range(self.num_of_processes):
            self.results[i] = self.results[i].recv()

        res = copy.deepcopy(self.results)
        self.results = [None] * self.num_of_processes
        return res

class RemoteObjectProxy:
    """
    RemoteObjectProxy is a proxy class for a remote object.
    """
    class IsCallable:
        def __init__(self):
            pass

    def __init__(self, remote_obj_creator, paralle_execution=True, *args, **kwargs):
        """
        This class is a proxy class for a remote object.

        Parameters
        ----------
        remote_obj_creator : function
            A function that creates the remote object. It should return the object to be controlled in the child process.

        paralle_execution : bool, optional
            Whether to run method calls in parallel. Default is True.

        *args
            Positional arguments for `remote_obj_creator`.

        **kwargs
            Keyword arguments for `remote_obj_creator`.
        """
        self.paralle_execution = paralle_execution
        """@private"""
        self.parent_conn, child_conn = multiprocessing.Pipe()
        """@private"""

        self.args = args
        """@private"""
        self.kwargs = kwargs
        """@private"""

        self.process = multiprocessing.Process(
            target=self._run_remote_object,
            args=(remote_obj_creator, child_conn),
            daemon=True
        )
        """@private"""
        self.process.start()
        self.parent_conn.recv()  # wait for the remote object to be created

    def _run_remote_object(self, remote_obj_creator, conn):
        remote_obj = remote_obj_creator(*self.args, **self.kwargs)
        conn.send("ready")
        while True:
            try:
                cmd = conn.recv()
                if cmd == "close":
                    break

                func_name, not_test, args, kwargs = cmd
                try:
                    attr = getattr(remote_obj, func_name)
                    if not callable(attr):
                        # plain attribute: send its value
                        result = getattr(remote_obj, func_name)
                        conn.send(result)
                    else:
                        if not_test:
                            # real call: run the method and send its return value
                            result = getattr(remote_obj, func_name)(*args, **kwargs)
                            conn.send(result)
                        else:
                            # probe only: report that the attribute is callable
                            is_callable = self.IsCallable()
                            conn.send(is_callable)
                except Exception as e:
                    conn.send(e)
            except (EOFError, ConnectionResetError):
                break

    def __getattr__(self, name):
        def dynamic_call(*args, **kwargs):
            self.parent_conn.send((name, True, args, kwargs))

            result = self.parent_conn.recv()
            if isinstance(result, Exception):
                raise result
            return result

        def dynamic_call_parallel(*args, **kwargs):
            self.parent_conn.send((name, True, args, kwargs))
            return self.parent_conn

        # probe the remote attribute first to learn whether it is callable
        self.parent_conn.send((name, False, (), {}))
        res = self.parent_conn.recv()
        if isinstance(res, Exception):
            if self.paralle_execution:
                return res
            else:
                raise res
        if isinstance(res, self.IsCallable):
            if self.paralle_execution:
                return dynamic_call_parallel
            else:
                return dynamic_call
        else:
            return res

    def close(self):
        if self.process.is_alive():
            self.parent_conn.send("close")
            self.process.join()
        self.parent_conn.close()

class EmptyResult:

EmptyResult is a placeholder for a slot in the ParallelResultFetcher.results list that has not been filled yet.

class ParallelResultFetcher:

ParallelResultFetcher is used to fetch the return values of parallel calls.

ParallelResultFetcher(num_of_processes: int)

This class is used to fetch the return values of parallel calls.

Parameters

num_of_processes : int
    Number of processes whose results will be fetched.
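
The constructor fills the result list with `[EmptyResult()] * num_of_processes`. The sketch below shows what that list multiplication does, using a hypothetical `Placeholder` class in place of EmptyResult so the block runs without Kkit installed.

```python
class Placeholder:
    """Hypothetical stand-in for EmptyResult; not imported from Kkit."""

    def __repr__(self):
        return "Empty Result"


# List multiplication repeats the same object; it does not create copies.
slots = [Placeholder()] * 3
shared = all(slot is slots[0] for slot in slots)

# Assigning by index replaces one slot and leaves the others untouched.
slots[1] = "done"
print(shared, slots)
```

Sharing one placeholder is harmless here because slots are only ever replaced, never mutated in place. Note that a placeholder has no recv() method, so going by the source, calling wait_all_results() before every slot has been assigned would fail with an AttributeError.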

def wait_all_results(self):

Wait for all results and return them as a list, in slot order. After this call, ParallelResultFetcher.results is reset to a list of None values.
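
Since parallel mode hands exceptions back as ordinary values, the returned list can mix results and exception objects. One possible way to separate them is sketched below; `split_results` is a hypothetical helper, not part of Kkit.

```python
def split_results(results):
    """Separate normal return values from exception objects."""
    ok = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return ok, errors


# Example list shaped like the output of wait_all_results().
ok, errors = split_results(["sleep 2 seconds", ValueError("bad input"), 3])
print(ok, errors)
```

This check cannot tell a method that failed from one that deliberately returns an exception object, so it is only a heuristic.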

class RemoteObjectProxy:

RemoteObjectProxy is a proxy class for a remote object.

RemoteObjectProxy(remote_obj_creator, paralle_execution=True, *args, **kwargs)

This class is a proxy class for a remote object.

Parameters

remote_obj_creator : function
    A function that creates the remote object. It should return the object to be controlled in the child process.

paralle_execution : bool, optional
    Whether to run method calls in parallel. Default is True.

*args
    Positional arguments for remote_obj_creator. Because paralle_execution precedes *args in the signature, positional arguments can only be supplied after passing paralle_execution positionally as well.

**kwargs
    Keyword arguments for remote_obj_creator.
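
The request/reply protocol behind the proxy can be sketched roughly as below. This is a simplified illustration modelled on the module source, not the library's implementation: a thread stands in for the child process, and `Counter`, `serve`, `request` and the `"callable"` probe reply are all hypothetical names.

```python
import multiprocessing
import threading


class Counter:
    """Hypothetical object to be served; not part of Kkit."""

    def __init__(self, start):
        self.value = start

    def add(self, n):
        self.value += n
        return self.value


def serve(obj, conn):
    """Answer (name, do_call, args, kwargs) requests until "close" arrives."""
    while True:
        cmd = conn.recv()
        if cmd == "close":
            break
        name, do_call, args, kwargs = cmd
        try:
            attr = getattr(obj, name)
            if not callable(attr):
                conn.send(attr)                     # plain attribute value
            elif do_call:
                conn.send(attr(*args, **kwargs))    # real method call
            else:
                conn.send("callable")               # probe reply
        except Exception as exc:
            conn.send(exc)                          # errors travel as objects


parent_conn, child_conn = multiprocessing.Pipe()
worker = threading.Thread(target=serve, args=(Counter(5), child_conn))
worker.start()


def request(*cmd):
    """Send one command and wait for its reply."""
    parent_conn.send(cmd)
    return parent_conn.recv()


value = request("value", False, (), {})     # attribute access
probe = request("add", False, (), {})       # is "add" callable?
total = request("add", True, (3,), {})      # call add(3)
error = request("add", True, ("x",), {})    # int + str fails in the worker

parent_conn.send("close")
worker.join()
print(value, probe, total, type(error).__name__)
```

Probing before calling costs one extra round trip per attribute lookup, but it lets a single `__getattr__` hook serve both attributes and methods.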

def close(self):
    def close(self):
        if self.process.is_alive():
            self.parent_conn.send("close")
            self.process.join()
        self.parent_conn.close()
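
Because close() takes no arguments, the standard `contextlib.closing` helper should be enough to make sure it runs even when the work in between fails. The sketch below demonstrates the pattern with a hypothetical `FakeProxy` stand-in, so it runs without Kkit or a child process.

```python
from contextlib import closing


class FakeProxy:
    """Stand-in with the same close() shape as RemoteObjectProxy; not part of Kkit."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


proxy = FakeProxy()
try:
    # closing() calls proxy.close() on exit, whether or not the body raises.
    with closing(proxy):
        raise RuntimeError("work failed")
except RuntimeError:
    pass

print(proxy.closed)
```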
class RemoteObjectProxy.IsCallable:
    class IsCallable:
        def __init__(self):
            pass