文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一文了解:Python 并发、并行、同步、异步、阻塞、非阻塞

2024-11-29 19:15

关注

在 Python 中,理解并发(Concurrency)、并行(Parallelism)、同步(Synchronization)、异步(Asynchronous)、阻塞(Blocking)和非阻塞(Non-blocking)是非常重要的,因为它们是构建高性能应用程序的关键概念。

1. 并发(Concurrency)

并发是指程序在同一时间段内可以处理多个任务的能力。具体来说,程序看起来像是同时执行多个任务,但实际上它们是在交替执行。

1.1 示例:多线程

import threading
import time
def worker():
    print(f"Thread {threading.current_thread().name} started")
    time.sleep(2)
    print(f"Thread {threading.current_thread().name} finished")
# 创建多个线程
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, name=f"Thread-{i}")
    threads.append(thread)
    thread.start()
# 等待所有线程完成
for thread in threads:
    thread.join()
print("All threads finished")

2. 并行(Parallelism)

并行是指程序在同一时间可以真正同时执行多个任务的能力。通常需要硬件支持,例如多核处理器。

2.1 示例:多进程

import multiprocessing
def worker():
    print(f"Process {multiprocessing.current_process().name} started")
    time.sleep(2)
    print(f"Process {multiprocessing.current_process().name} finished")
# 创建多个进程
processes = []
for i in range(5):
    process = multiprocessing.Process(target=worker, name=f"Process-{i}")
    processes.append(process)
    process.start()
# 等待所有进程完成
for process in processes:
    process.join()
print("All processes finished")

3. 同步(Synchronization)

同步是指在多线程或多进程环境中,通过锁或其他机制确保资源的安全访问。

3.1 示例:锁(Lock)

import threading
def worker(lock):
    with lock:
        print(f"Thread {threading.current_thread().name} started")
        time.sleep(2)
        print(f"Thread {threading.current_thread().name} finished")
lock = threading.Lock()
# 创建多个线程
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(lock,), name=f"Thread-{i}")
    threads.append(thread)
    thread.start()
# 等待所有线程完成
for thread in threads:
    thread.join()
print("All threads finished")

4. 异步(Asynchronous)

异步是指程序可以在等待某个操作完成的同时继续执行其他任务。异步编程通常使用回调函数或协程。

4.1 示例:异步 I/O(使用 asyncio)

import asyncio
async def worker():
    print(f"Worker {asyncio.current_task().get_name()} started")
    await asyncio.sleep(2)
    print(f"Worker {asyncio.current_task().get_name()} finished")
async def main():
    tasks = []
    for i in range(5):
        task = asyncio.create_task(worker(), name=f"Worker-{i}")
        tasks.append(task)
    await asyncio.gather(*tasks)
asyncio.run(main())

5. 阻塞(Blocking)

阻塞是指程序在执行某个操作时会暂停执行,直到该操作完成。例如,当执行一个阻塞的 I/O 操作时,程序会等待直到 I/O 操作完成。

5.1 示例:阻塞 I/O

import time
def blocking_io():
    print("Starting blocking IO")
    time.sleep(5)
    print("Finished blocking IO")
blocking_io()

6. 非阻塞(Non-blocking)

非阻塞是指程序在执行某个操作时不会暂停执行,而是继续执行其他任务。通常用于网络 I/O 或文件 I/O。

6.1 示例:非阻塞 I/O(使用 select)

import select
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 8000))
server_socket.listen(5)
sockets_list = [server_socket]
def handle_client(client_socket):
    request = client_socket.recv(1024)
    print(f"Received: {request.decode()}")
    response = "Hello, World!\n"
    client_socket.send(response.encode())
    client_socket.close()
while True:
    read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list)
    for notified_socket in read_sockets:
        if notified_socket == server_socket:
            client_socket, client_address = server_socket.accept()
            sockets_list.append(client_socket)
        else:
            handle_client(notified_socket)
    for notified_socket in exception_sockets:
        sockets_list.remove(notified_socket)
        notified_socket.close()
接口自动化相关代码示例,供参考:

1. 并发(Concurrency)

1.1 示例:多线程发送 HTTP 请求

import threading
import requests
import time
def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, jsnotallow=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
# 创建线程列表
threads = []
# 创建并启动线程
for test_case in test_cases:
    thread = threading.Thread(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
    threads.append(thread)
    thread.start()
# 等待所有线程完成
for thread in threads:
    thread.join()
print("All requests finished")

2. 并行(Parallelism)

2.1 示例:多进程发送 HTTP 请求

import multiprocessing
import requests
import time
def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, jsnotallow=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
# 创建进程列表
processes = []
# 创建并启动进程
for test_case in test_cases:
    process = multiprocessing.Process(target=send_request, args=(test_case["url"], test_case["headers"], test_case["payload"]))
    processes.append(process)
    process.start()
# 等待所有进程完成
for process in processes:
    process.join()
print("All requests finished")

3. 同步(Synchronization)

3.1 示例:使用锁同步多线程

import threading
import requests
import time
def send_request(lock, url, headers, payload):
    with lock:
        response = requests.post(url, headers=headers, jsnotallow=payload)
        print(f"Response status code: {response.status_code}")
        print(f"Response content: {response.text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
# 创建锁
lock = threading.Lock()
# 创建线程列表
threads = []
# 创建并启动线程
for test_case in test_cases:
    thread = threading.Thread(target=send_request, args=(lock, test_case["url"], test_case["headers"], test_case["payload"]))
    threads.append(thread)
    thread.start()
# 等待所有线程完成
for thread in threads:
    thread.join()
print("All requests finished")

4. 异步(Asynchronous)

4.1 示例:使用 asyncio 发送异步 HTTP 请求

import asyncio
import aiohttp
async def send_request(url, headers, payload):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, jsnotallow=payload) as response:
            print(f"Response status code: {response.status}")
            response_text = await response.text()
            print(f"Response content: {response_text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
async def main():
    tasks = []
    for test_case in test_cases:
        task = asyncio.create_task(send_request(test_case["url"], test_case["headers"], test_case["payload"]))
        tasks.append(task)
    await asyncio.gather(*tasks)
asyncio.run(main())

5. 阻塞(Blocking)

5.1 示例:阻塞式发送 HTTP 请求

import requests
import time
def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, jsnotallow=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
# 依次发送请求
for test_case in test_cases:
    send_request(test_case["url"], test_case["headers"], test_case["payload"])
print("All requests finished")

6. 非阻塞(Non-blocking)

6.1 示例:使用 select 发送非阻塞 HTTP 请求

import select
import socket
import requests
import time
def send_request(url, headers, payload):
    response = requests.post(url, headers=headers, jsnotallow=payload)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")
# 测试数据
test_cases = [
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token1"},
        "payload": {"key": "value1"}
    },
    {
        "url": "https://api.example.com/v1/resource",
        "headers": {"Authorization": "Bearer token2"},
        "payload": {"key": "value2"}
    }
]
# 创建套接字列表
sockets_list = []
# 创建并启动套接字
for test_case in test_cases:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("localhost", 8000))
    sockets_list.append(sock)
# 监听套接字
while sockets_list:
    ready_to_read, _, _ = select.select(sockets_list, [], [])
    for sock in ready_to_read:
        send_request(test_cases[sockets_list.index(sock)]["url"], test_cases[sockets_list.index(sock)]["headers"], test_cases[sockets_list.index(sock)]["payload"])
        sockets_list.remove(sock)
print("All requests finished")

7. 总结

通过以上示例,我们详细介绍了 Python 中的几个关键概念:

并发(Concurrency):在同一时间段内处理多个任务。

并行(Parallelism):在同一时间真正同时执行多个任务。

同步(Synchronization):确保多线程或多进程环境下的资源安全访问。

异步(Asynchronous):在等待某个操作完成的同时继续执行其他任务。

阻塞(Blocking):在执行某个操作时会暂停执行,直到该操作完成。

非阻塞(Non-blocking):在执行某个操作时不会暂停执行,而是继续执行其他任务。

来源:测试开发学习交流内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯