我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


源码写得比较绕,需要经过以下模块的调用,入口是emqttd_client模块,handle_info函数负责接收socket的数据:

(注意:gen_server:call是同步调用,cast是异步调用。call对应的回调是handle_call,cast对应的回调是handle_cast;handle_info处理的是普通消息,例如上面提到的socket数据。)


1、-module(emqttd_client).

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun  = ParserFun,packet_opts = PacketOpts,proto_state = ProtoState}) ->

2、-module(emqttd_parser).

parse(<<>>, {none, Limit}) ->

3、-module(emqttd_protocol).

处理SUBSCRIBE消息(先做ACL检查,再交给session订阅)

%% Handles an incoming SUBSCRIBE packet.
%%
%% Every {Topic, Qos} entry of TopicTable is checked with check_acl/3.
%% If at least one check yields `deny`, an error is logged and a SUBACK with
%% return code 16#80 (the MQTT 3.1.1 "failure" code) for every requested topic
%% is sent back. Otherwise the request is handed to the session through
%% emqttd_session:subscribe/3 and {ok, State} is returned.
process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            %% A single denied topic rejects the whole SUBSCRIBE request.
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable),
            {ok, State}
    end;

4、-module(emqttd_session).

%% Asks the session process SessPid to subscribe to TopicTable via an
%% asynchronous gen_server2 cast.
%%
%% The calling pid is captured in the acknowledgement fun: when that fun is
%% invoked with the granted QoS value, it sends {suback, PacketId, GrantedQos}
%% back to the process that called subscribe/3.
subscribe(SessPid, PacketId, TopicTable) ->
    Caller = self(),
    Ack = fun(GrantedQos) -> Caller ! {suback, PacketId, GrantedQos} end,
    gen_server2:cast(SessPid, {subscribe, TopicTable, Ack}).
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,subscriptions = Subscriptions}) ->


5、-module(emqttd).

%% Subscribes ClientId to Topic with the given Qos.
%% Pure delegation: the work is done by emqttd_server:subscribe/3.
subscribe(ClientId, Topic, Qos) ->
    emqttd_server:subscribe(ClientId, Topic, Qos).

6、-module(emqttd_server).

%% Client-facing subscribe API.
%% Sends a {subscribe, ...} request, tagged with the calling pid, through
%% call/2 to the server returned by server(From).
subscribe(ClientId, Topic, Qos) ->
    From = self(),
    call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).

%% Server-side handling of the subscribe request.
%% First registers SubPid with pubsub, then runs the subscription bookkeeping
%% inside if_subsciption/2 (spelling as in the original source).
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
    %% NOTE(review): the excerpt is cut off here; the remainder of this clause
    %% (including its reply) is not shown and must be taken from the
    %% emqttd_server source.

%% Registers SubPid for Topic with emqttd_pubsub unless the {SubPid, Topic}
%% pair is already present in the `subscribed` ETS table.
%% Returns the result of ets:insert/2 for a new pair, `false` for a known one.
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.

%% Stores an #mqtt_subscription{} record in the `subscription` Mnesia table
%% with a dirty (non-transactional) write.
%% NOTE(review): the add_subscription_/3 variant called from handle_call is
%% not part of this excerpt.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).

7、-module(emqttd_pubsub).

把订阅的主题和节点名称存储在Mnesia数据库,这里考虑了集群的情况;

同时也把订阅主题和进程id存储在ETS表。

%% Asynchronous subscribe API.
%% Sends {subscribe, Topic, SubPid} through cast/2 to the target returned by
%% pick(Topic). Topic must be a binary.
async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    cast(pick(Topic), {subscribe, Topic, SubPid}).

%% Handles the subscribe cast: records the subscriber, then refreshes the
%% statistics via setstats/1.
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
    {noreply, setstats(State)};
%% NOTE(review): the trailing `;` means further handle_cast clauses follow in
%% the full module; they are omitted from this excerpt.

%% Adds SubPid as a subscriber of Topic.
%% When Topic has no entry yet in the `subscriber` ETS table, the topic and
%% its route for the local node are first written inside a Mnesia transaction.
%% The {Topic, SubPid} pair is then inserted into the ETS table in every case.
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        false ->
            %% NOTE(review): the transaction result is ignored here, so an
            %% aborted transaction goes unnoticed — confirm this is intended.
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic);
        true ->
            ok
    end,
    ets:insert(subscriber, {Topic, SubPid}).
%% Creates the topic record (if missing) and then adds the route Topic -> Node
%% through emqttd_router:add_route/2.
%% Invoked as the transaction fun of mnesia:transaction/2 by add_subscriber_/2.
add_topic_route_(Topic, Node) ->
    add_topic_(Topic),
    emqttd_router:add_route(Topic, Node).

%% Adds Topic with an empty flag list.
add_topic_(Topic) ->
    add_topic_(Topic, []).

%% Writes an #mqtt_topic{} record for Topic into the `topic` table unless one
%% already exists. The read takes a write lock (mnesia:wread/1), so this must
%% run inside a Mnesia transaction.
add_topic_(Topic, Flags) ->
    Record = #mqtt_topic{topic = Topic, flags = Flags},
    case mnesia:wread({topic, Topic}) of
        []  -> mnesia:write(topic, Record, write);
        [_] -> ok
    end.

8、-module(emqttd_router).

%% Adds a route from Topic (binary) to Node (atom) by building an
%% #mqtt_route{} record and passing it to add_route/1.
%% NOTE(review): add_route/1 is not shown in this excerpt; presumably it ends
%% up in add_route_/1 below — confirm against the emqttd_router source.
add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    add_route(#mqtt_route{topic = Topic, node = Node}).

%% Writes Route into the `route` table.
%%
%% - No route exists yet for the topic: a wildcard topic is first inserted
%%   into the trie (emqttd_trie:insert/1), then the route is written.
%% - Routes already exist: the route is written only if this exact record is
%%   not among them.
%%
%% Uses mnesia:wread/1 and mnesia:write/3, so it must run inside a Mnesia
%% transaction.
add_route_(Route = #mqtt_route{topic = Topic}) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true  -> emqttd_trie:insert(Topic);
                false -> ok
            end,
            mnesia:write(route, Route, write);
        Records ->
            case lists:member(Route, Records) of
                true  -> ok;
                false -> mnesia:write(route, Route, write)
            end
    end.




本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部