Parallel Processing advice

Hi,

I have a Python application that uses threading to connect to a range of TCP endpoints in parallel. The application imports a list of, say, 100 IP addresses, splits the list into chunks of 10, then starts a thread per chunk to poll those sockets sequentially. That way it does its job in a small space of time instead of working through the big list sequentially, which would take ages.

The problem is when the list gets really big, like over 10,000 sockets. Then you potentially have a thousand or more threads and things start to break down.

What's the best way of approaching this? Should I split the main list into smaller chunks and feed them into multiple instances of the application? Then I'd have, say, 10 instances of the application, each one splitting its list into threads. Or do I need a beefier machine and just run 10,000 threads at once in the same application? Appreciate any advice!
 
I've only dabbled with python so can't suggest how to do it in that language. But as a general principle you should look at having a pool of objects which each connect in parallel. E.g. a pool of 20 objects, each with a connection, which means you can connect up to 20 times in parallel. Another object feeds them the next item of work as they become free.

As each object in the pool becomes free it is available to pick up, or be given, the next item to work on. So you progress through the queue at a known rate.
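Roughly this shape, though treat the Python specifics with a pinch of salt given the above. It's just a sketch of the worker-pool idea: a fixed number of threads pull addresses from a shared queue, so at most POOL_SIZE connections are in flight at once. poll_endpoint() is a made-up stand-in for whatever you actually do with each socket.

Code:
import queue
import socket
import threading

POOL_SIZE = 20

def poll_endpoint(ip, port=80, timeout=5):
    # Placeholder: connect, do the real polling, return a result
    with socket.create_connection((ip, port), timeout=timeout):
        return "%s OK" % ip

def worker(work_queue, results):
    while True:
        ip = work_queue.get()
        if ip is None:              # sentinel: no more work for this worker
            break
        try:
            results.append(poll_endpoint(ip))
        except OSError as exc:
            results.append("%s failed: %s" % (ip, exc))

def run(ips):
    work_queue = queue.Queue()
    results = []
    workers = [threading.Thread(target=worker, args=(work_queue, results))
               for _ in range(POOL_SIZE)]
    for t in workers:
        t.start()
    for ip in ips:                  # feed the pool; workers pick up items as they free up
        work_queue.put(ip)
    for _ in workers:
        work_queue.put(None)        # one sentinel per worker to shut it down
    for t in workers:
        t.join()
    return results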
 
Your limiting factors are going to be the number of threads and possibly socket connections. Depending on what you're doing with the remote sockets, asynchronous I/O may give you better performance, as you avoid threads sitting blocked on connection timeouts and stuffing up your throughput.

I don't know python, but a quick google of 'python asyncio nonblocking' throws up some interesting looking stuff.
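Something like this seems to be the rough shape of it (a sketch only, and as above I don't write Python, so the port, timeout and concurrency cap are all placeholders to adjust). A semaphore caps how many connections are open at once, and a slow or unreachable host only ties up a coroutine rather than a whole OS thread.

Code:
# Rough sketch of a non-blocking approach with asyncio (Python 3.7+).
import asyncio

MAX_CONCURRENT = 200   # tune to what your machine/network tolerates

async def poll(ip, port, sem, timeout=5):
    async with sem:
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(ip, port), timeout)
            # ... do whatever polling you need with reader/writer here ...
            writer.close()
            await writer.wait_closed()
            return ip, True
        except (OSError, asyncio.TimeoutError):
            return ip, False

async def main(ips, port=80):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(poll(ip, port, sem) for ip in ips))

# results = asyncio.run(main(list_of_ips))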

In Java I would keep it as a single process rather than running multiple instances on the same machine, and I would hazard a guess that Python is similar. Spreading your load across multiple machines will obviously help.

EDIT: added non-blocking as it's important for the problem
 
The most obvious thing to do is use a thread pool - basically what Hades said above. I don't use Python, but I would imagine it must have some API/infrastructure that supports that.
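A quick look suggests the standard library has exactly that in concurrent.futures. Something like this sketch, assuming your per-IP polling code is a function you can pass in: ThreadPoolExecutor keeps a fixed number of worker threads and hands each one the next IP as it becomes free.

Code:
from concurrent.futures import ThreadPoolExecutor, as_completed

def poll_all(ips, poll_endpoint, max_workers=50):
    """poll_endpoint is assumed to be your existing per-socket routine."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(poll_endpoint, ip): ip for ip in ips}
        for future in as_completed(futures):
            ip = futures[future]
            try:
                results[ip] = future.result()
            except Exception as exc:
                results[ip] = exc
    return results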
 
Look into Python multiprocessing. I used this the other week to extract and resize 20,000 photos from a network share. I read the image names in from a CSV, carved the data up into partitions (chunks) of around 100, then fed it through multiprocessing to speed things up. Multiprocessing calls the process() method, which does whatever it needs to and returns the results. Those results are then fed to handleOutput(), where you can dump them out to disk or whatever. It's important to note that handleOutput() is run back in the main process; trying to write to disk from within process() will cause a whole world of problems, because multiple workers may try to write at the same time and you get garbled output.

It took me ages to get it working, so I've ripped all the bits out of my script and pulled together a quick sample for you; hopefully it should help. If not, let me know.

It creates a fake ips.csv with 100,000 lines in it, then processes them in batches of 1,000. After each batch completes it appends the results to results.txt.

Code:
import datetime
import multiprocessing
import os
import time
from random import randint

import numpy
import pandas as pd


def printProgressBar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='X'):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
   https://gist.github.com/snakers4/91fa21b9dda9d055a02ecd23f24fbc3d
    """
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print('\r%s: %s |%s| %s%% %s' % (datetime.datetime.now(), prefix, bar, percent, suffix))
    # Print New Line on Complete
    if iteration == total:
        print()

def process(df):
    """Process partitioned data and do something to generate your results"""

    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    # Simulate work, obviously remove this
    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    time.sleep(randint(0, 2))

    result = []

    for item in df['IP']:
        result.append("Done processing %s" % item)

    return result

def handleOutput(result):
    """Handle results from process(). Runs on the main process, so appending to the result file is safe."""
    with open('results.txt', 'a') as f:
        for item in result:
            f.write(item + '\n')

if __name__ == '__main__':

    # Clean up from a previous run (the file may not exist the first time)
    if os.path.exists("results.txt"):
        os.remove("results.txt")

    # Create a fake workload, you'll probably want to supply your own list
    f = open('ips.csv', 'w')
    f.write('IP\n')
    for i in range(100000):
        f.write(str(i) + '\n')
    f.close()

    df = pd.read_csv("ips.csv", header=0)

    # Number of worker processes to use
    threads = 16
    # Partition the work so each task holds around 1,000 items; a worker finishes a whole partition before its results go to handleOutput()
    partition_size_target = 1000

    partitions = int(len(df) / partition_size_target)
    df_split = numpy.array_split(df, partitions)
    task_count = len(df_split)
    pool = multiprocessing.Pool(threads)
    result_objs = []
    for d in df_split:
        result = pool.apply_async(process, (d,), callback=handleOutput)
        result_objs.append(result)

    # Poll the outstanding tasks and print progress until everything has finished
    previous_incomplete_count = -1
    while True:
        incomplete_count = sum(1 for x in result_objs if not x.ready())
        # This saves memory by removing complete jobs from result_objs.
        # I was processing images so needed to do this as it contained the bitmap data, you may not
        result_objs = [x for x in result_objs if not x.ready()]

        if incomplete_count == 0:
            print('Complete')
            break

        if (incomplete_count != previous_incomplete_count):
            printProgressBar(task_count - incomplete_count, task_count, prefix='Progress:', decimals=3, suffix='Complete', length=50)

        previous_incomplete_count = incomplete_count
        time.sleep(.5)

    pool.close()
    pool.join()


https://pastebin.com/yjak9NEw

I'm very much not a Python expert though!
 
Dunno if it's anything alike, but when playing with parallel threads in PowerShell I found things started to break when dealing with large numbers.
Ended up taking the initial list, splitting it into smaller chunks and then putting those chunks through the parallel loops.
 
Remember Python has a global interpreter lock (GIL). If you use threading it potentially won't scale well, because only one thread can execute Python bytecode at a time; the interpreter just context-switches between threads when one sleeps or makes a blocking system call. You'd need to look into multiprocessing to get better performance, and perhaps asyncio/threading within each subprocess to avoid spawning too many processes. A healthy balance between the two.
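A rough illustration of that balance (poll_endpoint() here is a placeholder for the real per-socket code): one process per core via multiprocessing, each running a modest thread pool for the I/O-bound work, so no single interpreter is juggling thousands of threads.

Code:
import multiprocessing
import socket
from concurrent.futures import ThreadPoolExecutor

THREADS_PER_PROCESS = 50

def poll_endpoint(ip, port=80, timeout=5):
    # Placeholder for your real polling logic
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return ip, True
    except OSError:
        return ip, False

def poll_chunk(ips):
    # Runs inside a worker process; threads are fine here because the
    # work is I/O-bound and each process has its own GIL.
    with ThreadPoolExecutor(max_workers=THREADS_PER_PROCESS) as pool:
        return list(pool.map(poll_endpoint, ips))

def poll_everything(ips, chunk_size=1000):
    chunks = [ips[i:i + chunk_size] for i in range(0, len(ips), chunk_size)]
    with multiprocessing.Pool() as procs:       # defaults to one process per CPU core
        nested = procs.map(poll_chunk, chunks)
    return [result for chunk in nested for result in chunk]

Same caveat as the sample above: call poll_everything() from under an if __name__ == '__main__': guard or the child processes won't start cleanly on Windows.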
 
As others have said, there should be an upper limit on the number of threads. Don't just create one thread per chunk of 10. Cap it at, say, 100 threads, and when a thread finishes one item it picks up the next value and processes/polls it.
 