1 """
2 A simple parallel processing API for Python, inspired somewhat by the thread
3 module, slightly less by pypar, and slightly less still by pypvm.
4
5 Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk>
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by the Free
9 Software Foundation; either version 3 of the License, or (at your option) any
10 later version.
11
12 This program is distributed in the hope that it will be useful, but WITHOUT
13 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 details.
16
17 You should have received a copy of the GNU Lesser General Public License along
18 with this program. If not, see <http://www.gnu.org/licenses/>.
19 """
20
21 __version__ = "0.5"
22
23 import os
24 import sys
25 import select
26 import socket
27 import platform
28
29 from warnings import warn
30
31 try:
32 import cPickle as pickle
33 except ImportError:
34 import pickle
35
36 try:
37 set
38 except NameError:
39 from sets import Set as set
40
41
42
44
45
46
49
51
52 "A communications channel."
53
def __init__(self, pid, read_pipe, write_pipe):

    """
    Set up the channel for the process identified by 'pid'. Incoming
    messages are read from 'read_pipe' and outgoing messages are written to
    'write_pipe'; both are stored as given.
    """

    # The two directions of the channel.
    self.read_pipe = read_pipe
    self.write_pipe = write_pipe

    # The process at the other end (tested against 0 in 'wait').
    self.pid = pid
65
67
68
69
70
71 self.close()
72
74
75 "Explicitly close the channel."
76
77 if self.read_pipe is not None:
78 self.read_pipe.close()
79 self.read_pipe = None
80 if self.write_pipe is not None:
81 self.write_pipe.close()
82 self.write_pipe = None
83
84
def wait(self, options=0):

    """
    Wait for the process recorded in 'pid', if any, to exit. The 'options'
    value is handed unchanged to os.waitpid. Nothing is done when 'pid' is
    zero, and any OSError raised by os.waitpid is deliberately ignored.
    """

    # A zero pid means there is no process to wait for.
    if self.pid == 0:
        return

    try:
        os.waitpid(self.pid, options)
    except OSError:
        # Best effort only: failure to wait is not reported.
        pass
94
96
97 "Send the given object 'obj' through the channel."
98
99 pickle.dump(obj, self.write_pipe)
100 self.write_pipe.flush()
101
def send(self, obj):

    """
    Send the given object 'obj' through the channel, then wait for an
    acknowledgement. (The acknowledgement makes the caller wait, thus
    preventing processes from exiting and disrupting the communications
    channel and losing data.)

    Raise AcknowledgementError, carrying 'obj' as its single argument, if
    the reply received is anything other than "OK".
    """

    self._send(obj)
    reply = self._receive()
    if reply != "OK":

        # Instantiate the exception explicitly: the "raise Class, value"
        # form is not valid in later Python versions and, where it is
        # valid, spreads a tuple payload over several exception arguments.

        raise AcknowledgementError(obj)
114
116
117 "Receive an object through the channel, returning the object."
118
119 obj = pickle.load(self.read_pipe)
120 if isinstance(obj, Exception):
121 raise obj
122 else:
123 return obj
124
126
127 """
128 Receive an object through the channel, returning the object. Send an
129 acknowledgement of receipt. (The acknowledgement makes the sender wait,
130 thus preventing processes from exiting and disrupting the communications
131 channel and losing data.)
132 """
133
134 try:
135 obj = self._receive()
136 return obj
137 finally:
138 self._send("OK")
139
141
142 """
143 A persistent communications channel which can handle peer disconnection,
144 acting as a server, meaning that this channel is associated with a specific
145 address which can be contacted by other processes.
146 """
147
def __init__(self, pid, endpoint, address):

    """
    Initialise the persistent channel for the process identified by 'pid',
    remembering the listening 'endpoint' and the 'address' associated with
    it. No pipes are set up at this point: both are initialised to None.
    """

    Channel.__init__(self, pid, None, None)
    self.address = address
    self.endpoint = endpoint
    self.poller = select.poll()

    # Start listening on the endpoint, with a backlog of one connection.

    endpoint.listen(1)
159
161
162 "Close the persistent channel and remove the socket file."
163
164 Channel.close(self)
165 try:
166 os.unlink(self.address)
167 except OSError:
168 pass
169
171
172 "Ensure that the channel is capable of communicating."
173
174 if self.read_pipe is None or self.write_pipe is None:
175
176
177
178 endpoint, address = self.endpoint.accept()
179 self.read_pipe = endpoint.makefile("r", 0)
180 self.write_pipe = endpoint.makefile("w", 0)
181
182
183
184 fileno = self.write_pipe.fileno()
185 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
186
188
189 "Discard the existing connection."
190
191 fileno = self.write_pipe.fileno()
192 self.poller.unregister(fileno)
193 self.read_pipe = None
194 self.write_pipe = None
195 self.endpoint.listen(1)
196
198
199 "Ensure that sending and receiving are possible."
200
201 while 1:
202 self._ensure_pipes()
203 fileno = self.write_pipe.fileno()
204 fds = self.poller.poll(timeout)
205 for fd, status in fds:
206 if fd != fileno:
207 continue
208 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
209
210
211
212 self._reset_pipes()
213 break
214 else:
215 return
216
223
230
231
232
234
235 """
236 A communications exchange that can be used to detect channels which are
237 ready to communicate. Subclasses of this class can define the 'store_data'
238 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
239
240 Once exchanges are populated with active channels, use of the principal
241 methods of the exchange typically cause the 'store' method to be invoked,
242 resulting in the processing of any incoming data.
243 """
244
def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

    """
    Initialise the exchange, adding each of the optional 'channels' to it.

    The optional 'limit' restricts the number of active channels, as
    enforced and observed through the 'add_wait', 'wait' and 'finish'
    methods. Those methods require a subclass providing a working
    'store_data' method.

    A true value for the optional 'reuse' parameter causes channels and
    processes to be reused for waiting computations, with the callable
    still being invoked for each computation.

    A true value for the optional 'continuous' parameter causes channels
    and processes to be retained after data has been received from them,
    on the assumption that more data will follow.

    A false value for the optional 'autoclose' parameter prevents channels
    from being closed automatically when they are removed from the
    exchange; by default they are closed upon removal.
    """

    # Configuration.

    self.limit = limit
    self.reuse = reuse
    self.continuous = continuous
    self.autoclose = autoclose

    # Working state: queued computations, channels keyed by file
    # descriptor, channels removed by the latest 'ready' call, and the
    # polling object used to detect activity.

    self.waiting = []
    self.readables = {}
    self.removed = []
    self.poller = select.poll()

    if channels:
        for channel in channels:
            self.add(channel)
281
282
283
def add(self, channel):

    """
    Add the given 'channel' to the exchange, recording it under the file
    descriptor of its read pipe and registering that descriptor for input
    and error notifications.
    """

    fd = channel.read_pipe.fileno()
    events = select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR

    self.readables[fd] = channel
    self.poller.register(fd, events)
291
293
294 "Return a list of active channels."
295
296 return self.readables.values()
297
def ready(self, timeout=None):

    """
    Poll the registered channels, waiting for up to the optional 'timeout'
    (in milliseconds) or until communication is possible, and return a list
    of the channels which can be read from.

    Channels reporting a hang-up, an invalid descriptor or an error are
    removed from the exchange and listed in the 'removed' attribute, which
    is reset on each call.
    """

    failed = select.POLLHUP | select.POLLNVAL | select.POLLERR

    events = self.poller.poll(timeout)
    self.removed = []
    available = []

    for fd, status in events:
        channel = self.readables[fd]

        # Discard channels which can no longer be used.

        dropped = 0
        if status & failed:
            self.remove(channel)
            self.removed.append(channel)
            dropped = 1

        # Offer channels with pending input, unless the channel has just
        # been removed and removal closes channels automatically.

        if status & select.POLLIN and not (dropped and self.autoclose):
            available.append(channel)

    return available
328
330
331 """
332 Remove the given 'channel' from the exchange.
333 """
334
335 fileno = channel.read_pipe.fileno()
336 del self.readables[fileno]
337 self.poller.unregister(fileno)
338 if self.autoclose:
339 channel.close()
340 channel.wait()
341
342
343
345
346 "Return whether the exchange still has work scheduled or in progress."
347
348 return self.active() or self.waiting
349
351
352 "Return whether the exchange uses as many channels as it is allowed to."
353
354 return self.limit is not None and len(self.active()) >= self.limit
355
357
358 """
359 Add the given 'channel' to the exchange, waiting if the limit on active
360 channels would be exceeded by adding the channel.
361 """
362
363 self.wait()
364 self.add(channel)
365
367
368 """
369 Test for the limit on channels, blocking and reading incoming data until
370 the number of channels is below the limit.
371 """
372
373
374
375 while self.busy():
376 self.store()
377
379
380 """
381 Finish the use of the exchange by waiting for all channels to complete.
382 """
383
384 while self.unfinished():
385 self.store()
386
def store(self, timeout=None):

    """
    For each ready channel, process the incoming data. If the optional
    'timeout' parameter (a duration in milliseconds) is specified, wait only
    for the specified duration if no channels are ready to provide data.

    When there are no active channels at all, start queued computations
    instead, for as long as any are waiting and the exchange is not busy.

    A channel whose processing raises IOError is removed from the exchange
    and a warning is issued; other exceptions propagate to the caller.
    """

    # Either process input from active channels.

    if self.active():
        for channel in self.ready(timeout):
            try:
                self.store_data(channel)
                self.start_waiting(channel)
            except IOError:

                # Obtain the exception via sys.exc_info, which works
                # regardless of the exception-binding syntax supported by
                # the running Python version.

                error = sys.exc_info()[1]
                self.remove(channel)
                warn("Removed channel %s since caught an exception %s"
                    % (channel, error))

    # Or schedule new processes and channels.

    else:
        while self.waiting and not self.busy():
            target, args, kw = self.waiting.pop()
            self.start(target, *args, **kw)
413
415
416 """
417 Store incoming data from the specified 'channel'. In subclasses of this
418 class, such data could be stored using instance attributes.
419 """
420
421 raise NotImplementedError, "store_data"
422
423
424
426
427 """
428 Get waiting callable and argument information for new processes, given
429 the reception of data on the given 'channel'.
430 """
431
432
433
434 if self.waiting and not self.continuous:
435
436
437
438 callable, args, kw = self.waiting.pop()
439
440
441
442 if self.reuse:
443
444
445
446
447 self.add(channel)
448 channel.send((args, kw))
449
450 else:
451 return callable, args, kw
452
453
454
455
456 elif self.reuse:
457 channel.send(None)
458
459 return None
460
462
463 """
464 Support process creation by returning whether the given 'callable' has
465 been queued for later invocation.
466 """
467
468 if self.busy():
469 self.waiting.insert(0, (callable, args, kw))
470 return 1
471 else:
472 return 0
473
475
476 """
477 Support process creation by returning the given 'channel' to the
478 creating process, and None to the created process.
479 """
480
481 if channel.pid == 0:
482 return channel
483 else:
484 self.add_wait(channel)
485 return None
486
487
488
490
491 """
492 Start a waiting process given the reception of data on the given
493 'channel'.
494 """
495
496 details = self._get_waiting(channel)
497 if details is not None:
498 callable, args, kw = details
499 self.add(start(callable, *args, **kw))
500
501
502
def start(self, callable, *args, **kw):

    """
    Arrange for the given 'callable' to run in a new process with any
    additional arguments provided. If '_set_waiting' reports that the
    request has been queued, nothing more is done; otherwise a process is
    created using the module-level 'start' function and the resulting
    channel is added to the exchange via 'add_wait'.
    """

    queued = self._set_waiting(callable, args, kw)

    if not queued:
        self.add_wait(start(callable, *args, **kw))
515
517
518 """
519 Create a new process and return the created communications channel to
520 the created process. In the creating process, return None - the channel
521 receiving data from the created process will be automatically managed by
522 this exchange.
523 """
524
525 channel = create()
526 return self._get_channel_for_process(channel)
527
529
530 """
531 Wrap the given 'callable' in an object which can then be called in the
532 same way as 'callable', but with new processes and communications
533 managed automatically.
534 """
535
536 return ManagedCallable(callable, self)
537
539
540 """
541 A mix-in class providing methods to exchanges for the management of
542 persistent communications.
543 """
544
546
547 """
548 Start a waiting process given the reception of data on the given
549 'channel'.
550 """
551
552 details = self._get_waiting(channel)
553 if details is not None:
554 callable, args, kw = details
555 self.add(start_persistent(channel.address, callable, *args, **kw))
556
def start(self, address, callable, *args, **kw):

    """
    Arrange for the given 'callable' to run in a new process located at the
    given 'address', using any additional arguments provided. If
    '_set_waiting' reports that the request has been queued, nothing more
    is done; otherwise the process is created using 'start_persistent'.
    """

    # NOTE(review): 'address' is not passed to '_set_waiting', so it is not
    # recorded with a queued request - confirm that this is intended.

    queued = self._set_waiting(callable, args, kw)

    # NOTE(review): the channel returned by 'start_persistent' is not added
    # to the exchange here; presumably 'connect' is used for that - confirm.

    if not queued:
        start_persistent(address, callable, *args, **kw)
569
571
572 """
573 Create a new process, located at the given 'address', and return the
574 created communications channel to the created process. In the creating
575 process, return None - the channel receiving data from the created
576 process will be automatically managed by this exchange.
577 """
578
579 channel = create_persistent(address)
580 return self._get_channel_for_process(channel)
581
def manage(self, address, callable):

    """
    Return a PersistentCallable wrapping the given 'callable', associated
    with the given 'address' and with this exchange. The returned object is
    intended to be called in the same way as 'callable'.
    """

    published = PersistentCallable(address, callable, self)
    return published
591
593
594 "Connect to a process which is contactable via the given 'address'."
595
596 channel = connect_persistent(address)
597 self.add_wait(channel)
598
600
601 "A callable managed by an exchange."
602
def __init__(self, callable, exchange):

    """
    Remember the given 'callable' together with the 'exchange' which will
    be asked to start it. The 'callable' must be parallel-aware (that is,
    have a 'channel' parameter); other kinds of callable objects should be
    wrapped using the MakeParallel class first.
    """

    self.exchange = exchange
    self.callable = callable
615
617
618 "Invoke the callable with the supplied arguments."
619
620 self.exchange.start(self.callable, *args, **kw)
621
623
624 "A callable which sets up a persistent communications channel."
625
def __init__(self, address, callable, exchange):

    """
    Remember the given 'address' and 'callable' together with the
    'exchange' which will be asked to start the callable at that address.
    The 'callable' must be parallel-aware (that is, have a 'channel'
    parameter); other kinds of callable objects should be wrapped using the
    MakeParallel class first.
    """

    self.address = address
    self.exchange = exchange
    self.callable = callable
640
642
643 "Invoke the callable with the supplied arguments."
644
645 self.exchange.start(self.address, self.callable, *args, **kw)
646
648
649 """
650 A callable which sets up a persistent communications channel, but is
651 unmanaged by an exchange.
652 """
653
655
656 """
657 Using the given 'address', wrap the given 'callable'. This object can
658 then be invoked, but the wrapped callable will be run in a background
659 process. Note that the 'callable' must be parallel-aware (that is, have
660 a 'channel' parameter). Use the MakeParallel class to wrap other kinds
661 of callable objects.
662 """
663
664 self.callable = callable
665 self.address = address
666
668
669 "Invoke the callable with the supplied arguments."
670
671 start_persistent(self.address, self.callable, *args, **kw)
672
673
674
675 -class Map(Exchange):
676
677 "An exchange which can be used like the built-in 'map' function."
678
682
684
685 "Remember the channel addition order to order output."
686
687 self.channel_number = 0
688 self.channels = {}
689 self.results = []
690 self.current_index = 0
691
def add(self, channel):

    """
    Add the given 'channel' to the exchange, also recording the position at
    which it was added so that its result can be placed in order.
    """

    Exchange.add(self, channel)

    position = self.channel_number
    self.channels[channel] = position
    self.channel_number = position + 1
699
def start(self, callable, *args, **kw):

    """
    Reserve a place in the results for a new computation, marked with the
    Undefined value, and then start the given 'callable' with any
    additional arguments provided, exactly as Exchange.start does.
    """

    placeholder = Undefined
    self.results.append(placeholder)

    Exchange.start(self, callable, *args, **kw)
710
712
713 """
714 Create a new process and return the created communications channel to
715 the created process. In the creating process, return None - the channel
716 receiving data from the created process will be automatically managed by
717 this exchange.
718 """
719
720 self.results.append(Undefined)
721 return Exchange.create(self)
722
def __call__(self, callable, sequence):

    """
    Start a computation for each element in the 'sequence', wrapping the
    given 'callable' with MakeParallel unless it is already an instance of
    that class. The map's state is reset using 'init' beforehand, and the
    map itself is returned.
    """

    if isinstance(callable, MakeParallel):
        wrapped = callable
    else:
        wrapped = MakeParallel(callable)

    self.init()

    # Start one computation per element.

    for element in sequence:
        self.start(wrapped, element)

    return self
742
744
745 "Accumulate the incoming data, associating results with channels."
746
747 data = channel.receive()
748 self.results[self.channels[channel]] = data
749 del self.channels[channel]
750
753
755
756 "Return the next element in the map."
757
758 try:
759 return self._next()
760 except IndexError:
761 pass
762
763 while self.unfinished():
764 self.store()
765 try:
766 return self._next()
767 except IndexError:
768 pass
769 else:
770 raise StopIteration
771
773
774 "Return element 'i' from the map."
775
776 try:
777 return self._get(i)
778 except IndexError:
779 pass
780
781 while self.unfinished():
782 self.store()
783 try:
784 return self._get(i)
785 except IndexError:
786 pass
787 else:
788 raise IndexError, i
789
790
791
793 result = self._get(self.current_index)
794 self.current_index += 1
795 return result
796
798 result = self.results[i]
799 if result is Undefined or isinstance(i, slice) and Undefined in result:
800 raise IndexError, i
801 return result
802
804
805 """
806 An exchange acting as a queue, making data from created processes available
807 in the order in which it is received.
808 """
809
813
815
816 "Accumulate the incoming data, associating results with channels."
817
818 data = channel.receive()
819 self.queue.insert(0, data)
820
823
825
826 "Return the next element in the queue."
827
828 if self.queue:
829 return self.queue.pop()
830
831 while self.unfinished():
832 self.store()
833 if self.queue:
834 return self.queue.pop()
835 else:
836 raise StopIteration
837
839
840 "Return the current length of the queue."
841
842 return len(self.queue)
843
845
846 "A wrapper around functions making them able to communicate results."
847
849
850 """
851 Initialise the wrapper with the given 'callable'. This object will then
852 be able to accept a 'channel' parameter when invoked, and to forward the
853 result of the given 'callable' via the channel provided back to the
854 invoking process.
855 """
856
857 self.callable = callable
858
def __call__(self, channel, *args, **kw):

    """
    Call the wrapped callable with the given arguments and send whatever it
    returns through the given 'channel'.
    """

    result = self.callable(*args, **kw)
    channel.send(result)
864
866
867 """
868 A wrapper around functions making them able to communicate results in a
869 reusable fashion.
870 """
871
def __call__(self, channel, *args, **kw):

    """
    Call the wrapped callable with the given arguments and send its result
    through the given 'channel'. Then keep receiving from the channel: each
    object received is unpacked as a pair of positional and keyword
    arguments for a further call, whose result is sent in turn, until None
    is received.
    """

    while 1:
        channel.send(self.callable(*args, **kw))

        # None means that no further work is to be done.

        request = channel.receive()
        if request is None:
            break

        args, kw = request
882
883
884
886
887 "An exchange which manages persistent communications."
888
889 pass
890
892
893 "A queue which manages persistent communications."
894
895 pass
896
897
898
900
901 """
902 Connect to a process reachable via the given 'address', making the results
903 of which accessible via a queue.
904 """
905
906 queue = PersistentQueue(limit=1)
907 queue.connect(address)
908 return queue
909
def pmap(callable, sequence, limit=None):

    """
    A parallel counterpart to the built-in map function, accepting an
    optional process 'limit'. The given 'callable' should not be
    parallel-aware (that is, have a 'channel' parameter), since the map
    wraps it for parallel communications before invoking it.

    Return the Map object obtained by applying a new Map, created with the
    given 'limit', to 'callable' and 'sequence'.
    """

    return Map(limit=limit)(callable, sequence)
924
925
926
927 _cpuinfo_fields = "processor", "physical id", "core id"
928
930
931 """
932 Return the number of distinct, genuine processor cores. If the platform is
933 not supported by this function, None is returned.
934 """
935
936 try:
937 f = open("/proc/cpuinfo")
938 try:
939 processors = set()
940
941
942
943
944 processor = [None, None, None]
945
946 for line in f.xreadlines():
947 for i, field in enumerate(_cpuinfo_fields):
948
949
950
951
952 if line.startswith(field):
953 t = line.split(":")
954 processor[i] = int(t[1].strip())
955 break
956
957
958
959
960 if line.startswith("processor") and processor[0] is not None:
961 processors.add(tuple(processor))
962 processor = [None, None, None]
963
964
965
966 if processor[0] is not None:
967 processors.add(tuple(processor))
968
969 return len(processors)
970
971 finally:
972 f.close()
973
974 except OSError:
975 return None
976
978
979 """
980 Return the number of cores for OpenSolaris 2008.05 and possibly other
981 editions of Solaris.
982 """
983
984 f = os.popen("psrinfo -p")
985 try:
986 return int(f.read().strip())
987 finally:
988 f.close()
989
990
991
993
994 """
995 Create a new process, returning a communications channel to both the
996 creating process and the created process.
997 """
998
999 parent, child = socket.socketpair()
1000 for s in [parent, child]:
1001 s.setblocking(1)
1002
1003 pid = os.fork()
1004 if pid == 0:
1005 parent.close()
1006 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
1007 else:
1008 child.close()
1009 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1010
1012
1013 """
1014 Create a new process, returning a communications channel to both the
1015 creating process and the created process.
1016
1017 This function uses pipes instead of a socket pair, since some platforms
1018 seem to have problems with poll and such socket pairs.
1019 """
1020
1021 pr, cw = os.pipe()
1022 cr, pw = os.pipe()
1023
1024 pid = os.fork()
1025 if pid == 0:
1026 os.close(pr)
1027 os.close(pw)
1028 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
1029 else:
1030 os.close(cr)
1031 os.close(cw)
1032 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))
1033
# Choose the process creation and core counting implementations according to
# the platform: everything except SunOS uses socket pairs and the generic
# core count.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
1040
1042
1043 """
1044 Create a new process, returning a persistent communications channel between
1045 the creating process and the created process. This channel can be
1046 disconnected from the creating process and connected to another process, and
1047 thus can be used to collect results from daemon processes.
1048
1049 In order to be able to reconnect to created processes, the 'address' of the
1050 communications endpoint for the created process needs to be provided. This
1051 should be a filename.
1052 """
1053
1054 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1055 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1056 child.bind(address)
1057
1058 for s in [parent, child]:
1059 s.setblocking(1)
1060
1061 pid = os.fork()
1062 if pid == 0:
1063 parent.close()
1064 return PersistentChannel(pid, child, address)
1065 else:
1066 child.close()
1067
1068 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1069
1071
1072 """
1073 Connect via a persistent channel to an existing created process, reachable
1074 at the given 'address'.
1075 """
1076
1077 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1078 parent.setblocking(1)
1079 parent.connect(address)
1080 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
1081
1083
1084 """
1085 Terminate a created process, closing the given 'channel'.
1086 """
1087
1088 channel.close()
1089 os._exit(0)
1090
def start(callable, *args, **kw):

    """
    Create a new process which runs the given 'callable', passing it a
    channel as its first argument followed by any additional arguments
    given to this function.

    In the creating process, return the communications channel. In the
    created process, the channel passed to the 'callable' lets it send data
    back; any exception raised by the 'callable' is sent through the
    channel, and the process is always terminated using 'exit' afterwards.
    """

    channel = create()

    # The creating process only needs the channel.

    if channel.pid != 0:
        return channel

    # The created process runs the callable and then terminates. The nested
    # try statements ensure termination even if reporting a failure fails.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            # Everything is caught so that the failure reaches the creator.
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
1115
1117
1118 """
1119 Create a new process which shall be reachable using the given 'address' and
1120 which will start running in the given 'callable'. Additional arguments to
1121 the 'callable' can be given as additional arguments to this function.
1122
1123 Return a communications channel to the creating process. For the created
1124 process, supply a channel as the 'channel' parameter in the given 'callable'
1125 so that it may send data back to the creating process.
1126
1127 Note that the created process employs a channel which is persistent: it can
1128 withstand disconnection from the creating process and subsequent connections
1129 from other processes.
1130 """
1131
1132 channel = create_persistent(address)
1133 if channel.pid == 0:
1134 close_streams()
1135 try:
1136 try:
1137 callable(channel, *args, **kw)
1138 except:
1139 exc_type, exc_value, exc_traceback = sys.exc_info()
1140 channel.send(exc_value)
1141 finally:
1142 exit(channel)
1143 else:
1144 return channel
1145
1147
1148 """
1149 Close streams which keep the current process attached to any creating
1150 processes.
1151 """
1152
1153 os.close(sys.stdin.fileno())
1154 os.close(sys.stdout.fileno())
1155 os.close(sys.stderr.fileno())
1156
1158
1159 "Wait for all created processes to terminate."
1160
1161 try:
1162 while 1:
1163 os.wait()
1164 except OSError:
1165 pass
1166
1167
1168