class ZMQ::Socket

ZeroMQ message socket.

Description

Key differences to conventional sockets

Generally speaking, conventional sockets present a synchronous interface to either connection-oriented reliable byte streams (SOCK_STREAM), or connection-less unreliable datagrams (SOCK_DGRAM). In comparison, 0MQ sockets present an abstraction of an asynchronous message queue, with the exact queueing semantics depending on the socket type in use. Where conventional sockets transfer streams of bytes or discrete datagrams, 0MQ sockets transfer discrete messages.

0MQ sockets being asynchronous means that the timings of the physical connection setup and teardown, reconnect and effective delivery are transparent to the user and organized by 0MQ itself. Further, messages may be queued in the event that a peer is unavailable to receive them.

Conventional sockets allow only strict one-to-one (two peers), many-to-one (many clients, one server), or in some cases one-to-many (multicast) relationships. With the exception of ZMQ::PAIR, 0MQ sockets may be connected to multiple endpoints using connect(), while simultaneously accepting incoming connections from multiple endpoints bound to the socket using bind(), thus allowing many-to-many relationships.

Socket Types

The following sections present the socket types defined by 0MQ, grouped by the general messaging pattern which is built from related socket types.

Request-reply pattern

The request-reply pattern is used for sending requests from a client to one or more instances of a service, and receiving subsequent replies to each request sent.

ZMQ::REQ

A socket of type ZMQ::REQ is used by a client to send requests to and receive replies from a service. This socket type allows only an alternating sequence of send(request) and subsequent recv(reply) calls. Each request sent is load-balanced among all services, and each reply received is matched with the last issued request.

When a ZMQ::REQ socket enters an exceptional state due to having reached the high water mark for all services, or if there are no services at all, then any send() operations on the socket shall block until the exceptional state ends or at least one service becomes available for sending; messages are not discarded.

Summary of ZMQ::REQ characteristics

Compatible peer sockets

ZMQ::REP

Direction

Bidirectional

Send/receive pattern

Send, Receive, Send, Receive, …

Outgoing routing strategy

Load-balanced

Incoming routing strategy

Last peer

ZMQ::HWM option action

Block

ZMQ::REP

A socket of type ZMQ::REP is used by a service to receive requests from and send replies to a client. This socket type allows only an alternating sequence of recv(request) and subsequent send(reply) calls. Each request received is fair-queued from among all clients, and each reply sent is routed to the client that issued the last request.

When a ZMQ::REP socket enters an exceptional state due to having reached the high water mark for a client, then any replies sent to the client in question shall be dropped until the exceptional state ends.

Summary of ZMQ::REP characteristics

Compatible peer sockets

ZMQ::REQ

Direction

Bidirectional

Send/receive pattern

Receive, Send, Receive, Send, …

Incoming routing strategy

Fair-queued

Outgoing routing stratagy

Last peer

ZMQ::HWM option action

Drop

Publish-subscribe pattern

The publish-subscribe pattern is used for one-to-many distribution of data from a single publisher to multiple subscribers in a fanout fashion.

ZMQ::PUB

A socket of type ZMQ::PUB is used by a publisher to distribute data. Messages sent are distributed in a fanout fashion to all connected peers. The recv() function is not implemented for this socket type.

When a ZMQ::PUB socket enters an exceptional state due to having reached the high water mark for a subscriber, then any messages that would be sent to the subscriber in question shall instead be dropped until the exceptional state ends.

Summary of ZMQ::PUB characteristics

Compatible peer sockets

ZMQ::SUB

Direction

Unidirectional

Send/receive pattern

Send only

Incoming routing strategy

N/A

Outgoing routing strategy

Fanout

ZMQ::HWM option action

Drop

ZMQ::SUB

A socket of type ZMQ::SUB is used by a subscriber to subscribe to data distributed by a publisher. Initially a ZMQ::SUB socket is not subscribed to any messages, use the ZMQ::SUBSCRIBE option of setsockopt() to specify which messages to subscribe to. The send() function is not implemented for this socket type.

Summary of ZMQ::SUB characteristics

Compatible peer sockets

ZMQ::PUB

Direction

Unidirectional

Send/receive pattern

Receive only

Incoming routing strategy

Fair-queued

Outgoing routing strategy

N/A

ZMQ::HWM option action

N/A

Pipeline pattern

The pipeline pattern is used for distributing data to nodes arranged in a pipeline. Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes.

ZMQ::PUSH

A socket of type ZMQ::PUSH is used by a pipeline node to send messages to downstream pipeline nodes. Messages are load-balanced to all connected downstream nodes. The ZMQ::recv() function is not implemented for this socket type.

When a ZMQ::PUSH socket enters an exceptional state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any send() operations on the socket shall block until the exceptional state ends or at least one downstream node becomes available for sending; messages are not discarded.

Summary of ZMQ::PUSH characteristics

Compatible peer sockets

ZMQ::PULL

Direction

Unidirectional

Send/receive pattern

Send only

Incoming routing strategy

N/A

Outgoing routing strategy

Load-balanced

ZMQ::HWM option action

Block

ZMQ::PULL

A socket of type ZMQ::PULL is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. The send() function is not implemented for this socket type.

Summary of ZMQ::PULL characteristics

Compatible peer sockets

ZMQ::PUSH

Direction

Unidirectional

Send/receive pattern

Receive only

Incoming routing strategy

Fair-queued

Outgoing routing strategy

N/A

ZMQ::HWM option action

N/A

Exclusive pair pattern

The exclusive pair is an advanced pattern used for communicating exclusively between two peers.

ZMQ::PAIR

A socket of type ZMQ::PAIR can only be connected to a single peer at any one time. No message routing or filtering is performed on messages sent over a ZMQ::PAIR socket.

When a ZMQ::PAIR socket enters an exceptional state due to having reached the high water mark for the connected peer, or if no peer is connected, then any send() operations on the socket shall block until the peer becomes available for sending; messages are not discarded.

NOTE ZMQ::PAIR sockets are experimental, and are currently missing several features such as auto-reconnection.

Summary of ZMQ::PAIR characteristics

Compatible peer sockets

ZMQ::PAIR

Direction

Bidirectional

Send/receive pattern

Unrestricted

Incoming routing strategy

N/A

Outcoming routing strategy

N/A

ZMQ::HWM option action

Block

Public Instance Methods

bind(endpoint) → nil click to toggle source

Creates an endpoint for accepting connections and binds it to the socket.

The endpoint argument is a string consisting of two parts as follows: transport://address. The transport part specifies the underlying transport protocol to use. The meaning of the address part is specific to the underlying transport protocol selected.

The following transports are defined:

inproc

local in-process (inter-thread) communication transport

ipc

local inter-process communication transport

tcp

unicast transport using TCP

pgm, epgm

reliable multicast transport using PGM

With the exception of ZMQ:PAIR sockets, a single socket may be connected to multiple endpoints using connect(), while simultaneously accepting incoming connections from multiple endpoints bound to the socket using bind(). Refer to ZMQ::Socket for a description of the exact semantics involved when connecting or binding a socket to multiple endpoints.

static VALUE socket_bind (VALUE self_, VALUE addr_)
{
    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    int rc = zmq_bind (s, rb_string_value_cstr (&addr_));
    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }

    return Qnil;
}
close() → nil click to toggle source

Destroys the 0MQ socket. Any outstanding messages physically received from the network but not yet received by the application with #recv shall be discarded. The behaviour for discarding messages sent by the application with #send but not yet physically transferred to the network depends on the value of the ZMQ::LINGER socket option for the socket.

static VALUE socket_close (VALUE self_)
{
    void * s = NULL;
    Data_Get_Struct (self_, void, s);
    if (s != NULL) {
        int rc = zmq_close (s);
        if (rc != 0) {
            rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
            return Qnil;
        }

        DATA_PTR (self_) = NULL;
    }
    return Qnil;
}
connect(endpoint) → nil click to toggle source

Connects the socket to the endpoint specified by the endpoint argument.

The endpoint argument is a string consisting of two parts as follows: transport://address. The transport part specifies the underlying transport protocol to use. The meaning of the address part is specific to the underlying transport protocol selected.

The following transports are defined:

inproc

local in-process (inter-thread) communication transport

ipc

local inter-process communication transport

tcp

unicast transport using TCP

pgm, epgm

reliable multicast transport using PGM

With the exception of ZMQ:PAIR sockets, a single socket may be connected to multiple endpoints using connect(), while simultaneously accepting incoming connections from multiple endpoints bound to the socket using bind(). Refer to ZMQ::Socket for a description of the exact semantics involved when connecting or binding a socket to multiple endpoints.

NOTE: The connection will not be performed immediately, but as needed by 0MQ. Thus, a successful invocation of connect() does not indicate that a physical connection was or can actually be established.

static VALUE socket_connect (VALUE self_, VALUE addr_)
{
    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    int rc = zmq_connect (s, rb_string_value_cstr (&addr_));
    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }

    return Qnil;
}
getsockopt(option) click to toggle source

Retrieves the value of the specified 0MQ socket option.

The following options can be retrievesd with the getsockopt() function:

ZMQ::RCVMORE: More message parts to follow

The ZMQ::RCVMORE option shall return a boolean value indicating if the multi-part message currently being read from the specified socket has more message parts to follow. If there are no message parts to follow or if the message currently being read is not a multi-part message a value of false shall be returned. Otherwise, a value of true shall be returned.

Refer to send() and recv() for a detailed description of sending/receiving multi-part messages.

Option value type

Boolean

Option value unit

N/A

Default value

N/A

Applicable socket types

all

ZMQ::HWM: Retrieve high water mark

The ZMQ::HWM option shall retrieve the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer that the specified socket is communicating with.

If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in ZMQ::Socket for details on the exact action taken for each socket type.

The default ZMQ::HWM value of zero means “no limit”.

Option value type

Integer

Option value unit

messages

Default value

0

Applicable socket types

all

ZMQ::SWAP: Retrieve disk offload size

The ZMQ::SWAP option shall retrieve the disk offload (swap) size for the specified socket. A socket which has ZMQ::SWAP set to a non-zero value may exceed it’s high water mark; in this case outstanding messages shall be offloaded to storage on disk rather than held in memory.

The value of ZMQ::SWAP defines the maximum size of the swap space in bytes.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::AFFINITY: Retrieve I/O thread affinity

The ZMQ::AFFINITY option shall retrieve the I/O thread affinity for newly created connections on the specified socket.

Affinity determines which threads from the 0MQ I/O thread pool associated with the socket’s context shall handle newly created connections. A value of zero specifies no affinity, meaning that work shall be distributed fairly among all 0MQ I/O threads in the thread pool. For non-zero values, the lowest bit corresponds to thread 1, second lowest bit to thread 2 and so on. For example, a value of 3 specifies that subsequent connections on socket shall be handled exclusively by I/O threads 1 and 2.

See also ZMQ::Context#new for details on allocating the number of I/O threads for a specific context.

Option value type

Integer

Option value unit

N/A (bitmap)

Default value

0

Applicable socket types

all

ZMQ::IDENTITY: Retrieve socket identity

The ZMQ::IDENTITY option shall retrieve the identity of the specified socket. Socket identity determines if existing 0MQ infastructure (message queues, forwarding devices) shall be identified with a specific application and persist across multiple runs of the application.

If the socket has no identity, each run of an application is completely separate from other runs. However, with identity set the socket shall re-use any existing 0MQ infrastructure configured by the previous run(s). Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared with previous run(s) and so on.

Identity can be at least one byte and at most 255 bytes long. Identities starting with binary zero are reserved for use by 0MQ infrastructure.

Option value type

String

Option value unit

N/A

Default value

nil

Applicable socket types

all

ZMQ::RATE: Retrieve multicast data rate

The ZMQ::Rate option shall retrieve the maximum send or receive data rate for multicast transports using the specified socket.

Option value type

Integer

Option value unit

kilobits per second

Default value

100

Applicable socket types

all, when using multicast transports

ZMQ::RECOVERY_IVL: Get multicast recovery interval

The ZMQ::RECOVERY_IVL option shall retrieve the recovery interval for multicast transports using the specified socket. The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group before unrecoverable data loss will occur.

Option value type

Integer

Option value unit

seconds

Default value

10

Applicable socket types

all, when using multicast transports

ZMQ::RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds

The ZMQ::RECOVERY_IVL_MSEC option shall retrieve the recovery interval, in milliseconds (ms) for multicast transports using the specified socket. The recovery interval determines the maximum time in milliseconds that a receiver can be absent from a multicast group before unrecoverable data loss will occur.

For backward compatibility, the default value of ZMQ::RECOVERY_IVL_MSEC is -1 indicating that the recovery interval should be obtained from the ZMQ::RECOVERY_IVL option. However, if the ZMQ::RECOVERY_IVL_MSEC value is not zero, then it will take precedence, and be used.

Option value type

Integer

Option value unit

milliseconds

Default value

-1

Applicable socket types

all, when using multicast transports

ZMQ::MCAST_LOOP: Control multicast loopback

The ZMQ::MCAST_LOOP option controls whether data sent via multicast transports can also be received by the sending host via loopback. A value of zero indicates that the loopback functionality is disabled, while the default value of 1 indicates that the loopback functionality is enabled. Leaving multicast loopback enabled when it is not required can have a negative impact on performance. Where possible, disable ZMQ::MCAST_LOOP in production environments.

Option value type

Boolean

Option value unit

N/A

Default value

true

Applicable socket types

all, when using multicast transports

ZMQ::SNDBUF: Retrieve kernel transmit buffer size

The ZMQ::SNDBUF option shall retrieve the underlying kernel transmit buffer size for the specified socket. A value of zero means that the OS default is in effect. For details refer to your operating system documentation for the SO_SNDBUF socket option.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::RCVBUF: Retrieve kernel receive buffer size

The ZMQ::RCVBUF option shall retrieve the underlying kernel receive buffer size for the specified socket. A value of zero means that the OS default is in effect. For details refer to your operating system documentation for the SO_RCVBUF socket option.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::LINGER: Retrieve linger period for socket shutdown

The ZMQ::LINGER option shall retrieve the linger period for the specified socket. The linger period determines how long pending messages which have yet to be sent to a peer shall linger in memory after a socket is closed with #close, and further affects the termination of the socket’s context with ZMQ#close(). The following outlines the different behaviours:

  • The default value of −1 specifies an infinite linger period. Pending messages shall not be discarded after a call to #close; attempting to terminate the socket’s context with ZMQ::Context#close shall block until all pending messages have been sent to a peer.

  • The value of 0 specifies no linger period. Pending messages shall be discarded immediately when the socket is closed with #close.

  • Positive values specify an upper bound for the linger period in milliseconds. Pending messages shall not be discarded after a call to #close; attempting to terminate the socket’s context with ZMQ::Context#close shall block until either all pending messages have been sent to a peer, or the linger period expires, after which any pending messages shall be discarded.

Option value type

Integer

Option value unit

milliseconds

Default value

-1 (infinite)

Applicable socket types

all

ZMQ::RECONNECT_IVL: Retrieve reconnection interval

The ZMQ::RECONNECT_IVL option shall retrieve the reconnection interval for the specified socket. The reconnection interval is the maximum period 0MQ shall wait between attempts to reconnect disconnected peers when using connection−oriented transports.

Option value type

Integer

Option value unit

milliseconds

Default value

100

Applicable socket types

all, only for connection-oriented transports

ZMQ::RECONNECT_IVL_MAX: Retrieve maximum reconnection interval

The ZMQ::RECONNECT_IVL_MAX option shall set the maximum reconnection interval for the specified socket. This is the maximum period ØMQ shall wait between attempts to reconnect. On each reconnect attempt, the previous interval shall be doubled untill ZMQ::RECONNECT_IVL_MAX is reached. This allows for exponential backoff strategy. Default value means no exponential backoff is performed and reconnect interval calculations are only based on ZMQ::RECONNECT_IVL.

Values less than ZMQ::RECONNECT_IVL will be ignored.

Option value type

Integer

Option value unit

milliseconds

Default value

0 (only use RECONNECT_IVL)

Applicable socket types

all, only for connection-oriented transports

ZMQ::BACKLOG: Retrieve maximum length of the queue of outstanding connections

The ZMQ::BACKLOG option shall retrieve the maximum length of the queue of outstanding peer connections for the specified socket; this only applies to connection−oriented transports. For details refer to your operating system documentation for the listen function.

Option value type

Integer

Option value unit

connections

Default value

100

Applicable socket types

all, only for connection-oriented transports

ZMQ::FD: Retrieve file descriptor associated with the socket

The ZMQ::FD option shall retrieve the file descriptor associated with the specified socket. The returned file descriptor can be used to integrate the socket into an existing event loop; the 0MQ library shall signal any pending events on the socket in an edge−triggered fashion by making the file descriptor become ready for reading.

Note

The ability to read from the returned file descriptor does not necessarily indicate that messages are available to be read from, or can be written to, the underlying socket; applications must retrieve the actual event state with a subsequent retrieval of the ZMQ::EVENTS option.

Caution

The returned file descriptor is intended for use with a poll or similar system call only. Applications must never attempt to read or write data to it directly.

Option value type

int on POSIX systems, SOCKT on Windows

Option value unit

N/A

Default value

N/A

Applicable socket types

all

ZMQ::EVENTS: Retrieve socket event state

The ZMQ::EVENTS option shall retrieve the event state for the specified socket. The returned value is a bit mask constructed by OR’ing a combination of the following event flags:

ZMQ::POLLIN

Indicates that at least one message may be received from the specified socket without blocking.

ZMQ::POLLOUT

Indicates that at least one message may be sent to the specified socket without blocking.

The combination of a file descriptor returned by the ZMQ::FD option being ready for reading but no actual events returned by a subsequent retrieval of the ZMQ::EVENTS option is valid; applications should simply ignore this case and restart their polling operation/event loop.

Option value type

uint32_t

Option value unit

N/A (flags)

Default value

N/A

Applicable socket types

all

static VALUE socket_getsockopt (VALUE self_, VALUE option_)
{
    int rc = 0;
    VALUE retval;
    void * s;
    
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);
  
    switch (NUM2INT (option_)) {
#if ZMQ_VERSION >= 20100
        case ZMQ_FD:
        {
#ifdef _WIN32
                        SOCKET optval;
#else
                        int optval;
#endif
            size_t optvalsize = sizeof(optval);

            rc = zmq_getsockopt (s, NUM2INT (option_), (void *)&optval,
                                 &optvalsize);

            if (rc != 0) {
              rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
              return Qnil;
            }

            if (NUM2INT (option_) == ZMQ_RCVMORE)
                retval = optval ? Qtrue : Qfalse;
            else
                retval = INT2NUM (optval);
        }
        break;
        case ZMQ_EVENTS:
        {
            uint32_t optval;
            size_t optvalsize = sizeof(optval);

            rc = zmq_getsockopt (s, NUM2INT (option_), (void *)&optval,
                                 &optvalsize);

            if (rc != 0) {
              rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
              return Qnil;
            }

            if (NUM2INT (option_) == ZMQ_RCVMORE)
                retval = optval ? Qtrue : Qfalse;
            else
                retval = INT2NUM (optval);
        }
        break;
        case ZMQ_TYPE:
        case ZMQ_LINGER:
        case ZMQ_RECONNECT_IVL:
        case ZMQ_BACKLOG:
#if ZMQ_VERSION >= 20101
        case ZMQ_RECONNECT_IVL_MAX:
        case ZMQ_RECOVERY_IVL_MSEC:
#endif
        {
            int optval;
            size_t optvalsize = sizeof(optval);

            rc = zmq_getsockopt (s, NUM2INT (option_), (void *)&optval,
                                 &optvalsize);

            if (rc != 0) {
              rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
              return Qnil;
            }

            if (NUM2INT (option_) == ZMQ_RCVMORE)
                retval = optval ? Qtrue : Qfalse;
            else
                retval = INT2NUM (optval);
        }
        break;
#endif
    case ZMQ_RCVMORE:
    case ZMQ_HWM:
    case ZMQ_SWAP:
    case ZMQ_AFFINITY:
    case ZMQ_RATE:
    case ZMQ_RECOVERY_IVL:
    case ZMQ_MCAST_LOOP:
    case ZMQ_SNDBUF:
    case ZMQ_RCVBUF:
        {
            int64_t optval;
            size_t optvalsize = sizeof(optval);

            rc = zmq_getsockopt (s, NUM2INT (option_), (void *)&optval,
                                 &optvalsize);

            if (rc != 0) {
              rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
              return Qnil;
            }

            if (NUM2INT (option_) == ZMQ_RCVMORE)
                retval = optval ? Qtrue : Qfalse;
            else
                retval = INT2NUM (optval);
        }
        break;
    case ZMQ_IDENTITY:
        {
            char identity[255];
            size_t optvalsize = sizeof (identity);

            rc = zmq_getsockopt (s, NUM2INT (option_), (void *)identity,
                                 &optvalsize);

            if (rc != 0) {
              rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
              return Qnil;
            }

            if (optvalsize > sizeof (identity))
                optvalsize = sizeof (identity);

            retval = rb_str_new (identity, optvalsize);
        }
        break;
    default:
        rb_raise (exception_type, "%s", zmq_strerror (EINVAL));
        return Qnil;
    }
  
    return retval;
}
recv(flags=0) → message | nil click to toggle source

Receives a message from the socket. If there are no messages available on the socket, the recv() function shall block until the request can be satisfied. The flags argument is a combination of the flags defined below:

ZMQ::NOBLOCK

Specifies that the operation should be performed in

non-blocking mode. If there are no messages available on the socket, the recv() function shall fail and return nil.

Multi-part messages

A 0MQ message is composed of 1 or more message parts. 0MQ ensures atomic delivery of messages; peers shall receive either all message parts of a message or none at all.

The total number of message parts is unlimited.

An application wishing to determine if a message is composed of multiple parts does so by retrieving the value of the ZMQ::RCVMORE socket option on the socket it is receiving the message from, using getsockopt(). If there are no message parts to follow, or if the message is not composed of multiple parts, ZMQ::RCVMORE shall report a value of false. Otherwise, ZMQ::RCVMORE shall report a value of true, indicating that more message parts are to follow.

static VALUE socket_recv (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE flags_;
    
    rb_scan_args (argc_, argv_, "01", &flags_);

    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    int flags = NIL_P (flags_) ? 0 : NUM2INT (flags_);

    zmq_msg_t msg;
    int rc = zmq_msg_init (&msg);
    assert (rc == 0);

#ifdef HAVE_RUBY_INTERN_H
    if (!(flags & ZMQ_NOBLOCK)) {
        struct zmq_send_recv_args recv_args;
        recv_args.socket = s;
        recv_args.msg = &msg;
        recv_args.flags = flags;
        rb_thread_blocking_region (zmq_recv_blocking, (void*) &recv_args,
            NULL, NULL);
        rc = recv_args.rc;
    }
    else
#endif
        rc = zmq_recv (s, &msg, flags);
    if (rc != 0 && zmq_errno () == EAGAIN) {
        rc = zmq_msg_close (&msg);
        assert (rc == 0);
        return Qnil;
    }

    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        rc = zmq_msg_close (&msg);
        assert (rc == 0);
        return Qnil;
    }

    VALUE message = rb_str_new ((char*) zmq_msg_data (&msg),
        zmq_msg_size (&msg));
    rc = zmq_msg_close (&msg);
    assert (rc == 0);
    return message;
}
send(message, flags=0) → true | false click to toggle source

Queue the message referenced by the msg argument to be send to the socket. The flags argument is a combination of the flags defined below:

ZMQ::NOBLOCK

Specifies that the operation should be performed in

non-blocking mode. If the message cannot be queued on the socket, the function shall fail and return false.

ZMQ::SNDMORE

Specifies that the message being sent is a multi-part message,

and that further message parts are to follow. Refer to the section regarding multi-part messages below for a detailed description.

NOTE: A successful invocation of send() does not indicate that the message has been transmitted to the network, only that it has been queued on the socket and 0MQ has assumed responsibility for the message.

Multi-part messages

A 0MQ message is composed of 1 or more message parts. 0MQ ensures atomic delivery of messages; peers shall receive either all message parts of a message or none at all.

The total number of message parts is unlimited.

An application wishing to send a multi-part message does so by specifying the ZMQ::SNDMORE flag to send(). The presence of this flag indicates to 0MQ that the message being sent is a multi-part message and that more message parts are to follow. When the application wishes to send the final message part it does so by calling send() without the ZMQ::SNDMORE flag; this indicates that no more message parts are to follow.

This function returns true if successful, false if not.

static VALUE socket_send (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE msg_, flags_;
    
    rb_scan_args (argc_, argv_, "11", &msg_, &flags_);

    void * s;
    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    Check_Type (msg_, T_STRING);

    int flags = NIL_P (flags_) ? 0 : NUM2INT (flags_);

    zmq_msg_t msg;
    int rc = zmq_msg_init_size (&msg, RSTRING_LEN (msg_));
    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }
    memcpy (zmq_msg_data (&msg), RSTRING_PTR (msg_), RSTRING_LEN (msg_));

#ifdef HAVE_RUBY_INTERN_H
    if (!(flags & ZMQ_NOBLOCK)) {
        struct zmq_send_recv_args send_args;
        send_args.socket = s;
        send_args.msg = &msg;
        send_args.flags = flags;
        rb_thread_blocking_region (zmq_send_blocking, (void*) &send_args, NULL, NULL);
        rc = send_args.rc;
    }
    else
#endif
        rc = zmq_send (s, &msg, flags);
    if (rc != 0 && zmq_errno () == EAGAIN) {
        rc = zmq_msg_close (&msg);
        assert (rc == 0);
        return Qfalse;
    }

    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        rc = zmq_msg_close (&msg);
        assert (rc == 0);
        return Qnil;
    }

    rc = zmq_msg_close (&msg);
    assert (rc == 0);
    return Qtrue;
}
setsockopt(option, value) → nil click to toggle source

Sets the value of a 0MQ socket option.

The following socket options can be set with the setsockopt() function:

ZMQ::HWM: Set high water mark

The ZMQ::HWM option shall set the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer that the specified socket is communicating with.

If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in ZMQ::Socket for details on the exact action taken for each socket type.

The default ZMQ::HWM value of zero means “no limit”.

Option value type

Integer

Option value unit

messages

Default value

0

Applicable socket types

all

ZMQ::SWAP: Set disk offload size

The ZMQ::SWAP option shall set the disk offload (swap) size for the specified socket. A socket which has ZMQ::SWAP set to a non-zero value may exceed it’s high water mark; in this case outstanding messages shall be offloaded to storage on disk rather than held in memory.

The value of ZMQ::SWAP defines the maximum size of the swap space in bytes.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::AFFINITY: Set I/O thread affinity

The ZMQ::AFFINITY option shall set the I/O thread affinity for newly created connections on the specified socket.

Affinity determines which threads from the 0MQ I/O thread pool associated with the socket’s context shall handle newly created connections. A value of zero specifies no affinity, meaning that work shall be distributed fairly among all 0MQ I/O threads in the thread pool. For non-zero values, the lowest bit corresponds to thread 1, second lowest bit to thread 2 and so on. For example, a value of 3 specifies that subsequent connections on socket shall be handled exclusively by I/O threads 1 and 2.

See also ZMQ::Context#new for details on allocating the number of I/O threads for a specific context.

Option value type

Integer

Option value unit

N/A (bitmap)

Default value

0

Applicable socket types

all

ZMQ::IDENTITY: Set socket identity

The ZMQ::IDENTITY option shall set the identity of the specified socket. Socket identity determines if existing 0MQ infastructure (message queues, forwarding devices) shall be identified with a specific application and persist across multiple runs of the application.

If the socket has no identity, each run of an application is completely separate from other runs. However, with identity set the socket shall re-use any existing 0MQ infrastructure configured by the previous run(s). Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared with previous run(s) and so on.

Identity should be at least one byte and at most 255 bytes long. Identities starting with binary zero are reserved for use by 0MQ infrastructure.

Option value type

String

Option value unit

N/A

Default value

nil

Applicable socket types

all

ZMQ::SUBSCRIBE: Establish message filter The ZMQ::SUBSCRIBE option shall establish a new message filter on a ZMQ::SUB socket. Newly created ZMQ::SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty value of length zero shall subscribe to all incoming messages. A non-empty value shall subscribe to all messages beginning with the specified prefix. Mutiple filters may be attached to a single ZMQ::SUB socket, in which case a message shall be accepted if it matches at least one filter.

Option value type

String

Option value unit

N/A

Default value

N/A

Applicable socket types

ZMQ::SUB

ZMQ::UNSUBSCRIBE: Remove message filter

The ZMQ::UNSUBSCRIBE option shall remove an existing message filter on a ZMQ::SUB socket. The filter specified must match an existing filter previously established with the ZMQ::SUBSCRIBE option. If the socket has several instances of the same filter attached the ZMQ::UNSUBSCRIBE option shall remove only one instance, leaving the rest in place and functional.

Option value type

String

Option value unit

N/A

Default value

nil

Applicable socket types

all

ZMQ::RATE: Set multicast data rate

The ZMQ::RATE option shall set the maximum send or receive data rate for multicast transports such as pgm using the specified socket.

Option value type

Integer

Option value unit

kilobits per second

Default value

100

Applicable socket types

all, when using multicast transports

ZMQ::RECOVERY_IVL: Set multicast recovery interval

The ZMQ::RECOVERY_IVL option shall set the recovery interval for multicast transports using the specified socket. The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group before unrecoverable data loss will occur.

<bCaution:</b> Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory. For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer.

Option value type

Integer

Option value unit

seconds

Default value

10

Applicable socket types

all, when using multicast transports

ZMQ::RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds

The ZMQ::RECOVERY_IVL_MSEC option shall set the recovery interval, specified in milliseconds (ms) for multicast transports using the specified socket. The recovery interval determines the maximum time in milliseconds that a receiver can be absent from a multicast group before unrecoverable data loss will occur.

A non-zero value of the ZMQ::RECOVERY_IVL_MSEC option will take precedence over the ZMQ::RECOVERY_IVL option, but since the default for the ZMQ::RECOVERY_IVL_MSEC is -1, the default is to use the ZMQ::RECOVERY_IVL option value.

<bCaution:</b> Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory. For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer.

Option value type

Integer

Option value unit

milliseconds

Default value

-1

Applicable socket types

all, when using multicast transports

ZMQ::MCAST_LOOP: Control multicast loopback

The ZMQ::MCAST_LOOP option shall control whether data sent via multicast transports using the specified socket can also be received by the sending host via loopback. A value of zero disables the loopback functionality, while the default value of 1 enables the loopback functionality. Leaving multicast loopback enabled when it is not required can have a negative impact on performance. Where possible, disable ZMQ::MCAST_LOOP in production environments.

Option value type

Boolean

Option value unit

N/A

Default value

true

Applicable socket types

all, when using multicast transports

ZMQ::SNDBUF: Set kernel transmit buffer size

The ZMQ::SNDBUF option shall set the underlying kernel transmit buffer size for the socket to the specified size in bytes. A value of zero means leave the OS default unchanged. For details please refer to your operating system documentation for the SO_SNDBUF socket option.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::RCVBUF: Set kernel receive buffer size

The ZMQ::RCVBUF option shall set the underlying kernel receive buffer size for the socket to the specified size in bytes. A value of zero means leave the OS default unchanged. For details refer to your operating system documentation for the SO_RCVBUF socket option.

Option value type

Integer

Option value unit

bytes

Default value

0

Applicable socket types

all

ZMQ::LINGER: Set linger period for socket shutdown

The ZMQ::LINGER option shall set the linger period for the specified socket. The linger period determines how long pending messages which have yet to be sent to a peer shall linger in memory after a socket is closed with #close, and further affects the termination of the socket’s context with ZMQ#close(). The following outlines the different behaviours:

  • The default value of −1 specifies an infinite linger period. Pending messages shall not be discarded after a call to #close; attempting to terminate the socket’s context with ZMQ::Context#close shall block until all pending messages have been sent to a peer.

  • The value of 0 specifies no linger period. Pending messages shall be discarded immediately when the socket is closed with #close.

  • Positive values specify an upper bound for the linger period in milliseconds. Pending messages shall not be discarded after a call to #close; attempting to terminate the socket’s context with ZMQ::Context#close shall block until either all pending messages have been sent to a peer, or the linger period expires, after which any pending messages shall be discarded.

Option value type

Integer

Option value unit

milliseconds

Default value

-1 (infinite)

Applicable socket types

all

ZMQ::RECONNECT_IVL: Set reconnection interval

The ZMQ::RECONNECT_IVL option shall set the reconnection interval for the specified socket. The reconnection interval is the maximum period 0MQ shall wait between attempts to reconnect disconnected peers when using connection−oriented transports.

Option value type

Integer

Option value unit

milliseconds

Default value

100

Applicable socket types

all, only for connection-oriented transports

ZMQ::BACKLOG: Set maximum length of the queue of outstanding connections

The ZMQ::BACKLOG option shall set the maximum length of the queue of outstanding peer connections for the specified socket; this only applies to connection−oriented transports. For details refer to your operating system documentation for the listen function.

Option value type

Integer

Option value unit

connections

Default value

100

Applicable socket types

all, only for connection-oriented transports

static VALUE socket_setsockopt (VALUE self_, VALUE option_,
    VALUE optval_)
{

    int rc = 0;
    void * s;

    Data_Get_Struct (self_, void, s);
    Check_Socket (s);

    switch (NUM2INT (option_)) {
    case ZMQ_HWM:
    case ZMQ_SWAP:
    case ZMQ_AFFINITY:
    case ZMQ_RATE:
    case ZMQ_RECOVERY_IVL:
    case ZMQ_MCAST_LOOP:
    case ZMQ_SNDBUF:
    case ZMQ_RCVBUF:
            {
                uint64_t optval = FIX2LONG (optval_);

                //  Forward the code to native 0MQ library.
                rc = zmq_setsockopt (s, NUM2INT (option_),
                    (void*) &optval, sizeof (optval));
            }
            break;

#if ZMQ_VERSION >= 20100
        case ZMQ_LINGER:
        case ZMQ_RECONNECT_IVL:
        case ZMQ_BACKLOG:
#if ZMQ_VERSION >= 20101
    case ZMQ_RECONNECT_IVL_MAX:
    case ZMQ_RECOVERY_IVL_MSEC:
#endif
        {
            int optval = FIX2INT (optval_);

            //  Forward the code to native 0MQ library.
            rc = zmq_setsockopt (s, NUM2INT (option_),
                (void*) &optval, sizeof (optval));
        }
        break;
#endif

    case ZMQ_IDENTITY:
    case ZMQ_SUBSCRIBE:
    case ZMQ_UNSUBSCRIBE:

        //  Forward the code to native 0MQ library.
        rc = zmq_setsockopt (s, NUM2INT (option_),
            (void *) StringValueCStr (optval_), RSTRING_LEN (optval_));
        break;

    default:
        rb_raise (exception_type, "%s", zmq_strerror (EINVAL));
        return Qnil;
    }

    if (rc != 0) {
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
        return Qnil;
    }

    return self_;
}