我的mqtt协议和emqttd开源项目个人理解(8) - 客户端subscribe消息的源码分析
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,packet_opts = PacketOpts,proto_state = ProtoState}) ->
parse(<<>>, {none, Limit}) ->
%% Handles a SUBSCRIBE packet.
%%
%% Every {Topic, Qos} entry of TopicTable is checked against the ACL for the
%% client derived from State. If at least one topic is denied, the request is
%% logged and answered immediately with a SUBACK carrying one 16#80 return
%% code per requested topic (16#80 is the SUBACK "failure" code in MQTT
%% 3.1.1). Otherwise the subscription is handed to the session process and
%% {ok, State} is returned.
%%
%% The clause ends with `;` because further process/2 clauses follow it
%% outside this excerpt.
process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
    Client = client(State),
    AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable],
    case lists:member(deny, AllowDenies) of
        true ->
            ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
            %% A single denied topic rejects the whole request.
            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
        false ->
            emqttd_session:subscribe(Session, PacketId, TopicTable),
            {ok, State}
    end;
%% Asynchronously asks the session process SessPid to subscribe to every
%% entry of TopicTable.
%%
%% The request carries an acknowledgement fun; when the session calls it with
%% the granted QoS list, {suback, PacketId, GrantedQos} is sent back to the
%% process that called subscribe/3.
subscribe(SessPid, PacketId, TopicTable) ->
    %% Capture the caller's pid here: inside the fun, self() would be the
    %% process that eventually invokes it.
    Caller = self(),
    Request = {subscribe, TopicTable,
               fun(GrantedQos) -> Caller ! {suback, PacketId, GrantedQos} end},
    gen_server2:cast(SessPid, Request).
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,subscriptions = Subscriptions}) ->
%% Subscribes ClientId to Topic at the given Qos by delegating to
%% emqttd_server:subscribe/3; the result of that call is returned unchanged.
subscribe(ClientId, Topic, Qos) ->
    emqttd_server:subscribe(ClientId, Topic, Qos).
%% Synchronously asks the server process selected by server(self()) to
%% subscribe the calling process to Topic on behalf of ClientId. Qos is passed
%% through the ?QOS_I macro before being sent.
subscribe(ClientId, Topic, Qos) ->
    From = self(),
    call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).

%% Server side of subscribe/3: records the subscriber in the pubsub layer and,
%% through if_subsciption/2, stores the subscription and refreshes the
%% subscription statistics.
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
    pubsub_subscribe_(SubPid, Topic),
    if_subsciption(State, fun() ->
        add_subscription_(ClientId, Topic, Qos),
        set_subscription_stats()
    end),
    %% NOTE(review): the excerpt is cut off here (the clause ends on a `,`);
    %% the remaining expressions of this clause are not shown in the article.

%% Registers SubPid for Topic in the pubsub layer unless the `subscribed` ETS
%% table already holds that {SubPid, Topic} pair.
%% Returns the result of ets:insert/2 for a new pair, `false` for a known one.
pubsub_subscribe_(SubPid, Topic) ->
    case ets:match(subscribed, {SubPid, Topic}) of
        [] ->
            emqttd_pubsub:async_subscribe(Topic, SubPid),
            ets:insert(subscribed, {SubPid, Topic});
        [_] ->
            false
    end.

%% Writes an mqtt_subscription record to the `subscription` Mnesia table with
%% a dirty (non-transactional) write.
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
    mnesia:dirty_write(subscription, Subscription).
%% Casts a {subscribe, Topic, SubPid} request to the pubsub worker chosen by
%% pick(Topic). Only binary topics are accepted.
async_subscribe(Topic, SubPid) when is_binary(Topic) ->
    cast(pick(Topic), {subscribe, Topic, SubPid}).

%% Worker side of async_subscribe/2: adds the subscriber and continues with
%% the state returned by setstats/1.
%% The clause ends with `;` because further handle_cast/2 clauses follow it
%% outside this excerpt.
handle_cast({subscribe, Topic, SubPid}, State) ->
    add_subscriber_(Topic, SubPid),
    {noreply, setstats(State)};

%% Inserts {Topic, SubPid} into the `subscriber` ETS table. When the topic has
%% no entry in that table yet, the topic and its route for the local node are
%% first created inside an Mnesia transaction and the topic statistics are
%% refreshed.
add_subscriber_(Topic, SubPid) ->
    case ets:member(subscriber, Topic) of
        false ->
            %% The transaction result is not inspected by this code.
            mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
            setstats(topic);
        true ->
            ok
    end,
    ets:insert(subscriber, {Topic, SubPid}).
%% Ensures a topic record exists for Topic, then adds a route from Topic to
%% Node through emqttd_router:add_route/2.
%% Uses mnesia:wread/1 and mnesia:write/3 via add_topic_/2, so it has to run
%% inside an Mnesia transaction.
add_topic_route_(Topic, Node) ->
    add_topic_(Topic),
    emqttd_router:add_route(Topic, Node).

%% Same as add_topic_/2 with an empty flag list.
add_topic_(Topic) ->
    add_topic_(Topic, []).

%% Writes an mqtt_topic record for Topic with the given Flags into the `topic`
%% table, unless a record for that topic is already present. The read takes a
%% write lock (wread) so the check and the write happen under the same lock.
add_topic_(Topic, Flags) ->
    Record = #mqtt_topic{topic = Topic, flags = Flags},
    case mnesia:wread({topic, Topic}) of
        [] -> mnesia:write(topic, Record, write);
        [_] -> ok
    end.
%% Builds an mqtt_route record for Topic/Node and passes it to add_route/1.
%% NOTE(review): add_route/1 is not part of this excerpt; only add_route_/1
%% is shown below - confirm against the full module.
add_route(Topic, Node) when is_binary(Topic), is_atom(Node) ->
    add_route(#mqtt_route{topic = Topic, node = Node}).

%% Stores Route in the `route` table if it is not there yet.
%%
%% When the topic has no route at all, a wildcard topic is first inserted into
%% the trie (emqttd_trie:insert/1) and the route is then written. When routes
%% for the topic already exist, the route is written only if this exact record
%% is not among them.
%% Uses mnesia:wread/1 and mnesia:write/3, so it has to run inside an Mnesia
%% transaction.
add_route_(Route = #mqtt_route{topic = Topic}) ->
    case mnesia:wread({route, Topic}) of
        [] ->
            case emqttd_topic:wildcard(Topic) of
                true -> emqttd_trie:insert(Topic);
                false -> ok
            end,
            mnesia:write(route, Route, write);
        Records ->
            case lists:member(Route, Records) of
                true -> ok;
                false -> mnesia:write(route, Route, write)
            end
    end.
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
