【进程通信】zmq
参考
接口
ctx
socket
connect
bind
send
recv
zmq_setsockopt
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
zmq_setsockopt()函数会对socket参数指定的socket进行设置,设置的属性由option_name参数指定,属性值由参数option_value指定。option_len参数指定属性值的数据存储空间的大小。
实例
- demo1
a) 新建文件rtemsg.proto
// Message schema shared by the publisher and subscriber in the demo.
syntax = "proto3";

// Body carrying a single phone number string.
message IBCallStartCallReq {
  string phoneNum = 1;
}

// Body carrying a single phone number string.
message IBCallCancelCallReq {
  string phoneNum = 1;
}

// Envelope: a numeric message_id plus at most one of the bodies below.
message RTEMessage {
  int32 message_id = 1;

  oneof msg_body {
    IBCallStartCallReq msg_ibcall_start_call_req = 2;
    IBCallCancelCallReq msg_ibcall_cancel_call_req = 3;
  }
}
protoc --cpp_out=./ rtemsg.proto //编译proto文件,生成rtemsg.pb.h和rtemsg.pb.cc
新建文件rtemsg_test.cc
// NOTE(review): the header names were stripped from the original listing;
// the standard headers below are the ones this program actually uses.
#include <cerrno>
#include <cstddef>
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "zmq.h"
#include "rtemsg.pb.h"

// Values carried in RTEMessage.message_id.
#define IPC_MSG_IBCALL_START_CALL_REQ 0x1001
#define IPC_MSG_IBCALL_CANCEL_CALL_REQ 0x1002
#define IPC_MSG_QUIT 0x0000

// Endpoint the publisher binds to and the subscriber connects to.
#define MSG_URL "tcp://127.0.0.1:6611"

// Prints `what` followed by the current errno value.
static void log_errno(const char *what)
{
    const int err = errno;  // capture first: stream I/O may overwrite errno
    std::cout << what << err << std::endl;
}

// Closes a socket and its context; used on every exit path.
static void release(void *socket, void *ctx)
{
    zmq_close(socket);
    zmq_ctx_destroy(ctx);
}

// Serialises `msg` into a stack buffer and publishes it as one frame on `pub`.
// Returns the zmq_send() result, or -1 if the message does not fit in the
// buffer or cannot be serialised.
static int publish(void *pub, const RTEMessage &msg)
{
    char buf[1024];
    const std::size_t sz = msg.ByteSizeLong();
    if (sz > sizeof(buf) || !msg.SerializeToArray(buf, static_cast<int>(sz))) {
        std::cout << "serialize failed, size: " << sz << std::endl;
        return -1;
    }
    const int ret = zmq_send(pub, buf, sz, 0);
    if (ret < 0) {
        log_errno("zmq_send() failed: ");
    }
    return ret;
}

// Subscriber thread entry point.
// Connects a SUB socket to MSG_URL, subscribes to everything, and decodes
// each received frame as an RTEMessage until IPC_MSG_QUIT arrives or an
// unrecoverable receive error occurs. `arg` is unused. Always returns NULL.
void *sub_thread(void *arg)
{
    (void)arg;
    void *ctx = zmq_ctx_new();
    void *sub = zmq_socket(ctx, ZMQ_SUB);

    std::cout << "connect pub..." << std::endl;
    if (zmq_connect(sub, MSG_URL) < 0) {
        log_errno("zmq_connect() failed: ");
        release(sub, ctx);
        return nullptr;
    }
    // An empty prefix subscribes to every message.
    if (zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0) < 0) {
        log_errno("subscribe failed: ");
        release(sub, ctx);
        return nullptr;
    }

    bool stop = false;
    while (!stop) {
        char buf[1024];
        std::cout << "reading data..." << std::endl;
        const int nb = zmq_recv(sub, buf, sizeof(buf), 0);
        if (nb < 0) {
            const int err = errno;
            std::cout << "zmq_recv() ret:" << nb << std::endl;
            if (err == EINTR) {
                continue;  // interrupted by a signal: retry
            }
            break;  // any other error is not recoverable here
        }
        std::cout << "read " << nb << " bytes" << std::endl;

        // zmq_recv() reports the full message size even when the payload was
        // truncated to fit buf, so nb may exceed the buffer capacity.
        if (static_cast<std::size_t>(nb) > sizeof(buf)) {
            std::cout << "message truncated, dropped" << std::endl;
            continue;
        }

        RTEMessage msg;
        if (!msg.ParseFromArray(buf, nb)) {
            std::cout << "parse failed, dropped" << std::endl;
            continue;
        }

        switch (msg.message_id()) {
        case IPC_MSG_IBCALL_START_CALL_REQ:
            std::cout << "start ibcall, phone: "
                      << msg.msg_ibcall_start_call_req().phonenum() << std::endl;
            break;
        case IPC_MSG_IBCALL_CANCEL_CALL_REQ:
            std::cout << "cancel call " << std::endl;
            break;
        case IPC_MSG_QUIT:
            stop = true;
            break;
        default:
            break;  // unknown ids are ignored
        }
    }

    release(sub, ctx);
    return nullptr;
}

// Publisher: binds a PUB socket, starts the subscriber thread, then sends a
// start-call, a cancel-call and a quit message before joining the thread.
// Returns 0 on success and 1 if the socket or the thread cannot be set up.
int main()
{
    void *ctx = zmq_ctx_new();
    void *pub = zmq_socket(ctx, ZMQ_PUB);
    if (zmq_bind(pub, MSG_URL) < 0) {
        log_errno("zmq_bind() failed: ");
        release(pub, ctx);
        return 1;
    }

    pthread_t tid;
    if (pthread_create(&tid, nullptr, sub_thread, nullptr) != 0) {
        std::cout << "pthread_create() failed" << std::endl;
        release(pub, ctx);
        return 1;
    }
    // Give the subscriber time to connect and subscribe; messages published
    // before that are not delivered to it.
    sleep(2);

    RTEMessage start_call_msg;
    start_call_msg.set_message_id(IPC_MSG_IBCALL_START_CALL_REQ);
    auto req = start_call_msg.mutable_msg_ibcall_start_call_req();
    req->set_phonenum("13503363383");
    std::cout << "send start call msg: " << start_call_msg.ByteSizeLong() << std::endl;
    publish(pub, start_call_msg);

    RTEMessage cancel_call_msg;
    cancel_call_msg.set_message_id(IPC_MSG_IBCALL_CANCEL_CALL_REQ);
    std::cout << "send cancel call msg: " << cancel_call_msg.ByteSizeLong() << std::endl;
    publish(pub, cancel_call_msg);

    sleep(3);

    std::cout << "send quit msg" << std::endl;
    RTEMessage msg;
    msg.set_message_id(IPC_MSG_QUIT);
    publish(pub, msg);

    pthread_join(tid, nullptr);
    release(pub, ctx);
    std::cout << "program exited" << std::endl;
    return 0;
}
g++ rtemsg_test.cc rtemsg.pb.cc -lprotobuf -o rtemsg -lpthread -lzmq
//将上一步生成的rtemsg.pb.cc和rtemsg_test.cc一起编译后运行,注意链接protobuf、pthread和zmq库
2. pub_sub

/* NOTE(review): the header names were stripped from the original listing;
 * the standard headers below are the ones this program actually uses. */
#include <errno.h>
#include <stdio.h>
#include "zmq.h"

/* Endpoint the publisher binds to. */
#define MSG_URL "tcp://127.0.0.1:6001"
/* Number of messages to publish before exiting. */
#define MSG_COUNT 100

/*
 * PUB server: binds to MSG_URL and publishes the decimal numbers
 * 0..MSG_COUNT-1, one per second.
 * Returns 0 on success, 1 if the socket cannot be bound.
 */
int main()
{
    printf("This is server...\n");

    void *ctx = zmq_ctx_new();
    void *pub = zmq_socket(ctx, ZMQ_PUB);
    if (zmq_bind(pub, MSG_URL) < 0) {
        printf("zmq_bind failed:%d\n", errno);
        zmq_close(pub);
        zmq_ctx_destroy(ctx);
        return 1;
    }

    for (int i = 0; i < MSG_COUNT; i++) {
        char buf[256];
        int sz = snprintf(buf, sizeof(buf), "%d", i);
        int bc = zmq_send(pub, buf, sz, 0);
        if (bc < 0) {
            /* Report the failure instead of printing "-1 bytes" as a success. */
            printf("zmq_send failed:%d\n", errno);
        } else {
            printf("send %d bytes, data:%s\n", bc, buf);
        }
        zmq_sleep(1);
    }

    zmq_close(pub);
    zmq_ctx_destroy(ctx);
    return 0;
}
/* NOTE(review): the header names were stripped from the original listing;
 * the standard headers below are the ones this program actually uses. */
#include <errno.h>
#include <stdio.h>
#include "zmq.h"

/* Endpoint of the publisher to connect to. */
#define MSG_URL "tcp://127.0.0.1:6001"
/* Receive buffer size, including room for the terminating NUL. */
#define BUF_SIZE 256

/*
 * SUB client: connects to MSG_URL, subscribes to every message and prints
 * each received payload as text.
 * Returns 0 normally, 1 if setup fails or a receive error ends the loop.
 */
int main()
{
    printf(" This is client...\n");

    void *ctx = zmq_ctx_new();
    void *sub = zmq_socket(ctx, ZMQ_SUB);
    int rc = 0;

    printf("connect pub...\n");
    if (zmq_connect(sub, MSG_URL) < 0) {
        /* zmq_connect() only returns -1; the reason is in errno. */
        printf("zmq_connect failed:%d\n", errno);
        rc = 1;
    } else if (zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0) < 0) {
        printf("subsribe failed:%d\n", errno);
        rc = 1;
    }

    while (rc == 0) {
        printf("Start recv data\n");
        char buf[BUF_SIZE];
        int bc = zmq_recv(sub, buf, BUF_SIZE - 1, 0);
        if (bc < 0) {
            int err = errno;
            printf("Recv failed\n");
            if (err == EINTR) {
                continue; /* interrupted by a signal: retry */
            }
            rc = 1;
            break;
        }
        /* zmq_recv() reports the full message size even when the payload was
         * truncated, so clamp the index before writing the terminator. */
        int len = bc < BUF_SIZE ? bc : BUF_SIZE - 1;
        buf[len] = '\0';
        printf("Recv %d bytes, data:%s\n", bc, buf);
    }

    zmq_close(sub);
    zmq_ctx_destroy(ctx);
    return rc;
}
3. xpub_xsub

/* NOTE(review): the header names were stripped from the original listing;
 * the standard headers below are the ones this program actually uses. */
#include <errno.h>
#include <stdio.h>
#include "zmq.h"

/* Publishers connect to the front (XSUB) endpoint. */
#define MSG_URL_FRONT "tcp://127.0.0.1:6001"
/* Subscribers connect to the back (XPUB) endpoint. */
#define MSG_URL_BACK "tcp://127.0.0.1:6002"

/*
 * XSUB/XPUB proxy: binds both endpoints and forwards traffic between them.
 * Returns 0 after the proxy returns, 1 if either endpoint cannot be bound.
 */
int main()
{
    printf("This is proxy\n");

    void *ctx = zmq_ctx_new();
    void *front = zmq_socket(ctx, ZMQ_XSUB);
    void *back = zmq_socket(ctx, ZMQ_XPUB);
    int rc = 0;

    /* Bind each side to its own URL; the two endpoints must differ. */
    if (zmq_bind(front, MSG_URL_FRONT) < 0 || zmq_bind(back, MSG_URL_BACK) < 0) {
        printf("zmq_bind failed:%d\n", errno);
        rc = 1;
    } else {
        /* Run the proxy in this thread; no capture socket is attached. */
        zmq_proxy(front, back, NULL);
    }

    zmq_close(front);
    zmq_close(back);
    zmq_ctx_destroy(ctx);
    return rc;
}
/* NOTE(review): the header names were stripped from the original listing;
 * the standard headers below are the ones this program actually uses. */
#include <errno.h>
#include <stdio.h>
#include "zmq.h"

/* Front (XSUB) endpoint of the proxy. */
#define MSG_URL "tcp://127.0.0.1:6001"
/* Number of messages to publish before exiting. */
#define MSG_COUNT 100

/*
 * Publisher for the proxy demo: connects a PUB socket to MSG_URL and sends
 * the decimal numbers 0..MSG_COUNT-1, one per second.
 * Returns 0 on success, -1 if the connection cannot be set up.
 */
int main()
{
    printf("This is pub\n");

    void *ctx = zmq_ctx_new();            /* create the context */
    void *pub = zmq_socket(ctx, ZMQ_PUB); /* create the socket */

    int zc = zmq_connect(pub, MSG_URL);   /* connect to the endpoint */
    if (zc < 0) {
        printf("Connect failed\n");
        /* Release the socket and context on the error path as well. */
        zmq_close(pub);
        zmq_ctx_destroy(ctx);
        return -1;
    }

    for (int i = 0; i < MSG_COUNT; i++) {
        char buf[256];
        int sz = snprintf(buf, sizeof(buf), "%d", i);
        int zs = zmq_send(pub, buf, sz, 0); /* send the message */
        if (zs < 0) {
            /* Report the failure instead of printing "-1 bytes" as a success. */
            printf("zmq_send failed:%d\n", errno);
        } else {
            printf("send %d bytes, data:%s\n", zs, buf);
        }
        zmq_sleep(1);
    }

    zmq_close(pub);       /* close the socket */
    zmq_ctx_destroy(ctx); /* destroy the context */
    return 0;
}
/* NOTE(review): the header names were stripped from the original listing;
 * the standard headers below are the ones this program actually uses. */
#include <errno.h>
#include <stdio.h>
#include "zmq.h"

/* Back (XPUB) endpoint of the proxy. */
#define MSG_URL "tcp://127.0.0.1:6002"
/* Receive buffer size, including room for the terminating NUL. */
#define BUF_SIZE 256

/*
 * Subscriber for the proxy demo: connects a SUB socket to MSG_URL,
 * subscribes to every message and prints each payload as text.
 * Returns -1 if setup or a receive fails; the loop has no other exit.
 */
int main()
{
    void *ctx = zmq_ctx_new();
    void *sub = zmq_socket(ctx, ZMQ_SUB);

    int zc = zmq_connect(sub, MSG_URL);
    if (zc < 0) {
        printf( "Connect failed");
        zmq_close(sub);
        zmq_ctx_destroy(ctx);
        return -1;
    }

    /* Set the filter: an empty prefix subscribes to every message. */
    if (zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0) < 0) {
        printf("zmq_setsockopt failed:%d\n", errno);
        zmq_close(sub);
        zmq_ctx_destroy(ctx);
        return -1;
    }

    for (;;) {
        printf("Start recv data\n");
        char buf[BUF_SIZE];
        /* Leave one byte free so the payload can be NUL-terminated. */
        int zr = zmq_recv(sub, buf, BUF_SIZE - 1, 0);
        if (zr < 0) {
            printf(" Recv failed\n");
            break;
        }
        /* zmq_recv() reports the full message size even when the payload was
         * truncated, so clamp the index before writing the terminator. */
        int len = zr < BUF_SIZE ? zr : BUF_SIZE - 1;
        buf[len] = '\0';
        printf("rect %d bytes, data: %s\n", zr, buf);
    }

    zmq_close(sub);
    zmq_ctx_destroy(ctx);
    return -1;
}
4. xpub_xsub+pb
运行结果:

5. xpub_xsub+pb+pthread
6. 双向发送接收
ps:如果不订阅主题的话,一端就可以进行消息的循环
7. 订阅主题

8. 命令行选择订阅主题

9. 不同系统

思考
- demo1的字节数
- 对于每一次发送,相对应的回应消息
- scanf
- 指针
- char string’
- strlen,sizeof
- strlen
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
