10. 多任务


10. 多任务

多任务是指计算机系统在同一时间段内同时处理多个任务的能力,通过合理调度资源,实现任务之间的切换(并发)或并行执行,从而提高系统效率和资源利用率。

1. 基础概念

1. 并发与并行

并发和并行是多任务的两种表现形式

  • 并发:并发是一种逻辑上的同时执行概念。它强调的是在一个时间段内,多个任务能够交替执行,共享系统资源。并发并不要求任务真正意义上的同时执行,而是通过快速的任务切换,让用户感觉多个任务在同时进行。在单核 CPU 系统中,由于同一时刻只能执行一个任务,所以采用的就是并发的方式来处理多任务。Python 中的多线程和异步编程就是实现并发的常见手段。
  • 并行:并行是指多个任务在同一时刻真正地同时执行。这需要系统具备多个处理单元(如多核 CPU),每个处理单元可以独立地执行一个任务。并行能够充分利用多核处理器的优势,显著提高程序的执行效率,执行多任务,同时进行。Python 中的多进程就是实现并行的一种方式。

2. 进程和线程

进程:是程序在操作系统中的一次执行过程,是系统进行资源分配和调度运行的基本单位。简单来说,当你运行一个程序时,操作系统会为这个程序创建一个或多个进程,每个进程都有自己独立的内存空间、系统资源(如文件描述符、打开的网络连接等)以及执行状态。

线程:线程是进程内的一个执行单元,是程序执行的最小单位,是 CPU 调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和系统资源,但每个线程有自己独立的执行栈和程序计数器,用于记录线程的执行状态。

3. 多线程、多进程与协程的对比

多线程

  • 概念:

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。多线程是指在一个进程内创建多个线程,这些线程可以并发执行不同的任务。

  • 工作原理:

在单核 CPU 中,多线程通过时间片轮转的方式实现并发。操作系统会为每个线程分配一定的时间片,当时间片用完后,就会切换到其他线程执行。在多核 CPU 中,多个线程可以并行地在不同的核心上执行。

  • 优点:

  • 线程间的通信较为简单,可以方便地共享全局变量。

  • 创建和销毁线程的开销相对较小,比进程更加轻量级。
  • 适合 I/O 密集型任务,在 I/O 操作时可以让出 CPU 资源,让其他线程执行。

  • 缺点:

  • 由于 Python 的全局解释器锁(GIL),同一时刻只有一个线程能执行 Python 字节码,对于 CPU 密集型任务,多线程无法充分利用多核 CPU 的优势。

  • 线程同步问题较为复杂,需要使用锁机制来保证数据的一致性,容易出现死锁等问题。

多进程

  • 概念: 进程是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。多进程是指在一个操作系统中同时运行多个进程,每个进程都有自己独立的内存空间和系统资源。

  • 工作原理:

操作系统会为每个进程分配独立的内存空间和系统资源,进程之间通过进程间通信(IPC)机制进行数据交换和同步。多进程可以充分利用多核 CPU 的优势,实现真正的并行计算。

  • 优点:

  • 每个进程都有独立的内存空间,互不影响,避免了多线程中的数据共享问题和锁竞争问题。

  • 可以充分利用多核 CPU 的优势,适合 CPU 密集型任务。

  • 缺点:

  • 进程间的通信相对复杂,需要使用管道、队列等方式进行数据交换。

  • 创建和销毁进程的开销较大,比线程更加重量级。

协程

  • 概念:

协程,又称微线程,是一种用户态的轻量级线程。协程不像线程和进程那样由操作系统进行调度,而是由程序员自己控制协程的切换。

  • 工作原理:

协程在执行过程中可以暂停并保存当前的执行状态,然后在需要的时候恢复执行。协程的切换开销非常小,因为不需要像线程那样进行上下文切换。

  • 优点:

  • 协程是轻量级的线程,由用户自己控制上下文切换,不需要操作系统的干预,因此切换开销极小。

  • 适合 I/O 密集型任务,在 I/O 操作时可以主动让出控制权,提高程序的并发性能。
  • 代码结构清晰,避免了回调地狱的问题。

  • 缺点:

  • 协程本质上还是单线程,无法利用多核 CPU 的优势,对于 CPU 密集型任务,性能提升有限。

  • 协程的调试相对困难,因为协程的执行流程比较复杂。

4. Python 全局解释器锁(GIL)原理及影响

  • 原理:GIL 是 Python 解释器中的一个机制,它是一把互斥锁,确保在同一时刻只有一个线程能够执行 Python 字节码。这是为了保证 Python 的内存管理机制在多线程环境下的线程安全,因为 Python 的内存管理不是线程安全的。

  • 影响:对于 CPU 密集型任务,由于 GIL 的存在,多线程无法充分利用多核 CPU 的优势,甚至可能因为线程切换的开销而导致性能下降。而对于 I/O 密集型任务,GIL 不会成为瓶颈,因为在 I/O 操作时,线程会释放 GIL,让其他线程执行。

5. 适用场景分析

  • CPU 密集型任务:这类任务主要是进行大量的计算,如数值计算、图像处理等。由于 CPU 一直在忙碌,GIL 会成为性能瓶颈,因此适合使用多进程来充分利用多核 CPU 的优势。
  • I/O 密集型任务:这类任务主要是进行输入输出操作,如网络请求、文件读写等。在 I/O 操作时,线程会释放 GIL,让其他线程执行,因此适合使用多线程或协程来提高程序的并发性能。协程由于其轻量级和高效的上下文切换,在 I/O 密集型任务中表现更为出色。

2. 多进程

Python 的 multiprocessing 模块可让你生成新的进程,它提供了与 threading 模块相似的 API。下面是对 multiprocessing 模块的详细解释:

1. 主要特点

  • 并行处理:能够在多核 CPU 上并行运行多个进程,从而充分利用多核处理器的性能。
  • 资源隔离:每个进程都有自己独立的内存空间,避免了线程间共享数据可能带来的竞态条件。
  • 跨平台支持:在 Windows、Linux 和 macOS 等多个操作系统上都能正常工作。

2. 常用类和函数

1. Process

Process 类用于创建新的进程。其构造函数为:

Process(group=None, target=None, name=None, args=(), kwargs={})
  • 参数说明:
  • group:通常为 None,该参数是为了与 threading.Thread 兼容而保留的。
  • target:要在新进程中调用的可调用对象(如函数)。
  • name:进程的名称。
  • args:传递给 target 函数的位置参数元组。
  • kwargs:传递给 target 函数的关键字参数字典。
  • 常用方法:
  • start():启动进程。
  • join([timeout]):等待进程结束,timeout 为可选的超时时间。
  • terminate():强制终止进程。
  • is_alive():检查进程是否还在运行。

以下是一个简单的示例:

import multiprocessing

def worker(num):
    """进程要执行的任务"""
    print(f'Worker {num} started')
    print(f'Worker {num} finished')

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

在这个示例中:

  1. 定义了一个 worker 函数,它是每个进程要执行的任务。

  2. 使用 multiprocessing.Process 创建了 5 个进程,每个进程的目标函数是 worker,并传入不同的参数。

  3. 调用 start() 方法启动每个进程。

  4. 调用 join() 方法等待所有进程完成。

2. Queue

multiprocessing.Queue 类实现了一个多进程安全的队列,其工作方式和 Python 标准库中的 queue.Queue 类似。它提供了线程和进程安全的队列,可用于在多个进程之间传递消息和数据。该队列使用 pickle 模块来序列化对象,所以可以存储各种 Python 对象。其构造方法为:

Queue(maxsize=0)
Queue()
  • 参数说明:

  • maxsize:这是一个可选参数,用于指定队列的最大容量。其取值规则如下:

    • maxsize 被设置为大于 0 的整数时,队列就有了固定的容量限制。一旦队列中的元素数量达到这个上限,再调用 put 方法向队列中添加元素时,如果 block 参数为 True(默认值),那么该操作会被阻塞,直到队列中有元素被取出,从而腾出空间;如果 block 参数为 False,则会抛出 Full 异常。
    • maxsize 为 0 或者负数时,队列的大小没有限制,意味着可以不断地向队列中添加元素,不会因为队列满而阻塞。
  • 当调用无参构造方法时,它会创建一个没有大小限制的队列。

  • 常用方法:

  • put(item[, block[, timeout]]):将 item 放入队列。blockTrue 时,如果队列已满则阻塞,timeout 为可选的超时时间。

  • get([block[, timeout]]):从队列中取出一个元素。blockTrue 时,如果队列为空则阻塞,timeout 为可选的超时时间。
  • qsize():返回队列中元素的数量。
  • empty():判断队列是否为空。
  • full():判断队列是否已满。

下面是一个使用 Queue 的示例:

import multiprocessing

def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f'Produced {i}')

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'Consumed {item}')

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    queue.put(None)
    p2.join()

在这个示例中:

  1. producer 函数向队列中放入 5 个元素。
  2. consumer 函数从队列中取出元素,直到收到 None 为止。
  3. 使用 multiprocessing.Queue 创建一个队列,并在两个进程之间共享。

3. Pool

multiprocessing库中的Pool 类用于创建进程池,可方便地管理多个进程。构造函数为

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
  • 参数说明:
  • processes:可选,进程池中的进程数量,默认为 CPU 核心数。
  • initializer:可选,每个工作进程启动时要调用的可调用对象。
  • initargs:传递给 initializer 的参数元组。
  • maxtasksperchild:可选,每个工作进程最多可完成的任务数,完成后会被新的进程替换。
  • context :为可选参数,用于指定创建进程池时使用的上下文。上下文可以影响进程的创建方式(如 spawnfork 等)。一般情况下,你无需手动指定该参数,使用默认值即可。
  • 常用方法:
  • apply(func, args=(), kwds={}):同步执行函数,会阻塞直到结果返回。
  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None):异步执行函数,立即返回一个 AsyncResult 对象。
  • map(func, iterable[, chunksize]):将 func 应用到 iterable 的每个元素上,同步执行。
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]]):异步执行 map 操作,返回一个 AsyncResult 对象。
  • close():关闭进程池,不再接受新的任务。
  • terminate():立即终止进程池中的所有进程。
  • join():等待进程池中的所有进程完成,必须在 close()terminate() 之后调用。

以下是一个使用 Pool 的示例:

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
        print(results)

在这个示例中:

  1. 定义了一个 square 函数,用于计算一个数的平方。
  2. 使用 multiprocessing.Pool 创建一个包含 4 个进程的进程池。
  3. 调用 pool.map() 方法将 square 函数应用到列表中的每个元素上,并返回结果列表。

3. 注意事项

  • if __name__ == '__main__'::在 Windows 和一些其他操作系统上,使用 multiprocessing 模块时,必须将创建和启动进程的代码放在 if __name__ == '__main__': 语句块中,以避免递归创建进程。
  • 资源管理:要确保在使用完进程、队列等资源后进行适当的清理,避免资源泄漏。

3. 多线程

1. 主要特点

threading 模块是 Python 标准库中用于实现多线程编程的模块。多线程允许程序在同一时间执行多个任务,从而提高程序的执行效率,特别是在 I/O 密集型任务中。

2 . 常用类和函数

1. Thread

Thread 类是 threading 模块中最核心的类,用于创建和管理线程。其构造方法为:

Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • 参数说明:

  • group:通常为 None,该参数是为了与 threading 模块的早期版本兼容而保留的。

  • target:要在新线程中调用的可调用对象(如函数)。
  • name:线程的名称,默认是 Thread-N 的形式,N 是一个递增的整数。
  • args:传递给 target 函数的位置参数元组。
  • kwargs:传递给 target 函数的关键字参数字典。
  • daemon:布尔值,指定线程是否为守护线程。如果为 True,主线程退出时,守护线程会自动终止;如果为 False 或未指定,则线程是非守护线程。

  • 常用方法

  • start():启动线程,调用线程的 run() 方法。一个线程只能启动一次。

  • run():定义线程要执行的任务,通常不需要直接调用,而是通过 start() 方法间接调用。
  • join([timeout]):等待线程结束,timeout 为可选的超时时间。如果不指定 timeout,则会一直等待直到线程结束。
  • is_alive():检查线程是否还在运行。
  • getName():返回线程的名称。
  • setName(name):设置线程的名称。
  • isDaemon():检查线程是否为守护线程。
  • setDaemon(daemonic):设置线程是否为守护线程,必须在 start() 方法之前调用。

示例代码

```python import threading

def worker(): print('Worker thread is running')

if name == 'main': t = threading.Thread(target=worker) t.start() t.join() print('Main thread is done') ```

创建线程有两种方法:

1.通过Thread类,并传入目标函数作为线程体来创建线程

2.继承threading.Thread类并重写run()方法,run()方法中的内容作为线程体来创建线程

示例:

import threading
import time

# 自定义函数作为线程体
def thread_body():
    t = threading.current_thread()
    for n in range(5):
        print('第{}次执行线程{}'.format(n+1, t.name))
        time.sleep(1)
    print('线程{}执行完成'.format(t.name))
def main():
    t1 = threading.Thread(target=thread_body)
    t1.start()

    t2 = threading.Thread(target=thread_body, name='MyThread')
    t2.start()

if __name__ == '__main__':
    main()
import threading
import time

# 继承Thread类并重写run()方法,作为线程体
class MyThread(threading.Thread):
    def __init__(self, name = None):
        super().__init__(name = name)

    # 重写run()方法
    def run(self):
        t = threading.current_thread()
        for n in range(5):
            print('第{}次执行线程{}'.format(n + 1, t.name))
            time.sleep(1)
        print('线程{}执行完成'.format(t.name))

def main():
    t1 = MyThread()
    t1.start()

    t2 = MyThread(name='MyThread')
    t2.start()

if __name__ == '__main__':
    main()

2. Lock

Lock 类用于实现线程同步,确保同一时间只有一个线程可以访问共享资源,避免数据竞争和不一致的问题,实现多线程同步,多线程同步是保证线程安全的重要手段,但线程同步客观上会导致性能下降。构造方法为:

Lock()

常用方法:

  • acquire([blocking=True, timeout=-1]):获取锁。如果 blockingTrue(默认值),则会阻塞直到锁被释放;如果 blockingFalse,则会立即返回,若锁已被占用则返回 False,否则返回 Truetimeout 为可选的超时时间,若为 -1 则表示无限等待。
  • release():释放锁。只有持有锁的线程才能释放锁。

示例代码

import threading

lock = threading.Lock()
shared_variable = 0

def increment():
    global shared_variable
    for _ in range(100000):
        lock.acquire()
        try:
            shared_variable += 1
        finally:
            lock.release()

threads = []
for _ in range(2):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Final value of shared_variable: {shared_variable}")

多个线程启动后,会同时尝试通过 lock.acquire() 方法获取锁。若锁未锁定,线程可获取并将其状态变为锁定,接着执行线程,最后再释放锁;若已锁定,线程会被阻塞,等待其他线程。

with 语句:可以使用 with 语句简化 Lock 的使用,with lock: 会自动处理锁的获取和释放,避免因忘记释放锁而导致死锁。

异常处理:使用 try...finally 结构确保在发生异常时也能正确释放锁,保证程序的健壮性,避免因为异常而无法释放锁而造成死锁。

3. RLock

RLock 类(可重入锁)与 Lock 类类似,但允许同一个线程多次获取锁而不会造成死锁。构造方法为:

RLock()

RLock 的方法与 Lock 类的方法相同,包括 acquire()release()

示例代码:

import threading

rlock = threading.RLock()

def recursive_function(level):
    rlock.acquire()
    try:
        if level > 0:
            print(f"Level {level}: Lock acquired")
            recursive_function(level - 1)
        else:
            print("Reached base level")
    finally:
        rlock.release()
        print(f"Level {level}: Lock released")

thread = threading.Thread(target=recursive_function, args=(3,))
thread.start()
thread.join()

4. Semaphore

Semaphore 类用于控制同时访问某个资源的线程数量。构造方法为:

Semaphore(value=1)

value 表示允许同时访问资源的线程数量,默认为 1。

常用方法

  • acquire([blocking=True, timeout=-1]):获取信号量。如果信号量的值大于 0,则将其减 1 并返回 True;如果信号量的值为 0 且 blockingTrue,则会阻塞直到有其他线程释放信号量;如果 blockingFalse,则会立即返回 Falsetimeout 为可选的超时时间。
  • release():释放信号量,将信号量的值加 1。

示例代码

import threading
import time

semaphore = threading.Semaphore(2)

def worker(id):
    semaphore.acquire()
    try:
        print(f"Thread {id} is working")
        time.sleep(2)
    finally:
        semaphore.release()
        print(f"Thread {id} is done")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

5. Condition

在 Python 里,如果两个线程之间有依赖关系,线程之间必须进行通信,互相协调才能完成工作。实现线程间通信,可以使用threading模块中的Condition类和Event类。threading.Condition 类是一个用于线程同步的工具,它结合了锁和条件变量的功能,除了提供与Lock类似的acquire()release()方法外,还提供了wait(),notify(),notify_all()方法。 Condition 类的构造方法与相关函数:

构造方法:

threading.Condition(lock=None)
  • 参数:
  • lock:这是一个可选参数,代表要使用的锁对象。若不传入该参数,会默认创建一个 RLock 对象。

相关方法:

1. acquire(*args)

该函数用于获取锁,其行为和底层锁的 acquire() 方法一致。

2. release()

此函数用于释放锁,其行为和底层锁的 release() 方法相同。

3. wait(timeout=None)

该函数会让当前线程等待,直至被其他线程使用 notify() 或者 notify_all() 唤醒,或者达到指定的超时时间。

import threading

condition = threading.Condition()

def worker():
    with condition:
        print("Worker is waiting...")
        condition.wait()
        print("Worker is awake!")

t = threading.Thread(target=worker)
t.start()

# 主线程唤醒工作线程
with condition:
    condition.notify()
t.join()

4. notify(n=1)

该函数用于唤醒至少 n 个在该条件变量上等待的线程。默认情况下,n 为 1。

5. notify_all()

此函数用于唤醒所有在该条件变量上等待的线程。

下面是一个综合示例,展示了如何使用 Condition 类:

import threading
import time

# 创建一个 Condition 对象
condition = threading.Condition()
# 共享资源
shared_resource = False

def producer():
    global shared_resource
    while True:
        with condition:
            # 生产资源
            print("Producer is producing...")
            time.sleep(2)
            shared_resource = True
            # 通知消费者资源已准备好
            condition.notify()
            # 等待消费者消费资源
            condition.wait()

def consumer():
    global shared_resource
    while True:
        with condition:
            # 等待资源
            while not shared_resource:
                condition.wait()
            # 消费资源
            print("Consumer is consuming...")
            time.sleep(2)
            shared_resource = False
            # 通知生产者资源已消费
            condition.notify()

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()

6. Event

Event 类用于线程间的通信,一个线程可以设置事件,其他线程可以等待事件的发生。构造方法为:

Event()

创建一个事件对象,初始状态为未设置。

常用方法

  • set():设置事件,将事件的内部标志设置为 True
  • clear():清除事件,将事件的内部标志设置为 False
  • is_set():检查事件的内部标志是否为 True
  • wait([timeout]):等待事件被设置,若事件的内部标志为 True 则立即返回;若为 FalsetimeoutNone(默认值),则会一直阻塞直到事件被设置;若 timeout 不为 None,则会阻塞指定的时间,超时后返回 False

示例代码

import threading
import time

event = threading.Event()

def waiter():
    print("Waiting for the event...")
    event.wait()
    print("Event is set!")

thread = threading.Thread(target=waiter)
thread.start()

time.sleep(3)
event.set()
thread.join()

7. ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor 是 Python 3.2 引入的高级线程池实现,它位于标准库的 concurrent.futures 模块中。该类提供了简洁易用的接口,可用于管理和执行线程池

ThreadPoolExecutor 类的构造方法如下:

ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
  • 参数说明:
  • max_workers:线程池中的最大线程数。如果不指定,在 Python 3.8 及以上版本中,默认值为 min(32, os.cpu_count() + 4);在 Python 3.7 及以下版本中,默认值为 os.cpu_count() or 1
  • thread_name_prefix:线程名称的前缀,用于区分不同线程池中的线程。默认是空字符串。
  • initializer:可选的可调用对象,在每个工作线程启动时调用。
  • initargs:传递给 initializer 的参数元组。

常用方法:

  1. submit(fn, *args, **kwargs)

  2. 功能:把一个可调用对象 fn 及其参数 *args**kwargs 提交到线程池来执行,并且返回一个 Future 对象。

  3. 返回值Future 对象,可用于获取任务的执行结果、检查任务状态等。
  4. 示例
import concurrent.futures

def add(a, b):
    return a + b

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(add, 3, 5)
    result = future.result()
    print(result)
  1. map(func, *iterables, timeout=None, chunksize=1)

  2. 功能:类似于 Python 内置的 map 函数,将可调用对象 func 依次应用到 iterables 中的每个元素上,返回一个迭代器,迭代器中的元素是每个任务的执行结果。

  3. 参数:
  4. func:要执行的可调用对象。
  5. *iterables:可迭代对象,作为 func 的参数。
  6. timeout:可选参数,指定等待结果的最大时间。
  7. chunksize:指定将 iterables 分割成的块大小,仅在处理较大的可迭代对象时有效。
  8. 返回值:一个迭代器,按顺序包含每个任务的结果。
  9. 示例
import concurrent.futures

def square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(square, numbers)
    for result in results:
        print(result)
  1. shutdown(wait=True, cancel_futures=False)

  2. 功能:关闭线程池,阻止新任务提交到线程池。

  3. 参数:
  4. wait:布尔值,若为 True(默认),则会等待所有已提交的任务完成;若为 False,则会立即返回。
  5. cancel_futures:布尔值,若为 True,会尝试取消所有未开始的任务。
  6. 示例
import concurrent.futures
import time

def task():
    time.sleep(2)
    return "Task completed"

executor = concurrent.futures.ThreadPoolExecutor()
future = executor.submit(task)
executor.shutdown(wait=True)
print("ThreadPoolExecutor is shut down.")
  1. __enter__()__exit__()

  2. 功能:这两个方法使得 ThreadPoolExecutor 可以使用 with 语句来管理。__enter__() 方法会返回 ThreadPoolExecutor 实例本身,而 __exit__() 方法会在 with 代码块结束时调用 shutdown(wait=True) 方法,确保资源被正确释放。

  3. 示例
import concurrent.futures

def multiply(a, b):
    return a * b

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(multiply, 4, 6)
    result = future.result()
    print(result)
# 离开 with 代码块时,线程池会自动关闭
  1. Future 对象相关的操作

虽然这些操作不是 ThreadPoolExecutor 类直接的方法,但在使用线程池时经常会用到 Future 对象的方法:

  • Future.result(timeout=None):获取任务的执行结果。如果任务还未完成,会阻塞当前线程直到任务完成或超时。
  • Future.exception(timeout=None):获取任务抛出的异常。如果任务还未完成,会阻塞当前线程直到任务完成或超时。
  • Future.done():检查任务是否已经完成。
  • Future.cancel():尝试取消任务。如果任务已经开始或已经完成,则无法取消。
  • Future.running():检查任务是否正在运行。

3. 注意事项

  1. 线程安全与锁机制 多线程共享同一进程的内存空间,对共享数据(如全局变量、对象属性)的并发访问可能导致竞态条件(Race Condition)。因此,需要通过锁(如 threading.Lock)、信号量(Semaphore)或队列(Queue)等同步机制来保证线程安全。若未正确使用同步,可能导致数据不一致或程序崩溃。
  2. GIL(全局解释器锁)的限制 在 CPython 中,由于 GIL 的存在,同一时间只有一个线程能执行 Python 字节码。这使得多线程在 CPU 密集型任务中性能提升有限(甚至可能因线程切换开销而降低性能),更适合 I/O 密集型任务(如网络请求、文件读写)。若需充分利用多核 CPU,建议使用多进程。
  3. 资源管理 虽然线程的资源(如内存)由进程统一管理,但若线程中存在未释放的外部资源(如文件句柄、网络连接),仍可能导致资源泄漏。应确保在线程结束时正确释放资源,例如使用 try...finally 块或上下文管理器。
  4. 线程的正确终止 Python 中没有直接强制终止线程的方法(threading.Thread 没有类似 terminate 的函数),通常需要通过设置标志位(如 Event 或共享变量)让线程自行退出,以避免数据不一致或资源未清理的问题。
  5. 死锁与活锁 多线程中若多个线程相互等待对方释放锁,可能导致死锁。需谨慎设计锁的获取顺序,或使用超时机制(如 acquire(timeout))避免死锁。此外,也要注意活锁(线程反复重试但无法推进)的情况。
  6. 线程数量与性能 过多线程会增加上下文切换开销,可能降低程序性能。需根据任务类型和系统资源合理控制线程数量,例如使用线程池(concurrent.futures.ThreadPoolExecutor)来管理线程生命周期。

总结:多线程的核心注意事项集中在线程安全GIL 的影响资源管理上,而多进程更关注进程隔离与系统资源的显式管理。在实际开发中,需根据任务特性(I/O 密集型 vs. CPU 密集型)和场景选择合适的并发模型。

4. 异步编程

Python 异步编程能让程序在等待 I/O 操作时执行其他任务,从而提升效率。以下为步编程里几个关键的概念以及相关示例。

1. asyncio

asyncio是 Python 自带的用于异步编程的库,它借助协程(coroutine)、事件循环(event loop)等机制达成异步操作。

2. 协程(Coroutine)

协程是一种特殊的函数,可在执行过程中暂停并在之后恢复。在 Python 里,使用async def来定义协程函数。

3. await关键字

await用于暂停协程的执行,直至等待的异步操作完成。

0 条评论

发表评论

暂无评论,欢迎发表您的观点!