EMQX消息存储Redis源码分析(3)
emqx_backend_redis 模块是主要的工作模块,内部定义了redis统计,模块载入和卸载接口。
1、emqx_metrics 模块
2、emqx 模块的钩子函数
3、emqx_topic 模块
-module(emqx_backend_redis).
-include("../include/emqx_backend_redis.hrl").-include_lib("../include/emqx.hrl").%% API
-export([pool_name/1,compile_cmd/1]).-export([register_metrics/0,load/0,unload/0]).-export([on_client_connected/3,%% 已经连接的客户端on_subscribe_lookup/3,%% 订阅查询on_client_disconnected/4, %% 断开的客户查询on_message_fetch_for_queue/4,on_message_fetch_for_pubsub/4,on_message_fetch_for_keep_latest/4,on_retain_lookup/4,%% 保留消息查询on_message_publish/2, %% 消息发布on_message_store_keep_latest/2,%%消息存储on_message_retain/2,on_retain_delete/2,%% 保留消息删除on_message_acked_for_queue/3,on_message_acked_for_pubsub/3,on_message_acked_for_keep_latest/3,run_redis_commands/2, %% redis命令run_redis_commands/3,run_redis_commands/4]).pool_name(Pool)->list_to_atom(lists:concat([emqx_backend_redis,'_',Pool])).%% 统计
register_metrics() ->[emqx_metrics:new(MetricName) || MetricName<- ['backend.redis.client_connected','backend.redis.subscribe_lookup','backend.redis.client_disconnected','backend.redis.message_fetch_for_queue','backend.redis.message_fetch_for_pubsub','backend.redis.message_fetch_for_keep_latest','backend.redis.retain_lookup','backend.redis.message_publish','backend.redis.message_store_keep_latest','backend.redis.message_retain','backend.redis.retain_delete','backend.redis.message_acked_for_queue','backend.redis.message_acked_for_pubsub','backend.redis.message_acked_for_keep_latest']].%% 载入钩子函数,在emqx_backend_redis.conf文件里面定义的
load()->%% 钩子函数列表HookList = parse_hook(application:get_env(emqx_backend_redis, hooks, [])),%% 从HookList取出参数,然后执行函数lists:foreach(fun ({Hook,Action,Pool,Filter,ExpiredTime}) ->case proplists:get_value(<<"function">>, Action) ofundefined ->Commands = proplists:get_value(<<"commands">>, Action, []),Cmds = compile_cmds(Commands),load_(Hook, run_redis_commands, ExpiredTime, {Filter, Pool, Cmds});Fun ->load_(Hook, b2a(Fun), ExpiredTime, {Filter, Pool, undefined})endend, HookList),io:format("~s is loaded.~n", [emqx_backend_redis]),ok.load_(Hook, Fun, ExpiredTime, {Filter, Pool, undefined}) ->load_(Hook, Fun, ExpiredTime, {Filter, Pool});load_(Hook, Fun, ExpiredTime, Params) ->case Hook of'client.connected' ->emqx:hook(Hook, fun emqx_backend_redis:Fun/3, [Params]);'client.disconnected' ->emqx:hook(Hook, fun emqx_backend_redis:Fun/4, [Params]);'session.subscribed' ->emqx:hook(Hook, fun emqx_ba
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
