!!!原创小文,转载请标明出处!!!
asyncio
asyncio
的编程模型就是一个消息循环。我们从
asyncio
模块中直接获取一个
EventLoop
的引用,然后把需要执行的协程扔到
EventLoop
中执行,就实现了异步IO。
1、 创建一个loop线程,将消费队列添加到loop队列
import asyncio
async def main():
channel_session = await get_channel_pool()
redis_session = await get_redis()
db_session = await get_db_pool()
async with channel_session:
# 添加一个消费任务
await loop.create_task(consume(db_session, redis_session, channel_session, "DBQ"))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
# 其他消费、发布用例见:
https://aio-pika.readthedocs.io/en/latest/quick-start.html
2、 创建消费队列
async def consume(db_session, redis_session, channel_pool, queue_name):
:param db_session: mysql异步连接池
:param redis_session: redis异步连接池
:param channel_pool: rabbitMq异步连接池
:param queue_name: 队列名
:return:
async with channel_pool.acquire() as channel:
await channel.set_qos(20)
queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
data_dict = json.loads(message.body.decode())
# 数据逻辑处理(自行添加)
await db_operation(db_session, redis_session, data_dict)
await message.ack()
except Exception as e:
await message.ack()
logger.error(traceback.format_exc())
3、 创建相关连接池
(1)使用sqlalchemy创建mysql连接池
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config.config import DB
# 创建对象的基类:
ENGINE_DATABASE_URL = 'mysql+pymysql://%s:%s@%s:%s/%s' % (DB.get('user'),
DB.get('pass'),
DB.get('host'),
DB.get('port', '3306'),
DB.get('db'))
# 初始化数据库连接:
engine = create_engine(ENGINE_DATABASE_URL, encoding='utf-8')
Session = sessionmaker(bind=engine)
async def get_db_pool():
db = Session()
return db
finally:
db.close()
(2)使用aioredis创建redis连接池
import asyncio
from aioredis import create_redis_pool, Redis
from config.config import REDIS
async def get_redis() -> Redis:
loop = asyncio.get_event_loop()
redis = await create_redis_pool("redis://:%s@%s:%s/%s?encoding=utf-8" % (REDIS.get('pass'),
REDIS.get('host'),
REDIS.get('port', '6379'),
REDIS.get('db', '0')),
minsize=5,
maxsize=20,
loop=loop)
return redis
except:
print("Connect call Redis Error!")
(3)使用aio_pika创建RabbitMq连接池
import aio_pika
import asyncio
from aio_pika.pool import Pool
from config.config import RABBIT_URL
async def get_channel_pool() -> aio_pika.pool.Pool:
loop = asyncio.get_event_loop()
async def get_connection():
return await aio_pika.connect_robust(RABBIT_URL)
connection_pool = Pool(get_connection, max_size=20, loop=loop)
async def get_channel() -> aio_pika.Channel:
async with connection_pool.acquire() as connection:
return await connection.channel()
return Pool(get_channel, max_size=20, loop=loop)
fail-fast 机制,即快速失败机制,是java集合(Collection)中的一种错误检测机制。当在迭代集合的过程中该集合在结构上发生改变的时候,就有可能会发生fail-fast,即抛出 ConcurrentModificationException异常。fail-fast机制并不保证在不同步的修改下一定会抛出异常,它只是尽最大努力去抛出,所以这种机制一般仅用于检测bug。
fail-fast出现场景
import java.util.ArrayList;
import java.util.It
RabbitMQ
是一个消息队列中间件,主要用于
异步
处理、解耦和缓冲。它
实现
了高级的消息队列协议(AMQP),并提供了可靠性、强大的管理界面和丰富的客户端库,是非常流行的分布式消息系统。
RabbitMQ
的基本思想是生产者将消息发送到队列中,然后
消费
者从队列中取出消息进行处理。在
RabbitMQ
中,生产者和
消费
者不知道对方的存在,并且服务于
RabbitMQ
Broker 上,即使有一端暂停了也不会影响另外一端的工作,从而保证了整个系统的稳定性和可靠性。
def mq_conn():
credentials =
pika
.PlainCredentials('admin', 'admin') # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection =
pika
.BlockingConnection(
pika
.Conn..
环境是centos6.9
rabbitmq
依赖erlang语言,首先要安装erlang,然后安装
rabbitmq
-server
启动
rabbitmq
-server:service
rabbitmq
-server start
python
3安装
pika
模块:pip3 install
pika
pika
连接
rabbitmq
示例:
import
pika
, ti...
async def main():
connection = await
aio
_
pika
.connect_robust(
“amqp://guest:guest@127.0.0.1/”
# 建立连接
queue_name = "test_queue"
async with connection:
# 上下文管理,退出时自动关闭connection
async def main() -> None:
# Perform connection
connection = await connect(host='127.0.0.1',port=5672,login='ai.litter',
串行的方式
在没有mq中间件之前,我们通常使用这种方式
实现
,
实现
起来很容易,比如先将用户信息插入数据库,然后发送成功注册的邮件、短信。以上三个任务完成后才会给用户响应,但我们应该都知道,对于邮件、短信,对于系统核心业务来说这都不是必须马上发送的,这样的
实现
方式无非会增加系统的响应时间,甚至给用户带来不好的体验。可以认为就是一个线程在串行执行三个任务。
众所周知,
RabbitMQ
是一个开源的高性能的消息队列,支持多种开发语言:Java,
Python
,.Net,C,C++,PHP等多种语言,那么如何通过
Python
语言调用
RabbitMQ
呢?
Python
中
pika
这个模块提供了完整的调用方法,通过这个包我们可以
实现
Rabbit的简单模式,交换机模式以及一些特殊的参数。那么我们如何使用
pika
模块呢,接下来,请看听我慢慢道来。
一、简单模式
简单模式下,有多个
消费
者时,采用轮询方式处理消息。
import
pika
if __name_
Ubuntu18.04安装一些较麻烦的
Python
第三方库
由于工作中会比较常用以下的第三方库,直接pip安装会报错,现总结一下(网上的教程鱼龙混杂),亲测ok
安装
aio
_
pika
比较麻烦,ubuntu16.04默认安装的是
python
3.5.2
如果直接sudo pip install
aio
_
pika
会报错
AttributeError: module ‘typing’ has no attribute ‘Coroutine’
如果在
python
3.6安装就不会出问题,看来是
python
3.5的ty
从上篇文章可知,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
消息确认就是当工作者完成任务后,会反馈给
rabbitmq
修改receive.py的内容:
1 def callback(ch, method, properties, body):
3 ...