Module pprocess
[hide private]
[frames] | [no frames]

Source Code for Module pprocess

   1  """ 
   2  A simple parallel processing API for Python, inspired somewhat by the thread 
   3  module, slightly less by pypar, and slightly less still by pypvm. 
   4   
   5  Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk> 
   6   
   7  This program is free software; you can redistribute it and/or modify it under 
   8  the terms of the GNU Lesser General Public License as published by the Free 
   9  Software Foundation; either version 3 of the License, or (at your option) any 
  10  later version. 
  11   
  12  This program is distributed in the hope that it will be useful, but WITHOUT 
  13  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  14  FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  15  details. 
  16   
  17  You should have received a copy of the GNU Lesser General Public License along 
  18  with this program.  If not, see <http://www.gnu.org/licenses/>. 
  19  """ 
  20   
  21  __version__ = "0.5" 
  22   
  23  import os 
  24  import sys 
  25  import select 
  26  import socket 
  27  import platform 
  28   
  29  from warnings import warn 
  30   
  31  try: 
  32      import cPickle as pickle 
  33  except ImportError: 
  34      import pickle 
  35   
  36  try: 
  37      set 
  38  except NameError: 
  39      from sets import Set as set 
  40   
  41  # Special values. 
  42   
# Special values.

class Undefined:

    "Placeholder marking a result which has not yet been received."

# Communications.

class AcknowledgementError(Exception):

    "Raised when a sent object is not acknowledged with 'OK' by the receiver."

class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.

        A 'pid' of 0 denotes the created process's end of the channel; any
        other value is the created process's identifier, as seen from the
        creating process.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, pipes are closed upon
        # finalisation. (Created processes are not waited for here.)

        self.close()

    def close(self):

        """
        Explicitly close the channel. Both pipes are detached before closing,
        so the write pipe is still closed if closing the read pipe fails, and
        closing an already-closed channel does nothing.
        """

        # getattr tolerates finalisation of partially initialised instances.

        read_pipe = getattr(self, "read_pipe", None)
        write_pipe = getattr(self, "write_pipe", None)
        self.read_pipe = None
        self.write_pipe = None

        try:
            if read_pipe is not None:
                read_pipe.close()
        finally:
            if write_pipe is not None:
                write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit, passing 'options' (such
        as os.WNOHANG) to os.waitpid. Errors such as the process having
        already been reaped are ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError if the reply is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Exception
        instances (as sent by failing created processes) are raised instead.
        """

        # NOTE: unpickling is only safe when the peer process is trusted.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt, even if a received exception is raised.
        (The acknowledgement makes the sender wait, thus preventing processes
        from exiting and disrupting the communications channel and losing data.)
        """

        try:
            return self._receive()
        finally:
            self._send("OK")

class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel for the process identified by 'pid', using the
        socket 'endpoint' already bound to 'address' (a socket filename). The
        read and write pipes are only created once a peer connects.
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        """
        Close the persistent channel and its listening socket, and remove the
        socket file. Each step is attempted even if an earlier one fails.
        """

        try:
            try:
                Channel.close(self)
            finally:
                self.endpoint.close()
        finally:
            try:
                os.unlink(self.address)
            except OSError:
                pass

    def _ensure_pipes(self):

        """
        Ensure that the channel is capable of communicating, accepting a
        connection (blocking until one arrives) if no peer is connected.
        """

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            connection, peer = self.endpoint.accept()
            self.read_pipe = connection.makefile("r", 0)
            self.write_pipe = connection.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        """
        Discard the existing connection, explicitly closing its pipes rather
        than leaving them to the garbage collector, and listen again.
        """

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        # The connection is already broken, so failures while closing it are
        # deliberately ignored.

        for pipe in (self.read_pipe, self.write_pipe):
            try:
                pipe.close()
            except (IOError, OSError):
                pass

        self.read_pipe = None
        self.write_pipe = None
        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, replacing a broken
        connection with a newly accepted one. The optional 'timeout' is passed
        to poll (in milliseconds).
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break

            # No broken connection was reported: communication can proceed.

            else:
                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)
# Management of processes and communications.

class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.

    Once exchanges are populated with active channels, use of the principal
    methods of the exchange typically causes the 'store' method to be invoked,
    resulting in the processing of any incoming data.
    """

    def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations, but the callable will
        be invoked for each computation.

        If the optional 'continuous' parameter is set to a true value, channels
        and processes will be retained after receiving data sent from such
        processes, since it will be assumed that they will communicate more
        data.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose
        self.continuous = continuous

        self.waiting = []       # queued (callable, args, kw) entries
        self.readables = {}     # read pipe file descriptor -> channel
        self.removed = []       # channels removed by the latest ready() call
        self.poller = select.poll()

        for channel in channels or []:
            self.add(channel)

    # Core methods, registering and reporting on channels.

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' in
        milliseconds (or until communication is possible) and return a list of
        channels which are ready to be read from.

        Channels reporting hang-up or error conditions are removed from the
        exchange and recorded in 'self.removed'. Such channels are still
        returned as readable when they have pending data, but only if
        'autoclose' is false (otherwise they have already been closed).
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange, closing it and waiting
        for its process if 'autoclose' is set.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def unfinished(self):

        "Return whether the exchange still has work scheduled or in progress."

        return self.active() or self.waiting

    def busy(self):

        "Return whether the exchange uses as many channels as it is allowed to."

        return self.limit is not None and len(self.active()) >= self.limit

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.busy():
            self.store()

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.unfinished():
            self.store()

    def store(self, timeout=None):

        """
        For each ready channel, process the incoming data. If the optional
        'timeout' parameter (a duration in milliseconds) is specified, wait only
        for the specified duration if no channels are ready to provide data.

        Channels whose processing raises IOError are removed (if still active)
        and a warning is issued.
        """

        # Either process input from active channels.

        if self.active():
            for channel in self.ready(timeout):
                try:
                    self.store_data(channel)
                    self.start_waiting(channel)
                except IOError:
                    exc = sys.exc_info()[1]

                    # ready() may already have removed this channel (a hang-up
                    # reported along with pending data when autoclose is off);
                    # removing it a second time would raise KeyError.

                    if channel in self.active():
                        self.remove(channel)
                    warn("Removed channel %s since caught an exception %s"
                        % (channel, exc))

        # Or schedule new processes and channels.

        else:
            while self.waiting and not self.busy():
                callable, args, kw = self.waiting.pop()
                self.start(callable, *args, **kw)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'.
        """

        # For continuous channels, no scheduling is requested.

        if self.waiting and not self.continuous:

            # Schedule this callable and arguments.

            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))

            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation.
        """

        if self.busy():
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        creating process, and None to the created process.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)

class Persistent:

    """
    A mix-in giving exchanges methods for managing persistent communications.
    It relies on '_get_waiting', '_set_waiting', '_get_channel_for_process',
    'add' and 'add_wait' being provided by the exchange it is mixed into.
    """

    def start_waiting(self, channel):

        """
        Given that data has arrived on 'channel', start the next waiting
        process (if any) at the address associated with that channel.
        """

        # NOTE(review): expects 'channel' to have an 'address' attribute;
        # channels made by connect_persistent are plain Channel objects without
        # one - confirm how this is reached.

        waiting = self._get_waiting(channel)
        if waiting is None:
            return
        target, args, kw = waiting
        self.add(start_persistent(channel.address, target, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Start a new process, reachable at 'address', running 'callable' with
        any additional arguments. If the exchange is busy, the invocation is
        queued instead. The channel returned by start_persistent is not kept
        by the exchange; use 'connect' to communicate with the process.
        """

        if not self._set_waiting(callable, args, kw):
            start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process reachable at 'address'. The created process gets
        its communications channel back; the creating process gets None, with
        the channel managed by this exchange.
        """

        return self._get_channel_for_process(create_persistent(address))

    def manage(self, address, callable):

        """
        Publish 'callable' at 'address' through an object callable like the
        original, whose invocations start processes managed by this exchange.
        """

        return PersistentCallable(address, callable, self)

    def connect(self, address):

        "Connect to the process reachable at 'address' and monitor its channel."

        self.add_wait(connect_persistent(address))

class ManagedCallable:

    "A callable whose invocations are run as processes started by an exchange."

    def __init__(self, callable, exchange):

        """
        Wrap 'callable', letting 'exchange' monitor the channels created for
        communication with each started process. The 'callable' must be
        parallel-aware (accept a 'channel' parameter); wrap other callables
        with MakeParallel first.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Start the wrapped callable through the exchange with these arguments."

        exchange = self.exchange
        exchange.start(self.callable, *args, **kw)

class PersistentCallable:

    "A callable whose invocations start processes with persistent channels."

    def __init__(self, address, callable, exchange):

        """
        Wrap 'callable' so that calling this object starts a background process
        reachable at 'address', with 'exchange' monitoring the channels. The
        'callable' must be parallel-aware (accept a 'channel' parameter); wrap
        other callables with MakeParallel first.
        """

        self.address = address
        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Start the wrapped callable at the configured address via the exchange."

        exchange = self.exchange
        exchange.start(self.address, self.callable, *args, **kw)

class BackgroundCallable:

    """
    A callable which starts a process with a persistent communications channel
    that no exchange manages.
    """

    def __init__(self, address, callable):

        """
        Wrap 'callable' so that calling this object runs it in a background
        process reachable at 'address'. The 'callable' must be parallel-aware
        (accept a 'channel' parameter); wrap other callables with MakeParallel
        first.
        """

        self.address = address
        self.callable = callable

    def __call__(self, *args, **kw):

        "Run the wrapped callable in a background process with these arguments."

        address, target = self.address, self.callable
        start_persistent(address, target, *args, **kw)
# Abstractions and utilities.

class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        """
        Initialise the exchange with the same arguments as Exchange, then reset
        the state used to order results.
        """

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        """
        Remember the channel addition order to order output: reset the channel
        counter, the channel-to-position mapping, the results (Undefined until
        received) and the iteration position.
        """

        self.channel_number = 0
        self.channels = {}
        self.results = []
        self.current_index = 0

    def add(self, channel):

        "Add the given 'channel' to the exchange, assigning the next position."

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.
        """

        self.results.append(Undefined) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        self.results.append(Undefined) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        "Wrap and invoke 'callable' for each element in the 'sequence'."

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the map, waiting for it if necessary."

        try:
            return self._next()
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._next()
            except IndexError:
                pass
        else:
            raise StopIteration

    # Python 3 iterator protocol.

    __next__ = next

    def __getitem__(self, i):

        """
        Return element 'i' (an index or slice) from the map, waiting for it if
        necessary. Raise IndexError if it never becomes available.
        """

        try:
            return self._get(i)
        except IndexError:
            pass

        while self.unfinished():
            self.store()
            try:
                return self._get(i)
            except IndexError:
                pass
        else:
            raise IndexError(i)

    # Helper methods for the above access methods.

    def _next(self):

        """
        Return the result at the iteration position and advance the position,
        raising IndexError if that result is not yet available.
        """

        result = self._get(self.current_index)
        self.current_index += 1
        return result

    def _get(self, i):

        """
        Return result 'i', raising IndexError if it does not exist or (for a
        slice, any part of it) has not yet been received.
        """

        result = self.results[i]
        if result is Undefined or (isinstance(i, slice) and Undefined in result):
            raise IndexError(i)
        return result

class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange with the same arguments as Exchange."

        Exchange.__init__(self, *args, **kw)

        # Newest data is inserted at the front; pop() takes the oldest.

        self.queue = []

    def store_data(self, channel):

        "Receive data from the given 'channel' and add it to the queue."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        "Return the next element in the queue, waiting for it if necessary."

        if self.queue:
            return self.queue.pop()

        while self.unfinished():
            self.store()
            if self.queue:
                return self.queue.pop()
        else:
            raise StopIteration

    # Python 3 iterator protocol.

    __next__ = next

    def __len__(self):

        "Return the current length of the queue."

        return len(self.queue)

class MakeParallel:

    "A wrapper letting an ordinary callable send its result over a channel."

    def __init__(self, callable):

        """
        Wrap 'callable'. The wrapper accepts a 'channel' as its first argument
        and sends the wrapped callable's result through it back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Call the wrapped callable with the arguments and send its result."

        result = self.callable(*args, **kw)
        channel.send(result)

class MakeReusable(MakeParallel):

    """
    A wrapper letting an ordinary callable send results over a channel
    repeatedly, serving further requests on the same channel.
    """

    def __call__(self, channel, *args, **kw):

        """
        Call the wrapped callable and send its result, then keep receiving
        (args, kw) requests and sending results until None is received.
        """

        while 1:
            channel.send(self.callable(*args, **kw))
            request = channel.receive()
            if request is None:
                break
            args, kw = request
# Persistent variants.

class PersistentExchange(Persistent, Exchange):

    "An exchange whose processes communicate through persistent channels."

class PersistentQueue(Persistent, Queue):

    "A queue whose processes communicate through persistent channels."
# Convenience functions.

def BackgroundQueue(address):

    """
    Connect to the process reachable at 'address' and return a queue (limited
    to one channel) through which its results can be read.
    """

    results = PersistentQueue(limit=1)
    results.connect(address)
    return results

def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function, running at most 'limit'
    processes at once if 'limit' is given. The 'callable' should be an
    ordinary (not parallel-aware) callable; it is wrapped for communication
    before being invoked.

    Return a Map over the results, where each element of 'sequence' is
    processed by a separate process.
    """

    return Map(limit=limit)(callable, sequence)
924 925 # Utility functions. 926 927 _cpuinfo_fields = "processor", "physical id", "core id" 928
929 -def _get_number_of_cores():
930 931 """ 932 Return the number of distinct, genuine processor cores. If the platform is 933 not supported by this function, None is returned. 934 """ 935 936 try: 937 f = open("/proc/cpuinfo") 938 try: 939 processors = set() 940 941 # Use the _cpuinfo_field values as "digits" in a larger unique 942 # core identifier. 943 944 processor = [None, None, None] 945 946 for line in f.xreadlines(): 947 for i, field in enumerate(_cpuinfo_fields): 948 949 # Where the field is found, insert the value into the 950 # appropriate location in the processor identifier. 951 952 if line.startswith(field): 953 t = line.split(":") 954 processor[i] = int(t[1].strip()) 955 break 956 957 # Where a new processor description is started, record the 958 # identifier. 959 960 if line.startswith("processor") and processor[0] is not None: 961 processors.add(tuple(processor)) 962 processor = [None, None, None] 963 964 # At the end of reading the file, add any unrecorded processors. 965 966 if processor[0] is not None: 967 processors.add(tuple(processor)) 968 969 return len(processors) 970 971 finally: 972 f.close() 973 974 except OSError: 975 return None
976
977 -def _get_number_of_cores_solaris():
978 979 """ 980 Return the number of cores for OpenSolaris 2008.05 and possibly other 981 editions of Solaris. 982 """ 983 984 f = os.popen("psrinfo -p") 985 try: 986 return int(f.read().strip()) 987 finally: 988 f.close()
# Low-level functions.

def create_socketpair():

    """
    Fork a new process connected to this one by a blocking socket pair, and
    return a Channel over that pair in both processes (with a pid of 0 in the
    created process).
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps only its own end of the pair.

    if pid == 0:
        mine, other = child, parent
    else:
        mine, other = parent, child

    other.close()
    return Channel(pid, mine.makefile("r", 0), mine.makefile("w", 0))

def create_pipes():

    """
    Fork a new process connected to this one by two pipes (one per direction),
    and return a Channel over them in both processes (with a pid of 0 in the
    created process).

    Pipes are used instead of a socket pair, since some platforms seem to have
    problems with poll and such socket pairs.
    """

    parent_read, child_write = os.pipe()
    child_read, parent_write = os.pipe()

    pid = os.fork()

    # Each process closes the pipe ends belonging to the other.

    if pid == 0:
        unused = (parent_read, parent_write)
        read_fd, write_fd = child_read, child_write
    else:
        unused = (child_read, child_write)
        read_fd, write_fd = parent_read, parent_write

    for fd in unused:
        os.close(fd)

    return Channel(pid, os.fdopen(read_fd, "r", 0), os.fdopen(write_fd, "w", 0))

# Choose how processes are created and how cores are counted: pipes and
# psrinfo on SunOS (where poll may have problems with socket pairs), socket
# pairs and /proc/cpuinfo elsewhere.

if platform.system() == "SunOS":
    create, get_number_of_cores = create_pipes, _get_number_of_cores_solaris
else:
    create, get_number_of_cores = create_socketpair, _get_number_of_cores

def create_persistent(address):

    """
    Fork a new process and return a channel in both processes. In the created
    process this is a PersistentChannel listening on 'address' (a filename for
    a UNIX domain socket), which can be disconnected and later reached by
    other processes - for example to collect results from daemon processes.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    for s in (parent, child):
        s.setblocking(1)

    pid = os.fork()

    if pid == 0:
        parent.close()
        return PersistentChannel(pid, child, address)

    child.close()

    # NOTE(review): the parent's socket is never connected to 'address'
    # (connecting was disabled in the original code), so this channel cannot
    # carry data; other processes use connect_persistent instead.

    return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

def connect_persistent(address):

    """
    Connect to an existing created process through its persistent channel at
    'address' (a UNIX domain socket filename) and return a Channel to it.
    """

    connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    connection.setblocking(1)
    connection.connect(address)

    read_pipe = connection.makefile("r", 0)
    write_pipe = connection.makefile("w", 0)
    return Channel(0, read_pipe, write_pipe)

def exit(channel):

    """
    Terminate a created process, closing the given 'channel' first.

    The process exits with status 0 even if closing the channel fails;
    otherwise an exception here would let the created process return into,
    and keep running, the creating process's code.
    """

    try:
        channel.close()
    finally:
        os._exit(0)

def start(callable, *args, **kw):

    """
    Create a new process which runs 'callable' with a channel as its first
    argument, followed by any additional arguments given here.

    The creating process gets the communications channel back. The created
    process never returns: it runs the callable, sends any exception it raises
    to the creating process, and exits.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # In the created process. The bare except deliberately reports every
    # failure (including SystemExit) to the creating process.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)

def start_persistent(address, callable, *args, **kw):

    """
    Create a new process, reachable at 'address', which runs 'callable' with a
    channel as its first argument, followed by any additional arguments.

    The creating process gets a communications channel back. The created
    process detaches from the standard streams, runs the callable, sends any
    exception it raises back over its channel, and exits without returning.
    Its channel is persistent: it withstands disconnection from the creating
    process and accepts later connections from other processes.
    """

    channel = create_persistent(address)
    if channel.pid != 0:
        return channel

    # In the created process.

    close_streams()
    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)

def close_streams():

    """
    Detach the current process from the standard streams which keep it
    attached to any creating processes, by redirecting the standard input,
    output and error descriptors to os.devnull.

    The descriptors are redirected rather than closed so that they stay
    occupied. A closed descriptor 0-2 could otherwise be reused by a socket or
    file opened later, and stray output (such as a print or a warning) would
    then be written into it.
    """

    fds = [stream.fileno() for stream in (sys.stdin, sys.stdout, sys.stderr)]
    null = os.open(os.devnull, os.O_RDWR)
    try:
        for fd in fds:
            if fd != null:
                os.dup2(null, fd)
    finally:

        # Keep the descriptor if it happens to be one of the standard ones.

        if null not in fds:
            os.close(null)

def waitall():

    "Wait until every created process has terminated (until os.wait fails)."

    while 1:
        try:
            os.wait()
        except OSError:
            break
1166 1167 # vim: tabstop=4 expandtab shiftwidth=4 1168