I am seeing a strange behavior using ZMQ_PUB.

I have a producer which .connect()-s to different processes that .bind() on ZMQ_SUB sockets. The subscribers all .bind(), the publisher .connect()-s.

When a producer starts, it creates a ZMQ_PUB socket and .connect()-s it to different processes. It then immediately starts sending messages at a regular period.
As expected, if there are no connected subscribers, it drops all messages, until a subscriber starts. The flow then works normally: when a subscriber starts, it receives the messages from that moment on.

Now, the problem is: when the subscriber process stops and later restarts, I see that the producer enqueued all messages while the subscriber was down. As soon as the socket reconnected, because the subscriber process restarted, it sent all the queued messages.

As I understood from the ZeroMQ examples, a publisher should drop all sent messages when there are no connected subscribers:

"A publisher has no connected subscribers, then it will simply drop all messages."

Why is this happening?

By the way, I am using C++ on Linux for these tests.

I tried setting a different identity on the subscriber when it binds, but it didn't work. The publisher still enqueues messages, and delivers them all when the subscriber restarts.
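For reference, the identity attempt looked roughly like this (a hedged sketch only, assuming the cppzmq setsockopt API used in the code below; the identity string and endpoint are illustrative, and this did not solve the problem):

// Hedged sketch of the (unsuccessful) identity attempt on the SUB side.
const char identity[] = "sub-50001";                               // illustrative identity string
pSocket->setsockopt( ZMQ_IDENTITY, identity, strlen( identity ) );
pSocket->bind( "tcp://127.0.0.1:50001" );                          // illustrative endpoint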
Thanks in advance,
IMPORTANT UPDATE:

Before posting this question I had tried different solutions. One was to set ZMQ_LINGER to 0, which didn't work. I then added ZMQ_IMMEDIATE, and it worked, but I just found out that ZMQ_IMMEDIATE alone does not work. It also requires ZMQ_LINGER.
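A minimal sketch of that combination, assuming the same cppzmq-style setsockopt API as in the code below (the endpoint is illustrative); both options are set before .connect():

// Hedged sketch: ZMQ_IMMEDIATE queues only on completed connections,
// ZMQ_LINGER 0 drops pending messages when the socket closes.
zmq::context_t context( 1 );
zmq::socket_t pub( context, ZMQ_PUB );
int immediate = 1;
int linger = 0;
pub.setsockopt( ZMQ_IMMEDIATE, &immediate, sizeof( immediate ) );
pub.setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) );
pub.connect( "tcp://127.0.0.1:50001" ); // illustrative endpoint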
UPDATE: As per request, I am adding some simple test cases to show my point. One is a simple subscriber, which runs on the command line and receives the uri where to bind, for instance:

$ ./sub tcp://127.0.0.1:50001

The other is a publisher, which receives a list of uris to connect to, for instance:

$ ./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002

The subscriber receives up to 5 messages, then closes the socket and exits. We can see on Wireshark the exchange of FIN/ACK, both ways, and how the socket moves to the TIME_WAIT state. Then, the publisher starts sending SYN, trying to reconnect (which proves the ZMQ_PUB side knows that the connection closed).

I am explicitly not unsubscribing the socket, just closing it. In my opinion, if the socket closed, the publisher should automatically end any subscription for that connection.

So what I see is: I start the subscriber (one or more), I start the publisher, which starts sending messages. The subscriber receives 5 messages and ends. In the meantime the publisher continues sending messages, WITH NO CONNECTED SUBSCRIBER. I restart the subscriber, and it immediately receives several messages, because they were queued at the publisher's side. I think those queued messages break the Publish/Subscribe model, where messages should be delivered only to connected subscribers. If a subscriber closes the connection, messages to that subscriber should be dropped. Even more, when the subscriber restarts, it may decide to subscribe to other messages, but it will still receive those subscribed to by a "previous incarnation" that was bound to the same port.

My proposal is that ZMQ_PUB (in connect mode), when detecting a socket disconnection, should clear all subscriptions on that socket, until it reconnects and the NEW subscriber decides to resubscribe.

I apologize for language mistakes, but English is not my native language.
Pub's code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libgen.h>
#include <unistd.h>
#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc < 2 )
    {
        fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",
                 basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    for ( int i = 1; i < argc; i++ )
    {
        try
        {
            printf( "Connecting to [%s]\n", argv[i] );
            pSocket->connect( argv[i] );
        }
        catch( ... )
        {
            fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
            exit ( EXIT_FAILURE );
        }
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush( NULL );

    int msgCounter = 0;
    do
    {
        try
        {
            char msgBuffer[1024];
            sprintf( msgBuffer, "Message #%d", msgCounter++ );
            zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
            printf( "Sending message [%s]\n", msgBuffer );
            pSocket->send ( outTask );
            sleep( 1 );
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error occurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    } while ( true );

    exit ( EXIT_SUCCESS ); // not reached
}
Sub's code:
#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>
#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc != 2 )
    {
        fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    try
    {
        pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
        try
        {
            zmq::message_t inTask;
            pSocket->recv ( &inTask );
            printf( "Message received : [%s]\n", (char *) inTask.data() );
            fflush( NULL );
            msgCounter++;
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error occurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    } while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
}
Hope, Luis, you do not mind a few light polishing touches on the post. Also feel free to enjoy some other pieces of experience collected in the ZeroMQ posts in >>> stackoverflow.com/… Anyway, enjoy the smart landscapes of ZeroMQ support for distributed systems' designs!
– user3666197
Jan 18, 2017 at 9:42
Q: Why is this happening?

Because the SUB is actually still connected (not "disconnected" enough).

Yes, it might be surprising, but killing the SUB process, be it on the .bind()- or .connect()-attached side of the socket's transport media, does not mean that the Finite State Machine of the I/O pump has "moved" into a disconnected state.

Given that, the PUB side has no other option but to consider the SUB side still live and connected (even while the process was silently killed beyond the line of sight of the PUB side), and for such a "distributed" state there is a ZeroMQ protocol-defined behaviour (a PUB-side duty) to collect all the interim messages for the (yes, invisibly dead) SUB-scriber, which the PUB side still considers fair to be live (but which might just be having some temporarily intermittent issues somewhere low on the transport I/O levels, or some kind of remote CPU-resource starvation or concurrency-introduced transiently intermittent { local | remote } blocking states et al).

So it buffers...
In case your assassination of the SUB-side agent had been a bit more graceful (using the zeroised ZMQ_LINGER plus an adequate .close() on the socket-resource instance), the PUB side would recognise the "distributed"-system, system-wide Finite State Automaton shift into an indeed "DISCONNECT"-ed state, and a due change of behaviour would happen on the PUB side of the "distributed-FSA": it would not store any messages for this "visibly", indeed, "DISCONNECT"-ed SUB -- exactly what the documentation states.
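A minimal sketch of that graceful SUB-side shutdown, assuming the same cppzmq API and pSocket pointer as in the question's subscriber code:

// Hedged sketch: a "graceful" SUB-side exit that the PUB side can observe.
int linger = 0;
pSocket->setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) ); // drop pending data on close
pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 );                // withdraw the subscription first
pSocket->close();                                             // then close the socket itself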
"Distributed-FSA" has but quite a weak means to recognise state-change events "beyond it's horizon of localhost contols. KILL
-ing a remote process, which implements some remarkable part of the "distributed-FSA" is a devastating event, not a method how to keep the system work. A good option for such external risks might be
Sounds complex?
Oh, yes, it is complex, indeed. That's exactly why ZeroMQ solved this for us, to be free and enjoy designing our application architectures based on top of these ( already solved ) low level complexities.
Distributed-system FSA (a system-wide FSA of a layered composition of sub-FSAs):

To just imagine what is silently going on under the hood, imagine having a simple tandem pair of FSA-FSA, which is exactly what the pair of .Context() instances tries to handle for us in the simplest-ever 1:1 PUB/SUB scenario, where the use case KILL-s all the sub-FSAs on the SUB side without giving a shot to acknowledge the intention to the PUB side. Even the TCP protocol (living on both the PUB side and the SUB side) has several state transitions from the [ESTABLISHED] state to the [CLOSED] state.
A quick X-ray view of a distributed system's FSA-of-FSAs, as seen from the PUB side (just the TCP-protocol FSA was depicted, for clarity):

[TCP state-transition diagram omitted]
I completely disagree. The SUB side was not simply "silently killed beyond the line-of-sight". It correctly closed the socket. I can send you the Wireshark traces, if you don't believe me. The TCP socket was closed; I can see the FIN/ACK messages and how the socket on the SUB side went to TIME_WAIT. Even more, I can see the PUB side starting to send SYN messages, trying to reconnect. So, the ZMQ_PUB socket knows the connection closed. From that moment it should know there is no SUB connected anymore and should start dropping messages! It clearly seems like a bug in ZeroMQ to me.
– Kalki70
Jan 18, 2017 at 19:19
Welcome, Luis. Given the new information posted in the comment above, would you mind documenting the fully compatible MCVE (the M-inimum C-omplete V-erifiable E-xample) of the experiment, so as to be able to re-run and potentially reproduce the claimed observation 1:1? That would be the fastest path to Occam's Razor.
– user3666197
Jan 18, 2017 at 20:29
Option 1:

Let the publisher bind and the subscribers connect (the usual topology), because if you bind a subscriber and then interrupt it, there's no way the publisher knows that the subscriber is unbound, so it queues the messages for the bound port, and when you restart again on the same port, the queued messages will be drained.

Option 2:

But if you want to do it your way, you need to do the following things (a sketch follows this list):

- Register an interrupt handler (SIGINT) in the subscriber code
- On the interrupt of the subscriber, do the following:
  - unsubscribe the topic
  - close the sub socket
  - exit the subscriber process cleanly, preferably with a 0 return code
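A minimal sketch of that clean-exit path; this assumes the same cppzmq API used elsewhere in the thread, and the flag-based loop and endpoint are illustrative:

#include <csignal>
#include <zeromq/zmq.hpp>

static volatile std::sig_atomic_t gStop = 0;

static void onInterrupt( int ) { gStop = 1; }     // SIGINT handler: just raise a flag

int main()
{
    std::signal( SIGINT, onInterrupt );

    zmq::context_t context( 1 );
    zmq::socket_t sub( context, ZMQ_SUB );
    sub.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    sub.bind( "tcp://127.0.0.1:50001" );          // illustrative endpoint

    while ( !gStop )
    {
        zmq::message_t msg;
        try { sub.recv( &msg ); }                 // blocking recv; interrupted by Ctrl-C
        catch ( const zmq::error_t & ) { break; } // EINTR surfaces as zmq::error_t
    }

    sub.setsockopt( ZMQ_UNSUBSCRIBE, "", 0 );     // 1. unsubscribe the topic
    sub.close();                                  // 2. close the sub socket
    return 0;                                     // 3. exit cleanly with code 0
}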
UPDATE:

Regarding the point about identity: do not assume that setting an identity will uniquely identify a connection. If it is left to ZeroMQ, it will assign identities to the incoming connections using unique arbitrary numbers.

Identities are not used for responding back to the clients in general. They are used for responding back to the clients in case ROUTER sockets are used, because ROUTER sockets are asynchronous whereas REQ/REP are synchronous. In async communication we need to know whom to respond back to. It can be a network address, a random number, a uuid, etc.
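A minimal sketch of that routing mechanism, assuming the same cppzmq API; the endpoint and the classic REQ-to-ROUTER three-frame envelope are illustrative:

// Hedged sketch of how a ROUTER socket uses the identity frame to route a reply.
#include <zeromq/zmq.hpp>

int main()
{
    zmq::context_t context( 1 );
    zmq::socket_t router( context, ZMQ_ROUTER );
    router.bind( "tcp://127.0.0.1:50002" );  // illustrative endpoint

    zmq::message_t identity, empty, payload;
    router.recv( &identity );   // frame 1: peer identity (assigned by ZeroMQ)
    router.recv( &empty );      // frame 2: empty delimiter added by REQ
    router.recv( &payload );    // frame 3: the request itself

    // Echo the request back: the identity frame tells ROUTER which peer gets it.
    router.send( identity, ZMQ_SNDMORE );
    router.send( empty, ZMQ_SNDMORE );
    router.send( payload );
    return 0;
}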
UPDATE:

I don't consider this an issue with ZeroMQ, because throughout the guide PUB/SUB is explained in such a way that the publisher is generally static (a server, bound to a port) and the subscribers come and go along the way (clients which connect to that port).

There is another option which would exactly fit your requirement: ZMQ_IMMEDIATE (or ZMQ_DELAY_ATTACH_ON_CONNECT). Setting the above socket option on the publisher will not let messages enqueue when there are no active connections to it.
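For instance (a hedged sketch, assuming a cppzmq ZMQ_PUB socket named pubSocket; the endpoint is illustrative):

int immediate = 1;  // known as ZMQ_DELAY_ATTACH_ON_CONNECT on older libzmq versions
pubSocket.setsockopt( ZMQ_IMMEDIATE, &immediate, sizeof( immediate ) );
pubSocket.connect( "tcp://127.0.0.1:50001" );  // set the option before connecting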
Option 1 should not be needed. The ZeroMQ documentation is very clear that it doesn't matter which part binds or connects; the important part is the ZMQ_TYPE. By design, for my problem, subscribers must bind. Option 2: it makes no sense. What do you mean, the publisher has no way to know there is no subscriber anymore? It must know, as the TCP socket was disconnected. There may be many reasons why the subscriber disappeared, including a hardware crash. The ZMQ_PUB socket must not depend on the behavior of the subscriber. If there are no connected subscribers, the publisher should drop messages.
– Kalki70
Jan 17, 2017 at 15:53
True. But that is true mostly for 1-1 communication. It only means either party can start communication without waiting for the other party. But remember: bind will perform a socket bind to the given port, whereas connect will bind to another arbitrary port and wait for a connection with the supplied port. There's a lot of difference between them.
– manikawnth
Jan 17, 2017 at 19:06
@LuisRojas Please read this from the zmq documentation and you'll get the difference: "Although ZeroMQ tries to be neutral about which side binds and which side connects, there are differences. We'll see these in more detail later. The upshot is that you should usually think in terms of "servers" as static parts of your topology that bind to more or less fixed endpoints, and "clients" as dynamic parts that come and go and connect to these endpoints."
– manikawnth
Jan 17, 2017 at 19:14
I've UPDATED with another option which should fit your needs. Check it and let me know if it helps.
– manikawnth
Jan 17, 2017 at 20:55
Thanks, the ZMQ_IMMEDIATE option worked perfectly. In most forums it is said that it doesn't matter which side binds, for instance: stackoverflow.com/questions/29752338/… You can see there that I added a description of what I am doing with this model: basically, I am broadcasting commands from an "administrator console" to a group of processes. This console can run from anywhere, so the processes can't connect(), because they don't know where to. They just expect commands. Thank you very much again.
– Kalki70
Jan 17, 2017 at 21:57