- 在VisualStudio中部署GDAL库的C++版本(包括SQLite、PROJ等依赖)
- Android开机流程介绍
- STM32CubeMX教程31USB_DEVICE-HID外设_模拟键盘或鼠标
- 深入浅出Java多线程(五):线程间通信
在当今计算机科学和软件工程的领域中,池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而,在 Python 的协程领域,我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢?
首先,Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同,Python Coroutine 是无栈的,无法跨核执行,从而限制了协程池发挥多核优势的可能性.
其次,Python Coroutine 的轻量级和快速创建销毁的特性,使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor.
然而,作为开发者,我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量,但在一些需要大量协程协同工作的应用中,池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时,协程池的优势就显而易见了.
关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor,这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展,我们不能排除未来可能性的存在。或许有一天,我们会看到 TaskGroup 引入一个 max_workers 的形参,以更好地支持对协程池的需求.
在实际开发中,我们也可以尝试编写自己的 CoroutinePoolExecutor,以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑,我们可以最大程度地发挥协程池的优势,提高系统的性能和响应速度.
在接下来的文章中,我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor,以及在实际项目中的应用场景。通过深入理解协程池的工作原理,我们或许能更好地利用这一技术,使我们的异步应用更为高效.
如何开始编写 CoroutinePoolExecutor,首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点:
有了上述 3 点的考量,我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于,作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制,另一方面站在巨人肩膀上的编程借鉴往往会事半功倍,对于自我的提升也是较为明显的.
在考虑这些因素的同时,我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确,我们能更好地把握 CoroutinePoolExecutor 的潜在优势,为异步应用的性能提升做出更有针对性的贡献.
在这里我先贴出完整的代码实现,其中着重点已经用注释标明.
以下是 CoroutinePoolExecutor 的代码实现:
import os
import asyncio
import weakref
import logging
import itertools
async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
try:
while True:
work_item = await work_queue.get()
if work_item is not None:
await work_item.run()
del work_item
executor = executor_reference()
if executor is not None:
# Notify available coroutines
executor._idle_semaphore.release()
del executor
continue
# Notifies the next coroutine task that it is time to exit
await work_queue.put(None)
break
except Exception as exc:
logging.critical('Exception in worker', exc_info=True)
class _WorkItem:
def __init__(self, future, coro):
self.future = future
self.coro = coro
async def run(self):
try:
result = await self.coro
except Exception as exc:
self.future.set_exception(exc)
else:
self.future.set_result(result)
class CoroutinePoolExecutor:
"""
Coroutine pool implemented based on ThreadPoolExecutor
Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
"""
# Used to assign unique thread names when coroutine_name_prefix is not supplied.
_counter = itertools.count().__next__
def __init__(self, max_workers, coroutine_name_prefix=""):
if max_workers is None:
max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
self._work_queue = asyncio.Queue()
self._idle_semaphore = asyncio.Semaphore(0)
self._coroutines = set()
self._shutdown = False
self._shutdown_lock = asyncio.Lock()
self._coroutine_name_prefix = (coroutine_name_prefix or (
f"{__class__.__name__}-{self._counter()}"
))
async def submit(self, coro):
async with self._shutdown_lock:
# When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
# This is because after shutdown, all sub-coroutines will end their work
# one after another. Even if there are new coroutine tasks, they will not
# be reactivated.
if self._shutdown:
raise RuntimeError('cannot schedule new coroutine task after shutdown')
f = asyncio.Future()
w = _WorkItem(
f,
coro
)
await self._work_queue.put(w)
await self._adjust_coroutine_count()
return f
async def _adjust_coroutine_count(self):
try:
# 2 functions:
# - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
# - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
# Since the Semaphore provided by asyncio does not have a timeout
# parameter, you can choose to use it with wait_for.
if await asyncio.wait_for(
self._idle_semaphore.acquire(),
0
):
return
except TimeoutError:
pass
num_coroutines = len(self._coroutines)
if num_coroutines < self._max_workers:
coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
t = asyncio.create_task(
coro=_worker(
weakref.ref(self),
self._work_queue
),
name=coroutine_name
)
self._coroutines.add(t)
async def shutdown(self, wait=True, *, cancel_futures=False):
async with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
while True:
try:
work_item = self._work_queue.get_nowait()
except asyncio.QueueEmpty:
break
if work_item is not None:
work_item.future.cancel()
# None is an exit signal, given by the shutdown method, when the shutdown method is called
# will notify the sub-coroutine to stop working and exit the loop
await self._work_queue.put(None)
if wait:
for t in self._coroutines:
await t
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.shutdown(wait=True)
return False
以下是 CoroutinePoolExecutor 的使用方式:
import asyncio
from coroutinepoolexecutor import CoroutinePoolExecutor
async def task(i):
await asyncio.sleep(1)
print(f"task-{i}")
async def main():
async with CoroutinePoolExecutor(2) as executor:
for i in range(10):
await executor.submit(task(i))
if __name__ == "__main__":
asyncio.run(main())
我们知道,在线程池中,工作线程一旦创建会不断的领取新的任务并执行,除开 shutdown() 调用,否则对于静态的线程池来讲工作线程不会自己结束.
在上述协程池代码实现中,CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等.
而 _worker 函数是工作协程的运行函数,其会在工作协程启动后,不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task.
剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器,其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果.
在此 CoroutinePoolExecutor 实现后,我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层,它有感的暴露了出来.
具体体现在当 Python Event Loop 启动后,如果 main coroutine 停止运行,那么所有的 subtask coroutine 也会停止运行,尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措,说的更具体点就是不知道在什么时候调用.
对于这些问题,我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展,如在事件循环关闭之前添加 hook function,甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行.
无论如何,只要心里有想法,就可以去将它实现 .. 学习本身就是一个不断挑战的过程.
最后此篇关于PythonCoroutine池化实现的文章就讲到这里了,如果你想了解更多关于PythonCoroutine池化实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
Tomcat 6 下的 Servlets 101: 有人可以指点我对例如的最佳方式的一个很好的解释。在 servlet 启动时创建一个昂贵的 Foo 对象的集合,并将它们存储在我可以在处理每个请求时访
我想在我的网络中做最大池化,像这样: 我的网络是一维的(你可以认为上面的例子是一个句子有 6 个词,而每个词有 3 个嵌入)我不知道特征的长度(不是每个句子都有相同的长度),所以我不能' t 在 tf
关于SDL2的硬件加速纹理渲染,我有两个问题: 当使用SDL_Createtexture(...)时,当VRAM非常重要时,是否可以在系统RAM和VRAM之间自动合并/传输纹理?为了确保不会淹没VRA
我正在尝试通过池化 box2d 主体来生成 block ,我不知道 libgdx 池化是否适用于主体,但如果是,请有人解释我如何做到这一点以及我的代码有什么问题。 首先,我在单独的方法上创建了 Bod
我的主要目标是在静默或某种方式下池化一个httpclient,我想调用一个方法来给我一个httpclient...因为我认为它对于每个休息调用都使用资源,添加一个新的httpclient实例并且设置一
我有一个非常具体的应用程序架构问题。 我需要解析大量传入目的地,这些对象的解析是异步处理的,完成后需要将对象传递到下一阶段。 所以真正的问题归结为,处理大量 Future 对象的优雅方式是什么。 我是
我使用 protobuf 作为数据记录器的编码机制。远程客户端将通过 TCP 发送一条消息(在 protobuf 中编码),应用程序会将其写入磁盘。消息写入磁盘后,其唯一的内存引用将被删除,并将在适当
有些教程说使用 org.postgresql.ds.PGConnectionPoolDataSource 但有些教程说只是 org.postgresql.ds.PGSimpleDataSource..
我正在尝试在 this paper 中重新创建字符级 CNN并且在我需要创建 k-max 池化层的最后一步有点卡住,因为我使用的是 MXNet 但它没有这个。 An important differe
假设我必须在后台递归地迭代存储在树结构中的项目,并且我想使用线程池中的多个线程(每个“文件夹”节点一个线程)来遍历这棵树。我已经成功地使用 OmniThreadLibrary 提供的几种不同的低级和高
我想了解 timeToLive 属性是如何工作的? 这是当你从池中获取连接时,特意关闭连接并返回到池中的时间间隔? API 我希望使用持久连接的客户端每隔几秒关闭一次,这样对负载均衡器的请求每隔几秒就
我目前正在尝试使用 CloseableHttpClient 同时执行多个 HttpGet 请求。 我用谷歌搜索了如何做到这一点,答案是使用 PoolingHttpClientConnectionMan
我终于认输并寻求帮助。我想尽办法解决这个问题,但我似乎无能为力。 我正在与:VS2010 C#甲骨文 12cODP.Net 托管121012 我继承了一个同时使用托管和非托管数据访问 dll 的应用程
这是我看完Documents的理解: 池化,与许多其他数据库一样,我们只有一定数量的允许连接,所以你们都排好队等待空闲连接返回池中。 (连接在某种意义上就像一个 token ) 在任何给定时间,事件和
我有一个像 1x8x128x128 这样的 5D blob,并且我有一个能够处理我的 5D blob 的卷积层。当我想使用池层时,尽管它不起作用。如何将池层与 5D blob 一起使用? Check
我正在尝试实现一种使用 L2 池化的 CNN 架构。引用论文特别指出 L2 池化优于最大池化,因此我想在激活函数之后尝试 L2 池化。 但是,Tensorflow 似乎只提供了tf.nn.avg_po
我正在玩一款游戏,每 1-3 秒生成一个对象。该游戏对象包含一些用于渲染目的的资源和一个 Box2D 主体。问题是我不想创建数千个对象。相反,我想重用它们,重置其属性(位置、摩擦力等),而不是创建一个
将 Tomcat 与 MySQL 一起使用时,Tomcat 数据源配置中的 poolPreparedStatements 设置(我相信来自 DBCP)和 Connector/J 之间的关系是什么 ca
我们需要在 Java EE 应用程序中使用队列,并且由于它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。 如果我正确理解了 JMS/Jav
我是一名优秀的程序员,十分优秀!