Simple Concurrency in Python

Created on April 29, 2018, 11:26 a.m.

Whenever I attempt concurrency in Python it seems that something like this happens:

KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in <module>
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in <module>
    from . import context
    from . import context
  File "C:\Python35\lib\multiprocessing\context.py", line 5, in <module>
  File "C:\Python35\lib\multiprocessing\context.py", line 3, in <module>
    from . import process
    import threading
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "C:\Python35\lib\threading.py", line 7, in <module>
    from traceback import format_exc as _format_exc
  File "C:\Python35\lib\traceback.py", line 5, in <module>
    import linecache
  File "C:\Python35\lib\linecache.py", line 11, in <module>
    import tokenize
  File "C:\Python35\lib\tokenize.py", line 32, in <module>
    import re
  File "C:\Python35\lib\re.py", line 335, in <module>
  File "<frozen importlib._bootstrap>", line 896, in _find_spec
  File "<frozen importlib._bootstrap>", line 1147, in find_spec
KeyboardInterrupt
Failed to import the site module
Traceback (most recent call last):
  File "C:\Python35\lib\site.py", line 563, in <module>
    main()
  File "C:\Python35\lib\site.py", line 550, in main
    known_paths = addsitepackages(known_paths)
  File "C:\Python35\lib\site.py", line 327, in addsitepackages
    addsitedir(sitedir, known_paths)
  File "C:\Python35\lib\site.py", line 206, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "C:\Python35\lib\site.py", line 167, in addpackage
    exec(line)
  File "<string>", line 1, in <module>
    import copyreg
KeyboardInterrupt

And I’m sure it isn’t just me: the simple fact is that doing concurrency in Python sucks. Even the libraries which try to make it nicer often don’t do a very good job, or only make your life easy if you structure your program in one very particular way.

This is all made worse by the fact that threads in Python are not really executed concurrently; the Global Interpreter Lock means their operations are simply interleaved by the Python interpreter. As a result, most forms of concurrency in Python don’t actually make use of multiple CPU cores and are therefore fairly useless from a performance perspective. To get true concurrency in Python you have to use the multiprocessing module, which at first looks like a drop-in replacement for threads, but in reality works in a completely different way: by launching multiple Python interpreters in separate processes and allowing some communication between them.
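To see what this means in practice, here is a minimal sketch (the function name count_down and the worker count are my own invention, purely for illustration) that runs the same CPU-bound loop on four threads and then on four processes. On a multi-core machine the threaded version takes roughly as long as running the loops one after another, while the process version can genuinely run in parallel:

import time
from threading import Thread
from multiprocessing import Process

def count_down(n):
    # A purely CPU-bound loop with no I/O to yield on.
    while n > 0:
        n -= 1

if __name__ == '__main__':

    # Four threads: the GIL interleaves them, so this takes about as
    # long as running the four loops sequentially.
    start = time.time()
    workers = [Thread(target=count_down, args=(10000000,)) for _ in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print('threads:   %.2fs' % (time.time() - start))

    # Four processes: each gets its own interpreter and can use its own core.
    start = time.time()
    workers = [Process(target=count_down, args=(10000000,)) for _ in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print('processes: %.2fs' % (time.time() - start))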

The saving grace of all of this is that the Python multiprocessing module actually provides some pretty nice basic tools, and if you understand how it works at a high level then making use of these tools to achieve concurrency is actually a fairly painless experience.

But first a little revision on how Python multiprocessing actually works. Let’s take a look at the following basic Python script.

from multiprocessing import Process
    
def greet(name):
    print('Hello %s!' % (name))
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

What actually happens when we run this script? Well, everything runs basically as expected until we reach the line p.start().

When this line is executed a few important things happen. First, the arguments we supplied to the args parameter are pickled, which basically means they are converted into a raw stream of bytes and saved for later. Next, the script currently being run is run again by a new instance of Python (this is the "spawn" start method, the default on Windows). This second run of the script is given a different value for the __name__ variable, and has all of its output redirected to the original process. The new run of the script executes everything as usual until it reaches the end of the script. At this point it unpickles the arguments that were saved and executes the function that was specified by the target parameter. Once it finishes executing this function it ends.

We can see this behavior if we run the same script as before but include another print statement in the middle:

from multiprocessing import Process

def greet(name):
    print('Hello %s!' % (name))
    
print(__name__)
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

When we run this version it outputs something like this:

__main__
__mp_main__
Hello Daniel!

We can see the different values given to the __name__ variable for the different runs, and if we were to look in Task Manager while it was executing we would see two instances of python.exe too (on Windows).
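Incidentally, if you’d rather not dig through Task Manager, multiprocessing can report its own child processes. A tiny sketch (the function name nap is my own):

from multiprocessing import Process, active_children
import time

def nap(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    p = Process(target=nap, args=(2,))
    p.start()
    # The still-running child process shows up here.
    print(active_children())
    p.join()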

This little experiment highlights the behavior and also the two main limitations of Python’s multiprocessing module: firstly, that any values given as the args parameter must be pickleable, which usually means simple, easily serializable data; and secondly, that the target parameter must always be a top level function in the script, independent from anything that happens inside the if __name__ == '__main__': part of the script.
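If you want to see the first limitation for yourself, a couple of lines with the standard pickle module will do it (a sketch; the lambda here is just a stand-in for any function that isn’t defined at the top level):

import pickle

# Simple data pickles happily, so it can cross the process boundary.
print(pickle.dumps(('Daniel',)))

# Lambdas (and locally defined functions) cannot be pickled, so they
# cannot be sent to another process as arguments or targets.
try:
    pickle.dumps(lambda name: 'Hello %s!' % name)
except Exception as e:
    print('Cannot pickle a lambda: %s' % e)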

(It is worth noting that while the multiprocessing module was designed to have the exact same interface as the Python threading module, and as such appears like it can be used as a drop-in replacement, I find it better to think about the two as completely separate entities, since the above limitations mean that in practice it is almost never a drop-in replacement.)
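To make the comparison concrete, the two really do share a calling convention even though the execution models are completely different (a minimal sketch):

from threading import Thread
from multiprocessing import Process

def greet(name):
    print('Hello %s!' % (name))

if __name__ == '__main__':
    # Identical constructor arguments and start/join calls...
    t = Thread(target=greet, args=('Daniel',))
    t.start()
    t.join()

    # ...but this one launches a whole new Python interpreter.
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()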

So here is my little recipe for multiprocessing in Python. It starts with a function which calls another function with a given set of arguments and keyword arguments asynchronously; that is, it calls the provided function without waiting for it to finish. Instead what it returns is a handle, something we can use later to wait for the called function to finish and to get the return value. This asynchronous call function looks something like this:

from multiprocessing import Process, Pipe

def call_async(f, *args, **kwargs):
    # Create a pipe for talking to the new process, start the process
    # running call_dispatch, then send it the name of the function to
    # call along with the arguments.
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    pipe_parent.send((f.__name__, args, kwargs))
    return (process, pipe_parent)

What this function does is create a new Process as well as a Pipe (something that will allow us to communicate with that Process) and run the new process on a function called call_dispatch (which we will define below). Once the new process is running, it sends it the name of the desired function we wish to call, as well as the arguments we want to use.

The call_dispatch function is itself very simple. It just waits for the function name and arguments to come down the pipe, calls the top level function with the given name, and then sends the return value back into the pipe once it is done.

def call_dispatch(pipe):
    # Wait for a function name and arguments to arrive, call the
    # matching top level function, then send back the return value.
    name, args, kwargs = pipe.recv()
    output = globals()[name](*args, **kwargs)
    pipe.send(output)

Later on, when we want to wait for this function to finish and get its return value, we can use another function called call_await, which waits for the called function to finish and then returns its return value.

def call_await(h):
    # Receive the return value first, then wait for the process to end.
    process, pipe_parent = h
    output = pipe_parent.recv()
    process.join()
    return output

Using these little functions makes some things easy, such as asynchronously calling a function for each element of a list and then gathering the return values once they are all ready.

import time

def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = []
    for i, name in enumerate(names):
        h = call_async(greet, i, name)
        handles.append(h)
        
    for h in handles:
        print(call_await(h))

Notice how evaluating this script takes only around one second instead of four, since each call to greet is performed in a separate process. Also notice how although the evaluation order can vary, the return order is always the same, since we wait for each of the processes to finish in sequence.

1 Hello Chess!
2 Hello Tom!
0 Hello Dan!
3 Hello Mike!
0
1
2
3

One little limitation to this approach is that each time we want to call a function asynchronously we have to spin up a whole new process. Unless the function you are calling takes at least a few seconds to execute, most of the time is going to be spent starting up new processes. To avoid this issue we can actually start each of our processes ahead of time, before using them to call functions asynchronously. Let us make a new function which does this called fork:

def fork():
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    return (process, pipe_parent)

And we’ll change what happens in the call_dispatch function. Now the dispatch function will run in an infinite loop, each time waiting for a new top level function to call with a given set of arguments.

def call_dispatch(pipe):
    while True:
        name, args, kwargs = pipe.recv()
        if name == 'exit':
            break
        else:
            output = globals()[name](*args, **kwargs)
            pipe.send(output)

You’ll notice that if this process gets asked to call the exit function it will just break out of this loop and finish gracefully. We can make another function called join which does this for any new process allocated with fork.

def join(h):
    process, pipe_parent = h
    pipe_parent.send(('exit', (), {}))
    process.join()

Now we just need to define call_async and call_await. These are similar to before but just assume the process has already been started:

def call_async(h, f, *args, **kwargs):
    process, pipe_parent = h
    pipe_parent.send((f.__name__, args, kwargs))

def call_await(h):
    process, pipe_parent = h
    return pipe_parent.recv()

Now we can start our processes ahead of time, and re-use them to call multiple different functions in parallel.

import time
    
def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
def square(i):
    return i*i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = [fork() for _ in range(len(names))]
    
    # Greet in Parallel
    
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, greet, i, name)
    
    for h in handles:
        print(call_await(h))

    # Square in Parallel
        
    for h, i in zip(handles, range(len(names))):
        call_async(h, square, i)
    
    for h in handles:
        print(call_await(h))
        
    # Finish
        
    for h in handles:
        join(h)

Of course it isn’t perfect, and there are times when this abstraction will fail, but I hope I’ve shown that it isn’t too difficult to build simple, useful things with the basic tools provided by the multiprocessing module, as long as you have a high level understanding of how it works. With this sort of setup you can quickly start to think about how you might build worker pools and other sorts of useful structures for parallel programming.
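For example, here is one way a tiny map-style pool might look, built from the fork, call_async, call_await, and join functions above. This is just a sketch (the name map_parallel and the round-robin chunking are my own), and the standard library’s multiprocessing.Pool provides a more complete, ready-made version of the same idea:

def map_parallel(f, items, workers=4):
    # Start a fixed set of worker processes up front.
    handles = [fork() for _ in range(workers)]
    results = []
    # Hand out the items in chunks of one-per-worker, collecting the
    # results in order before handing out the next chunk.
    for start in range(0, len(items), workers):
        chunk = items[start:start + workers]
        for h, item in zip(handles, chunk):
            call_async(h, f, item)
        for h, _ in zip(handles, chunk):
            results.append(call_await(h))
    # Tell every worker to exit gracefully.
    for h in handles:
        join(h)
    return results

Called as map_parallel(square, list(range(8))) from inside the if __name__ == '__main__': block, this would evaluate the eight calls four at a time.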

With that all said, good luck and happy concurrent programming!
