context = zmq_ctx_new();
// 第二步:创建socket,flag为ZMQ_SUB是订阅者
subscriber = zmq_socket(context, ZMQ_SUB);
// 第三步:zmq_setsockopt 设置socket连接参数
int reconnectTime = 10;
// ZMQ_RECONNECT_IVL 重连间隔 ZMQ_RECONNECT_IVL_MAX:设置重连间隔的最大值
int ret = zmq_setsockopt(subscriber_sub, ZMQ_RECONNECT_IVL, &reconnectTime, sizeof(reconnectTime));
// 设置连接心跳
int ivl = 5 * 1000;
int ttl = 2 * 60 * 1000;
int timeout = 5 * 1000;
zmq_setsockopt(subscriber_sub,ZMQ_HEARTBEAT_IVL, &ivl, sizeof(ivl)); // 心跳间隔
zmq_setsockopt(subscriber_sub,ZMQ_HEARTBEAT_TTL, &ttl, sizeof(ttl)); // 设置TTL值ZMTP心跳
zmq_setsockopt(subscriber_sub,ZMQ_HEARTBEAT_TIMEOUT, &timeout, sizeof(timeout));// 心跳超时
// 连接超时时间计算:ZMQ_HEARTBEAT_IVL + ZMQ_HEARTBEAT_TIMEOUT = ? seconds
// 第四步:连接
zmq_connect(subscriber, "tcp://localhost:10012");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "Quote", strlen("Quote")); //允许订阅多个频道
void ZmqClient::subs() {
char topic_name[256] = { 0 }; //用于接收订阅的主题名
char payload[1024] = { 0 }; //用于接收订阅主题的内容
while (1)
memset(topic_name, 0, sizeof(topic_name));
memset(payload, 0, sizeof(payload));
// zmq_recv 阻塞式接收,如果要设置接收或者发布者发送超时可以设zmq_setsockopt(subscriber_sub, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
int size = zmq_recv(subscriber, topic_name, sizeof(topic_name), 0); //接收订阅的主题名称
if (size == -1)
std::cout << "recv topic error!!" << std::endl;
continue;
size = zmq_recv(subscriber, payload, sizeof(payload), 0); //接收订阅的消息
if (size == -1)
std::cout << "recv payload error!!" << std::endl;
std::cout << topic_name << payload << std::endl;
// 第五步:退出时调用
zmq_close(subscriber);
zmq_ctx_destroy(context);
void* context;
void* subscriber;
context = zmq_ctx_new();
publisher = zmq_socket(context, ZMQ_PUB);
// 绑定地址
std::string addr = "tcp://localhost:10012"
int result = zmq_bind(publisher, addr.data());//zmq_bind()执行成功会返回0。其它情况则返回-1并且设置errno
// 发送
zmq_send(publisher, "Quote", strlen("Quote"), ZMQ_SNDMORE); //指定要发布消息的主题
zmq_send(publisher, ptr.c_str(), strlen(ptr.c_str()), 0); //向设置的主题发布消息
// 退出时调用
zmq_close(publisher);
zmq_ctx_destroy(context);
zmq_setsockopt 方法的参数设置:
ZMQ_RECONNECT_IVL:设置连接区间
的ZMQ_RECONNECT_IVL购股权为指定的套接字设置初始重新连接间隔。重新连接间隔是ØMQ在使用面向连接的传输时尝试重新连接断开的对等端之间等待的时间。
ZMQ_IMMEDIATE:将消息仅排队到已完成的连接
默认情况下,即使连接尚未完成,队列仍将填充传出连接。这可能导致带有循环路由的套接字上的“丢失”消息(REQ,PUSH,DEALER)。如果此选项设置为1,则消息只应排队等待完成的连接。如果没有其他连接,这将导致套接字阻塞,但会阻止队列填充等待连接的管道。
ZMQ_HEARTBEAT_IVL:发送ZMTP之间设置间隔心跳
将设置发送ZMTP心跳对指定的插槽之间的间隔。如果设置此选项并且大于0,则每隔ZMQ_HEARTBEAT_IVL毫秒发送一次PING ZMTP命令。
ZMQ_HEARTBEAT_TIMEOUT:为ZMTP设置超时心跳
将设置多长时间超时;发送PING命令ZMTP并没有收到任何流量后,连接之前要等待。此选项仅在ZMQ_HEARTBEAT_IVL也设置且大于0时有效。如果在发送PING命令后没有收到流量,但连接将超时,但接收的流量不必是PONG命令 - 任何接收到的流量将取消超时。
ZMQ_HEARTBEAT_TTL:设置TTL值ZMTP心跳
应当建立远程对等的超时ZMTP心跳。如果这个选项大于0,如果在TTL周期内没有收到更多的流量,远程端会超时。如果ZMQ_HEARTBEAT_IVL未设置或为0,则此选项不起作用。在内部,该值向下舍入到最接近的十分位,任何小于100的值都不起作用。
ZMQ_CONNECT_TIMEOUT:设置connect()超时
设置了多久时间,出connect()系统调用来等待。系统调用通常需要很长时间才会返回超时错误。设置此选项可让库在较早的时间间隔超时。
ZMQ_BACKLOG:未完成的连接
应设定为指定的套接字优秀的对等连接的队列的最大长度;这仅适用于面向连接的传输。有关详细信息,请参阅您的操作系统文档,以获取listen函数。
https://www.cnblogs.com/fengbohello/p/4230135.html
ZMQ接口文档的官方网站 : http://api.zeromq.org/
ZMQ接口文档的百度网盘下载地址(英文):http://pan.baidu.com/s/1jGDqXfS
● zmq - 0MQ 轻量级消息传输内核
● zmq_bind - 绑定一个socket
● zmq_close - 关闭ZMQ socket
● zmq_connect - 由一个socket创建一个对外连接
● zmq_ctx_destroy - 销毁一个ZMQ环境上下文
● zmq_ctx_get - 得到环境上下文的属性
● zmq_ctx_new – 创建一个新的ZMQ 环境上下文
● zmq_ctx_set - 设置环境上下文属性
● zmq_ctx_shutdown - 停止一个ZMQ context
● zmq_socket_monitor - 注册一个监控回调函数
● zmq_ctx_term - 终结一个ZMQ环境上下文
● zmq_curve – 安全的认证方式和保密方式
● zmq_curve_keypair - 生成一个新的CURVE 密钥对
● zmq_disconnect - 断开一个socket的连接
● zmq_errno – 返回errno的值给调用此函数的线程
● zmq_init - 初始化ZMQ环境上下文 (已弃用)
● zmq_z85_decode – 从一个用Z85算法生成的文本中解析出二进制密码
● zmq_z85_encode – 使用Z85算法对一个二进制秘钥进行加密,输出可打印的文本
● zmq_version – 返回ZMQ链接库的版本
● zmq_unbind - 停止连接外来的请求
● zmq_plain - 明文认证
● zmq_null - 无安全和加密
● zmq_msg_more - 指出是不是还有更多的消息部分可以接收
● zmq_msg_init - 初始化一个空的ZMQ消息结构
● zmq_msg_init_data - 从一个指定的存储空间中初始化一个ZMQ消息对象的数据
● zmq_msg_init_size - 使用一个指定的空间大小初始化ZMQ消息对象
● zmq_msg_move - 将一个消息里面的内容移动到另一个消息里面
● zmq_msg_copy - 把一个消息的内容复制到另一个消息中
● zmq_msg_data - 返回消息内容的指针
● zmq_msg_get - 获取消息的性质
● zmq_msg_set - 设置消息的性质
● zmq_msg_size - 以字节为单位返回消息内容的大小
● zmq_msg_recv - 从一个socket中接受一个消息帧
● zmq_msg_close – 释放一个ZMQ消息
● zmq_msg_send – 从一个socket发送一个消息帧
● zmq_term - 终结ZMQ环境上下文(context)(已弃用)
● zmq_strerror - 获取ZMQ错误描述字符串
● zmq_poll - I/O多路技术
● zmq_tcp – 使用TCP协议的ØMQ网络单播协议
● zmq_recv – 从一个socket上接收一个消息帧
● zmq_send – 在一个socket上发送一个消息帧
● zmq_proxy – 开始ZMQ内置代理
● zmq_recvmsg – 从一个socket上接收一个消息帧 (已弃用)
● zmq_sendmsg – 从一个socket上发送一个消息帧 (已弃用)
● zmq_ipc – ZMQ本地进程间通信传输协议
● zmq_proxy_steerable – 以STOP/RESUME/TERMINATE控制方式开启内置的ZMQ代理
● zmq_inproc – ØMQ 本地进程内(线程间)传输方式
● zmq_pgm – ØMQ 使用PGM 进行可靠的多路传输
● zmq_send_const – 从一个socket上发送一个固定内存数据
● zmq_socket – 创建ZMQ套接字
● zmq_setsockopt –设置ZMQ socket的属性
● zmq_getsockopt – 获取ZMQ socket的属性
发布订阅模式接收端:void* context;void* subscriber;// 第一步:zmq_ctx_new 创建context对象context = zmq_ctx_new();// 第二步:创建socket,flag为ZMQ_SUB是订阅者subscriber = zmq_socket(context, ZMQ_SUB);// 第三步:zmq_setsockopt 设置socket连接参数int reconnectTime = 10;// ZMQ_RECONNEC
内含简单项目代码及
ZeroMQ
使用教程,资源适合初学者,可以满足快速入门要求
下载
ZeroMQ
下载网址:http://
zeromq
.org/intro:get-the-software
点击“Windows sources”下载
zeromq
-4.0.3.zip文件
编译
ZeroMQ
库文件
解压
zeromq
-4.0.3.zip文件,进入builds\msvc目录,用VS打开*.sln工程文件,编译生成解决方案。编译完成后,会在lib目录下生成dll和lib文件
编写简单的测试工程
1.用VS新建2个项目,一个是server端,一个是client端
2.将
ZeroMQ
源码项目的include目录下的两个文件“
zmq
.h”,“
zmq
_utils.h”拷贝至自己新建的工程
3.将
ZeroMQ
源码项目的lib目录下的两个文件“lib
zmq
.dll”,“lib
zmq
.lib”拷贝至自己新建的工程
4.将文件“
zmq
.h”,“
zmq
_utils.h”和“lib
zmq
.lib”添加进自己新建的项目。
我使用的
ZeroMQ
版本是4.2.0,应用的是其发布-订阅
模式
应该知道的细节:
PUB-SUB套接字是慢连接,你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。参考文章:传送门1.传送门2.
先看代码,参考该文章:
ZeroMQ
的订阅发布(publish-subscribe)
模式
heartbeat
超时值定义了RabbitMQ及其client库在多久之后认为TCP连接不可到达。这个值是在client连接RabbitMQ服务器的时候协商好的,在RabbitMQ 3.0及以上版本,broker缺省就会自动尝试进行
heartbeat
协商,而对于低版本则必须由client在连接时显示地请求协商。该值单位为秒,缺省是60秒。
每隔
timeout
/ 2秒发送一个
Heartbeat
消息帧,这个值有时被称作
heartbeat
interval,如果连续丢失两个
heartbeat
s消息帧,就认...
文章目录关于Workerworker中的计时器worker的心跳worker中对
ZMQ
连接的维护WorkerWorker.startWorker.loadWorker
2021SC@SDUSC
package org.apache.storm.daemon.worker;
关于Worker
通过worker-data方法定义了一个包含很多共享数据的映射集合,worker中的很多方法都依赖它。
worker中的计时器
每个计时器都对应着一个java线程,worker中使用计时器进行心跳保持以及获取元数据的更新
zeromq
作为网络通讯库,是支持发布、订阅机制的,但是又与MQTT等发布、订阅概念有所不同。
由于
ZMQ
通讯是基于CS模型的,没有服务程序做中转,也就意味着订阅端作服务端和发布端作服务端是不同的。
如下图所示,每个框表示一个进程,
zmq
发布、订阅机制有如下性质:
服务程序作发布者 只能跟作订阅者的客户端程序通信,服务程序作订阅者只能跟作发布者的客户端程序通信。
订阅者作客户端,只能跟一个...
ZeroMQ
的
发布订阅
模式
(Publish-Subscribe)是一种常用的消息传递
模式
,适用于需要发布和订阅消息的场景,例如多人聊天室、订阅新闻、实时数据传输等。
在
发布订阅
模式
中,消息发布者(Publisher)将消息发送到一个或多个主题(Topic),消息订阅者(Subscriber)可以根据自己的需求订阅感兴趣的主题,从而接收到相应的消息。
发布订阅
模式
支持一对多和多对多的消息传递,可以实现广播、多播等功能。
具体实现时,可以使用
ZeroMQ
的 PUB-SUB 套接字组合来实现
发布订阅
模式
。消息发布者使用 PUB 套接字将消息发送到指定的主题,消息订阅者使用 SUB 套接字订阅感兴趣的主题,并接收相应的消息。需要注意的是,订阅者必须在发布者发送消息之前订阅相应的主题,否则无法接收到消息。
以下是使用
ZeroMQ
实现
发布订阅
模式
的示例代码:
Publisher:
```python
import
zmq
context =
zmq
.Context()
socket = context.socket(
zmq
.PUB)
socket.bind("tcp://*:5555")
while True:
topic = input("Enter topic: ")
message = input("Enter message: ")
socket.send_string(f"{topic} {message}")
Subscriber:
```python
import
zmq
context =
zmq
.Context()
socket = context.socket(
zmq
.SUB)
socket.connect("tcp://localhost:5555")
topic_filter = input("Enter topic filter: ")
socket.setsockopt_string(
zmq
.SUBSCRIBE, topic_filter)
while True:
message = socket.recv_string()
topic, content = message.split(' ', 1)
print(f"[{topic}] {content}")
在这个示例中,消息发布者通过 PUB 套接字将消息发送到指定的主题,消息订阅者通过 SUB 套接字订阅感兴趣的主题,并接收相应的消息。需要注意的是,订阅者必须在发布者发送消息之前订阅相应的主题,否则无法接收到消息。