
Process and threads

  • Program: A sequence of instructions written in a programming language.
  • Process: An instance of a program that is being executed.
  • Each process has its own separate memory space.
  • One process cannot corrupt the memory of another.
  • Thread: A unit of execution within a process. Threads of the same process share its memory.
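To make the "thread within a process" idea concrete, here is a minimal sketch in which several threads write to one list and report the same process ID. The names `shared_log`, `record_identity` and `run_threads` are made up for this illustration. A separate process, by contrast, would work on its own copy of the list.

```python
import os
import threading

shared_log = []  # lives in the process's memory, visible to every thread


def record_identity(label):
    # Every thread appends to the same list object and sees the same PID.
    shared_log.append((label, os.getpid()))


def run_threads(count=3):
    threads = [
        threading.Thread(target=record_identity, args=(f"thread-{i}",))
        for i in range(count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return list(shared_log)


entries = run_threads()
pids = {pid for _, pid in entries}
print(f"{len(entries)} entries written by threads of process {pids}")
```

All entries carry the PID of the main program, because the threads never leave the process that created them.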

Import packages

import multiprocessing
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

Multithreading

  • Concurrent execution
  • I/O-bound tasks

Simulate an expensive I/O task.

def expensive_io_operation(index, sleep_time):
    print(f"Task {index}: Starting I/O operation...")
    time.sleep(sleep_time)  # Simulate the I/O wait time
    print(f"Task {index}: Finished I/O operation.")

Execute the tasks sequentially (synchronously).

def do_something_that_takes_time():
    start_time = time.time()

    io_operation_execution_time_in_seconds = 1
    for index in range(5):
        # some I/O expensive task
        expensive_io_operation(index, io_operation_execution_time_in_seconds)

    end_time = time.time()
    print(f"Operation took {end_time - start_time:.0f} seconds")


do_something_that_takes_time()

Task 0: Starting I/O operation...

Task 0: Finished I/O operation.

Task 1: Starting I/O operation...

Task 1: Finished I/O operation.

Task 2: Starting I/O operation...

Task 2: Finished I/O operation.

Task 3: Starting I/O operation...

Task 3: Finished I/O operation.

Task 4: Starting I/O operation...

Task 4: Finished I/O operation.

Operation took 5 seconds

Execute the same tasks concurrently with threads.

def do_something_that_takes_time_using_threads():
    start_time = time.time()

    threads = []
    io_operation_execution_time_in_seconds = 1

    for index in range(5):
        thread = threading.Thread(
            target=expensive_io_operation,
            args=(index, io_operation_execution_time_in_seconds),
        )
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete (join)
    for thread in threads:
        # Blocks the main program until this specific thread
        # terminates
        thread.join()

    end_time = time.time()

    print(f"Operation took {end_time - start_time:.0f} seconds")


do_something_that_takes_time_using_threads()

Task 0: Starting I/O operation...

Task 1: Starting I/O operation...

Task 2: Starting I/O operation...

Task 3: Starting I/O operation...

Task 4: Starting I/O operation...

Task 0: Finished I/O operation.

Task 1: Finished I/O operation.

Task 2: Finished I/O operation.

Task 3: Finished I/O operation.

Task 4: Finished I/O operation.

Operation took 1 seconds

Thread Pool Executor

def log_a_number(number: int):
    time.sleep(1)
    return f"Number: {number}"


with ThreadPoolExecutor(max_workers=3) as executor:
    start_time = time.time()

    numbers = list(range(5))
    results = executor.map(log_a_number, numbers)

    for result in results:
        print(result)

    end_time = time.time()

    print(f"Operation took {end_time - start_time:.0f} seconds")

Number: 0

Number: 1

Number: 2

Number: 3

Number: 4

Operation took 2 seconds
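The results above come back in input order because `executor.map` yields them that way, even when a later task finishes first. The following sketch contrasts this with `submit` plus `as_completed`, which hands back each result as soon as it is ready. The helper `wait_and_return` and the delay values are assumptions chosen for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def wait_and_return(delay):
    time.sleep(delay)
    return delay


delays = [0.3, 0.2, 0.1]  # the first task is the slowest

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in the order of the inputs.
    in_input_order = list(executor.map(wait_and_return, delays))

    # submit() returns Future objects; as_completed() yields each one
    # as soon as it finishes, so faster tasks usually come first.
    futures = [executor.submit(wait_and_return, delay) for delay in delays]
    in_completion_order = [future.result() for future in as_completed(futures)]

print(in_input_order)
print(in_completion_order)
```

Use `map` when the position of each result matters, and `as_completed` when each result should be handled as early as possible.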

Multiprocessing

  • Run processes in parallel
  • CPU-bound tasks
  • Use multiple CPU cores

Simulate an expensive CPU task.

def expensive_cpu_operation(index, sleep_time):
    print(f"Task {index}: Starting CPU operation...")
    # Stand-in for real computation; time.sleep() itself does not use the CPU
    time.sleep(sleep_time)
    print(f"Task {index}: Finished CPU operation.")

Execute the tasks in parallel with processes.

def do_something_that_takes_time_using_cpus():
    start_time = time.time()

    processes = []
    cpu_operation_execution_time_in_seconds = 1

    for index in range(5):
        process = multiprocessing.Process(
            target=expensive_cpu_operation,
            args=(index, cpu_operation_execution_time_in_seconds),
        )
        processes.append(process)
        process.start()

    # Wait for all processes to complete (join)
    for process in processes:
        # Blocks the main program until this specific process
        # terminates
        process.join()

    end_time = time.time()

    print(f"Operation took {end_time - start_time:.0f} seconds")


do_something_that_takes_time_using_cpus()

Task 0: Starting CPU operation...

Task 1: Starting CPU operation...

Task 2: Starting CPU operation...

Task 3: Starting CPU operation...

Task 4: Starting CPU operation...

Task 0: Finished CPU operation.

Task 1: Finished CPU operation.

Task 2: Finished CPU operation.

Task 3: Finished CPU operation.

Task 4: Finished CPU operation.

Operation took 1 seconds
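The example above only sleeps, so it does not really load the CPU. As a rough sketch of genuinely CPU-bound work, the block below counts primes by trial division in four processes. The functions `count_primes_below` and `report_prime_count` and the limit of 100,000 are assumptions made for this illustration, not part of the original example.

```python
import math
import multiprocessing
import time


def count_primes_below(limit):
    """Count primes below `limit` by trial division (deliberately slow)."""
    count = 0
    for candidate in range(2, limit):
        divisors = range(2, math.isqrt(candidate) + 1)
        if all(candidate % divisor for divisor in divisors):
            count += 1
    return count


def report_prime_count(index, limit):
    print(f"Task {index}: {count_primes_below(limit)} primes below {limit}")


if __name__ == "__main__":
    start_time = time.time()

    # One process per task, so the tasks can run on separate cores.
    processes = [
        multiprocessing.Process(target=report_prime_count, args=(index, 100_000))
        for index in range(4)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    print(f"Operation took {time.time() - start_time:.2f} seconds")
```

The `if __name__ == "__main__":` guard matters on platforms that start child processes with the spawn method (Windows, and macOS by default): each child imports the script, and the guard keeps it from launching processes of its own.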

Process Pool Executor

def square_of_number(number: int):
    time.sleep(1)
    return f"Square: {number ** 2}"


with ProcessPoolExecutor(max_workers=3) as executor:
    start_time = time.time()

    numbers = list(range(5))
    results = executor.map(square_of_number, numbers)

    for result in results:
        print(result)

    end_time = time.time()

    print(f"Operation took {end_time - start_time:.0f} seconds")

Square: 0

Square: 1

Square: 4

Square: 9

Square: 16

Operation took 2 seconds
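Both pool examples take about 2 seconds instead of 1 because only 3 workers are available for 5 one-second tasks: three tasks run first, and the remaining two follow. A small sketch of that arithmetic, assuming every task takes the same time and ignoring start-up overhead (the function name `estimated_pool_seconds` is made up for this illustration):

```python
import math


def estimated_pool_seconds(task_count, max_workers, seconds_per_task):
    # Tasks run in "waves" of at most max_workers at a time.
    waves = math.ceil(task_count / max_workers)
    return waves * seconds_per_task


print(estimated_pool_seconds(5, 3, 1))  # 2: matches the pool examples
print(estimated_pool_seconds(5, 5, 1))  # 1: one worker per task
print(estimated_pool_seconds(5, 1, 1))  # 5: same as sequential execution
```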