25Nosql-Redis-队列,模拟12306购票
一.Redis
1.简介
Redis:REmote DIctionary Server,顾名思义,远程字典服务。
Redis是单线程的。可以响应一秒钟10万次请求。Redis自身是集群的,可以有多个Redis同属于一个集群,它自己区分主从,不需要人工干预。
如果VS中要用到Redis,出来要安装Redis数据库,还要安装类似于ado.net的数据库操作的工具,我们使用sqlserver数据库,就是通过ado.net来操作的,而Redis数据库的操作工具,一个是官方提供的,ServiceStack.Redis,但是它有1小时6000次免费使用的限制,还有一个是免费的,StackExchange.Redis。不管是哪一个,都要在项目中引用。
2.开始使用
1)我们可以写一个Redis的配置文件信息:
///
/// redis配置文件信息
/// 也可以放到配置文件去
///
public sealed class RedisConfigInfo
{/// /// 可写的Redis链接地址/// format:ip1,ip2/// /// 默认6379端口/// public string WriteServerList = "127.0.0.1:6379";/// /// 可读的Redis链接地址/// format:ip1,ip2/// public string ReadServerList = "127.0.0.1:6379";/// /// 最大写链接数/// public int MaxWritePoolSize = 60;/// /// 最大读链接数/// public int MaxReadPoolSize = 60;/// /// 本地缓存到期时间,单位:秒/// public int LocalCacheTime = 180;/// /// 自动重启/// public bool AutoStart = true;/// /// 是否记录日志,该设置仅用于排查redis运行时出现的问题,/// 如redis工作正常,请关闭该项/// public bool RecordeLog = false;}
2)我们把这些配置信息传给一个Redis管理类:
public class RedisManager{/// /// redis配置文件信息/// private static RedisConfigInfo RedisConfigInfo = new RedisConfigInfo();/// /// Redis客户端池化管理/// private static PooledRedisClientManager prcManager;/// /// 静态构造方法,初始化链接池管理对象/// static RedisManager(){CreateManager();}/// /// 创建链接池管理对象/// private static void CreateManager(){string[] WriteServerConStr = RedisConfigInfo.WriteServerList.Split(',');string[] ReadServerConStr = RedisConfigInfo.ReadServerList.Split(',');prcManager = new PooledRedisClientManager(ReadServerConStr, WriteServerConStr,new RedisClientManagerConfig{MaxWritePoolSize = RedisConfigInfo.MaxWritePoolSize,MaxReadPoolSize = RedisConfigInfo.MaxReadPoolSize,AutoStart = RedisConfigInfo.AutoStart,});}/// /// 客户端缓存操作对象/// public static IRedisClient GetClient(){//prcManager是个连接池管理类,有了它,可以随时创建Client连接.return prcManager.GetClient();}}
3)引用的ServiceStack.Redis中,IRedisClient是个接口,对Redis操作的方法,都在这里面,有几百个,我们再封装一个RedisBase接口,里面是常用的方法:
/// /// RedisBase类,是redis操作的基类,继承自IDisposable接口,主要用于释放内存/// public abstract class RedisBase : IDisposable{public IRedisClient iClient { get; private set; }public RedisBase(){//这里有了连接池的连接,就可以完成Redis数据库的操作iClient = RedisManager.GetClient();}//public static IRedisClient iClient { get; private set; }//static RedisBase()//{// iClient = RedisManager.GetClient();//}public virtual void FlushAll(){iClient.FlushAll();}private bool _disposed = false;protected virtual void Dispose(bool disposing){if (!this._disposed){if (disposing){iClient.Dispose();iClient = null;}}this._disposed = true;}public void Dispose(){Dispose(true);GC.SuppressFinalize(this);}/// /// 保存数据DB文件到硬盘/// public void Save(){iClient.Save();}/// /// 异步保存数据DB文件到硬盘/// public void SaveAsync(){iClient.SaveAsync();}}
3.Redis可以存什么
1)string:传统的key-value。如果要存对象,则要把对象序列化后再保存,如果要修改对象的属性,则先读取对象——反序列化——修改属性——序列化——存回Redis。
2)Hashtable:key-List
跟string的区别在于不用反序列化,直接修改某个字段。一个hashid——{key:value;key:value;key:value;},可以一次性查找实体,也可以单个,还可以单个修改。
3)Set:key-List
用哈希表来保持字符串的唯一性,不能有重复(因为哈希算法,让重复的数据,后面的覆盖前面的)。没有先后顺序,存储一些集合性的数据。可以对不同的数据集合进行交、差、并、补集。
作用: 1.共同好友、二度好友;2.利用唯一性,可以统计访问网站的所有独立 IP
4)ZSet:
Sorted Sets是将 Set 中的元素增加了一个权重参数 score,使得集合中的元素能够按 score 进行有序排列
1.带有权重的元素,比如一个游戏的用户得分排行榜,还有评论盖楼
2.比较复杂的数据结构,一般用到的场景不算太多
Redis是放在内存中的,如果断电,是会丢失的,如果用save命令或者定时save,是可以保存在硬盘的。还可以用日志来恢复数据。
Redis还有List(链表)操作,它是双向链表:
Redis这个先进先出的数据结构很适合做异步队列。
二.异步队列
上面是利用异步队列来模拟12306买票的过程。
1.一个客户端的请求,发到服务端,如果处理一个请求要5秒钟,服务器支持并行并且开启100个进程支持100个用户,那么等于它每秒支持20个用户,这对于12306这么多同时在线买票的人,明显是不够的。
2.建立一个队列(Queue),Server先不做任何处理,只是把请求(包含购票人,乘车人,证件号码,出发到达地,时间等信息)按照队列先进先出的方式放入队列(可以是多个队列),这个过程只要5毫秒,则服务器的处理速度提高了1000倍,可以应付大波的请求。
3.有众多的处理服务器,从队列中取数据进行处理,并返回结果给客户端,告诉客户有票/无票。
Redis很适合做异步队列,Redis不但适合做队列先进先出,还可以做栈先进后出、双向链表,它有众多API,可以左插入(PushItemToList),也可以右插入(PrependItemToList)等等。
#region 赋值/// /// 从左侧向list中添加值/// public void LPush(string key, string value){base.iClient.PushItemToList(key, value);}/// /// 从左侧向list中添加值,并设置过期时间/// public void LPush(string key, string value, DateTime dt){base.iClient.PushItemToList(key, value);base.iClient.ExpireEntryAt(key, dt);}/// /// 从左侧向list中添加值,设置过期时间/// public void LPush(string key, string value, TimeSpan sp){base.iClient.PushItemToList(key, value);base.iClient.ExpireEntryIn(key, sp);}/// /// 从左侧向list中添加值/// public void RPush(string key, string value){base.iClient.PrependItemToList(key, value);}/// /// 从右侧向list中添加值,并设置过期时间/// public void RPush(string key, string value, DateTime dt){base.iClient.PrependItemToList(key, value);base.iClient.ExpireEntryAt(key, dt);}/// /// 从右侧向list中添加值,并设置过期时间/// public void RPush(string key, string value, TimeSpan sp){base.iClient.PrependItemToList(key, value);base.iClient.ExpireEntryIn(key, sp);}/// /// 添加key/value/// public void Add(string key, string value){base.iClient.AddItemToList(key, value);}/// /// 添加key/value ,并设置过期时间/// public void Add(string key, string value, DateTime dt){base.iClient.AddItemToList(key, value);base.iClient.ExpireEntryAt(key, dt);}/// /// 添加key/value。并添加过期时间/// public void Add(string key, string value, TimeSpan sp){base.iClient.AddItemToList(key, value);base.iClient.ExpireEntryIn(key, sp);}/// /// 为key添加多个值/// public void Add(string key, List values){base.iClient.AddRangeToList(key, values);}/// /// 为key添加多个值,并设置过期时间/// public void Add(string key, List values, DateTime dt){base.iClient.AddRangeToList(key, values);base.iClient.ExpireEntryAt(key, dt);}/// /// 为key添加多个值,并设置过期时间/// public void Add(string key, List values, TimeSpan sp){base.iClient.AddRangeToList(key, values);base.iClient.ExpireEntryIn(key, sp);}#endregion#region 获取值/// /// 获取list中key包含的数据数量/// public long Count(string key){return base.iClient.GetListCount(key);}/// /// 获取key包含的所有数据集合/// public List Get(string key){return base.iClient.GetAllItemsFromList(key);}/// /// 获取key中下标为star到end的值集合 /// public List Get(string key, int star, int end){return base.iClient.GetRangeFromList(key, star, end);}#endregion#region 阻塞命令/// /// 阻塞命令:从list中keys的尾部移除一个值,并返回移除的值,阻塞时间为sp/// public string BlockingPopItemFromList(string key, TimeSpan? sp){return base.iClient.BlockingDequeueItemFromList(key, sp);}/// /// 阻塞命令:从list中keys的尾部移除一个值,并返回移除的值,阻塞时间为sp/// public ItemRef BlockingPopItemFromLists(string[] keys, TimeSpan? sp){return base.iClient.BlockingPopItemFromLists(keys, sp);}/// /// 阻塞命令:从list中keys的尾部移除一个值,并返回移除的值,阻塞时间为sp/// public string BlockingDequeueItemFromList(string key, TimeSpan? sp){return base.iClient.BlockingDequeueItemFromList(key, sp);}/// /// 阻塞命令:从list中keys的尾部移除一个值,并返回移除的值,阻塞时间为sp/// public ItemRef BlockingDequeueItemFromLists(string[] keys, TimeSpan? sp){return base.iClient.BlockingDequeueItemFromLists(keys, sp);}/// /// 阻塞命令:从list中key的头部移除一个值,并返回移除的值,阻塞时间为sp/// public string BlockingRemoveStartFromList(string keys, TimeSpan? sp){return base.iClient.BlockingRemoveStartFromList(keys, sp);}/// /// 阻塞命令:从list中key的头部移除一个值,并返回移除的值,阻塞时间为sp/// public ItemRef BlockingRemoveStartFromLists(string[] keys, TimeSpan? sp){return base.iClient.BlockingRemoveStartFromLists(keys, sp);}/// /// 阻塞命令:从list中一个fromkey的尾部移除一个值,添加到另外一个tokey的头部,并返回移除的值,阻塞时间为sp/// public string BlockingPopAndPushItemBetweenLists(string fromkey, string tokey, TimeSpan? sp){return base.iClient.BlockingPopAndPushItemBetweenLists(fromkey, tokey, sp);}#endregion#region 删除/// /// 从尾部移除数据,返回移除的数据/// public string PopItemFromList(string key){return base.iClient.PopItemFromList(key);}/// /// 移除list中,key/value,与参数相同的值,并返回移除的数量/// public long RemoveItemFromList(string key, string value){return base.iClient.RemoveItemFromList(key, value);}/// /// 从list的尾部移除一个数据,返回移除的数据/// public string RemoveEndFromList(string key){return base.iClient.RemoveEndFromList(key);}/// /// 从list的头部移除一个数据,返回移除的值/// public string RemoveStartFromList(string key){return base.iClient.RemoveStartFromList(key);}#endregion#region 其它/// /// 从一个list的尾部移除一个数据,添加到另外一个list的头部,并返回移动的值/// public string PopAndPushItemBetweenLists(string fromKey, string toKey){return base.iClient.PopAndPushItemBetweenLists(fromKey, toKey);}#endregion
三.用Redis模拟12306的异步队列处理
1.首先准备两个队列,test和task。前者6个任务,后者10个任务。
Console.WriteLine("*****************************************");{using (RedisListService service = new RedisListService()){service.FlushAll();List stringList = new List();for (int i = 0; i < 10; i++){stringList.Add(string.Format($"放入任务{i}"));}service.LPush("test", "这是一个学生1");service.LPush("test", "这是一个学生2");service.LPush("test", "这是一个学生3");service.LPush("test", "这是一个学生4");service.LPush("test", "这是一个学生5");service.LPush("test", "这是一个学生6");service.Add("task", stringList);Console.WriteLine(service.Count("test"));Console.WriteLine(service.Count("task"));var list = service.Get("test");//这里是做分页的,拿分页数据list = service.Get("task", 2, 4);//开启一个线程,而且是死循环,允许我们不停的输入数据,并添加到test队列的后面Action act = new Action(() =>{while (true){Console.WriteLine("************请输入数据**************");string testTask = Console.ReadLine();service.LPush("test", testTask);}});//这里是让控制台程序不退出,把界面卡住act.EndInvoke(act.BeginInvoke(null, null));}}
2.建立处理器,用来抢上面保存到队列的任务并进行处理
string path = AppDomain.CurrentDomain.BaseDirectory;string tag = path.Split('/', '\\').Last(s => !string.IsNullOrEmpty(s));Console.WriteLine($"这里是 {tag} 启动了。。");using (RedisListService service = new RedisListService()){Action act = new Action(() =>{while (true){var result = service.BlockingPopItemFromLists(new string[] { "test", "task" }, TimeSpan.FromHours(3));Thread.Sleep(5000);Console.WriteLine($"这里是 {tag} 队列获取的消息 {result.Id} {result.Item}");}});act.EndInvoke(act.BeginInvoke(null, null));}
3.我们可以开多个处理器,看它们是怎么抢任务的
上面第2步的控制台程序,我们把exe文件拷贝出来多份:
里面的内容:
对于第一个exe文件,我们打开多个,比如3个(当然如果服务器性能够强劲,可以开N个,无限扩展),结果如下:
我们看到,队列test和task总共有16个任务,被3个控制台程序瓜分后进行处理了。
这就是12306网站,接收请求——放入队列——多个处理程序抢请求并进行处理,这个全流程的模拟。
现在市面上还有很多队列的工具,比如rabitmq,msmq,它们和redis队列差不多,redis队列轻量级。
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
