!!!原创小文,转载请标明出处!!!

asyncio

asyncio 的编程模型就是一个消息循环。我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步IO。

1、 创建一个loop线程,将消费队列添加到loop队列

import asyncio
async def main():
    channel_session = await get_channel_pool()
    redis_session = await get_redis()
    db_session = await get_db_pool()
    async with channel_session:
        # 添加一个消费任务
        await loop.create_task(consume(db_session, redis_session, channel_session, "DBQ"))
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

# 其他消费、发布用例见: https://aio-pika.readthedocs.io/en/latest/quick-start.html

2、 创建消费队列

async def consume(db_session, redis_session, channel_pool, queue_name):
    :param db_session: mysql异步连接池
    :param redis_session: redis异步连接池
    :param channel_pool: rabbitMq异步连接池
    :param queue_name: 队列名
    :return:
    async with channel_pool.acquire() as channel:
        await channel.set_qos(20)
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                data_dict = json.loads(message.body.decode())
                    # 数据逻辑处理(自行添加)
                    await db_operation(db_session, redis_session, data_dict)
                    await message.ack()
                except Exception as e:
                    await message.ack()
                    logger.error(traceback.format_exc())

3、 创建相关连接池

(1)使用sqlalchemy创建mysql连接池

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config.config import DB
# 创建对象的基类:
ENGINE_DATABASE_URL = 'mysql+pymysql://%s:%s@%s:%s/%s' % (DB.get('user'),
                                                          DB.get('pass'),
                                                          DB.get('host'),
                                                          DB.get('port', '3306'),
                                                          DB.get('db'))
# 初始化数据库连接:
engine = create_engine(ENGINE_DATABASE_URL, encoding='utf-8')
Session = sessionmaker(bind=engine)
async def get_db_pool():
    db = Session()
        return db
    finally:
        db.close()

(2)使用aioredis创建redis连接池

import asyncio
from aioredis import create_redis_pool, Redis
from config.config import REDIS
async def get_redis() -> Redis:
        loop = asyncio.get_event_loop()
        redis = await create_redis_pool("redis://:%s@%s:%s/%s?encoding=utf-8" % (REDIS.get('pass'),
                                                                                 REDIS.get('host'),
                                                                                 REDIS.get('port', '6379'),
                                                                                 REDIS.get('db', '0')),
                                        minsize=5,
                                        maxsize=20,
                                        loop=loop)
        return redis
    except:
        print("Connect call Redis Error!")

(3)使用aio_pika创建RabbitMq连接池

import aio_pika
import asyncio
from aio_pika.pool import Pool
from config.config import RABBIT_URL
async def get_channel_pool() -> aio_pika.pool.Pool:
    loop = asyncio.get_event_loop()
    async def get_connection():
        return await aio_pika.connect_robust(RABBIT_URL)
    connection_pool = Pool(get_connection, max_size=20, loop=loop)
    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()
    return Pool(get_channel, max_size=20, loop=loop)
fail-fast 机制,即快速失败机制,是java集合(Collection)中的一种错误检测机制。当在迭代集合的过程中该集合在结构上发生改变的时候,就有可能会发生fail-fast,即抛出 ConcurrentModificationException异常。fail-fast机制并不保证在不同步的修改下一定会抛出异常,它只是尽最大努力去抛出,所以这种机制一般仅用于检测bug。 fail-fast出现场景 import java.util.ArrayList; import java.util.It RabbitMQ 是一个消息队列中间件,主要用于 异步 处理、解耦和缓冲。它 实现 了高级的消息队列协议(AMQP),并提供了可靠性、强大的管理界面和丰富的客户端库,是非常流行的分布式消息系统。 RabbitMQ 的基本思想是生产者将消息发送到队列中,然后 消费 者从队列中取出消息进行处理。在 RabbitMQ 中,生产者和 消费 者不知道对方的存在,并且服务于 RabbitMQ Broker 上,即使有一端暂停了也不会影响另外一端的工作,从而保证了整个系统的稳定性和可靠性。 def mq_conn(): credentials = pika .PlainCredentials('admin', 'admin') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika .BlockingConnection( pika .Conn.. 环境是centos6.9 rabbitmq 依赖erlang语言,首先要安装erlang,然后安装 rabbitmq -server 启动 rabbitmq -server:service rabbitmq -server start python 3安装 pika 模块:pip3 install pika pika 连接 rabbitmq 示例: import pika , ti... async def main(): connection = await aio _ pika .connect_robust( “amqp://guest:guest@127.0.0.1/” # 建立连接 queue_name = "test_queue" async with connection: # 上下文管理,退出时自动关闭connection async def main() -> None: # Perform connection connection = await connect(host='127.0.0.1',port=5672,login='ai.litter', 串行的方式 在没有mq中间件之前,我们通常使用这种方式 实现 实现 起来很容易,比如先将用户信息插入数据库,然后发送成功注册的邮件、短信。以上三个任务完成后才会给用户响应,但我们应该都知道,对于邮件、短信,对于系统核心业务来说这都不是必须马上发送的,这样的 实现 方式无非会增加系统的响应时间,甚至给用户带来不好的体验。可以认为就是一个线程在串行执行三个任务。   众所周知, RabbitMQ 是一个开源的高性能的消息队列,支持多种开发语言:Java, Python ,.Net,C,C++,PHP等多种语言,那么如何通过 Python 语言调用 RabbitMQ 呢? Python pika 这个模块提供了完整的调用方法,通过这个包我们可以 实现 Rabbit的简单模式,交换机模式以及一些特殊的参数。那么我们如何使用 pika 模块呢,接下来,请看听我慢慢道来。 一、简单模式 简单模式下,有多个 消费 者时,采用轮询方式处理消息。 import pika if __name_ Ubuntu18.04安装一些较麻烦的 Python 第三方库 由于工作中会比较常用以下的第三方库,直接pip安装会报错,现总结一下(网上的教程鱼龙混杂),亲测ok 安装 aio _ pika 比较麻烦,ubuntu16.04默认安装的是 python 3.5.2 如果直接sudo pip install aio _ pika 会报错 AttributeError: module ‘typing’ has no attribute ‘Coroutine’ 如果在 python 3.6安装就不会出问题,看来是 python 3.5的ty 从上篇文章可知,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。 消息确认就是当工作者完成任务后,会反馈给 rabbitmq 修改receive.py的内容: 1 def callback(ch, method, properties, body): 3 ...