python实现网页多人对战版五子棋(websocket)
联网版五子棋
前个星期websocket做了一个联机版五子棋,一路坑坑洼洼没有搞清楚原理就开始写,饶了好多弯路。现在完工了腾出时间来整理的同时再对websocket做一些总结,同时对那些边边角角的知识做记录。
能够异步通信,首先想到的当然是http轮询,但是学长要求用websocket实现,所以也没多想就用websocket呗,反正都是从0开始。
( 使用的是 < Sanic + websocket > )
Blueprint:
Gobang||—— mysql | |—— __init__.py| |—— config.py # 数据库配置文件| |—— create_table_ifnot.py # 建表文件| |—— DB_api.py # 数据库接口|—— play # 判断输赢 | |—— __init__.py| |—— battle.py|—— view| |—— __init.py| |—— conn.py # 与前端的接口| |—— conf.py # 配置文件|—— test.py # 测试接口用|—— usr.py # 主程序入口
< usr. py >
在与前端测试的过程中发生了跨域问题,网上找了一种简单的办法,但仍不理解其原理。
关于跨域问题的理解(但是现在还没写,只有百度)
from sanic import Sanic, response
import asynciofrom view.conn import bp1app = Sanic(__name__)
app.blueprint(bp1)@app.route('/test')
async def test(request):return response.text('success')@app.middleware('response') # 添加即可解决跨域问题
async def add(request, response):response.headers["Access-Control-Allow-Origin"] = '*'response.headers["Access-Control-Allow-Methods"] = "POST,GET,OPTIONS"response.headers["Access-Control-Expose-Headers"] = "Set-Cookie,token"response.headers["Access-Control-Allow-Headsanic_demo/main/usr.py:16ers"] = "Set-Cookie,token"response.headers["Access-Control-Allow-Credentials"] = "true"if __name__ == '__main__':app.run(debug=False, host='0.0.0.0', port=9001)
< conf. py >
battleground = {} # 存放正在对战的双方名字-->you:oppobattledic = {} # 存放参加战斗的玩家名字-->ip:name
battle_wsdic = {} # battle的ws字典-->ip:ws
waitdic = {} # 存放闲置玩家名字-->ip:name
online_wsdic = {} # ws列表-->ip:wsaside_quit = 0 # 一方退出而另一方仍在游戏中的情况
< conn. py>
这里是这个项目的核心:对各个用户做分类,其中列表(字典)的处理尤为重要
设计的是两个状态:等待和对战
from sanic import Blueprintfrom view.conf import battledic, battleground, battle_wsdic, \online_wsdic, waitdic
from play.battle import *
from mysql.DB_api import db_insertbp1 = Blueprint('bp1')
battlinglist = {} # bothside:battledata@bp1.websocket('/register')
async def register(request, ws):_quit = 0ip = request.iptry:waitdic.pop(ip)except:passwhile True:print('返回register')name = await ws.recv()_namelist = []for key, value in waitdic.items():_namelist.append(value)if _namelist.count(name) == 0 and name != 'null' and name:ip = request.ipwaitdic[ip] = name # ip-名字await ws.send('positive')print('your name:', name)print('waitdic:', waitdic)else:await ws.send('negative')print('<<<<<<<<<<<<<<<<<输入的名字不合理')if _quit == 1:break@bp1.websocket('/waiting')
async def waiting(request, ws):your_ip = request.iptry:name = waitdic[your_ip] # <<<<<<<<<<<<<<<<<<<<手机解锁唤醒的时候会有进入waiting的操作,如果waitdic中没有这个ip就会报错try: # 获得这个人的在play里面的旧名字oldname = battledic[your_ip]print('一边重置')print('旧名字:', oldname)except:pass_namelist = []for value in waitdic.values():_namelist.append(value)if _namelist.count(name) == 1 and name != 'null' and name:online_wsdic[your_ip] = ws # 将ip与ws绑定# 给所有人广播名单await broadcast_all()quit_ = 0_oldrt = Nonewait = 0while True:_blackgoin = 0_whitegoin = 0_blackquit = 0_whitequit = 0for black, white in battleground.items():if black == name:_oldrt = None_blackgoin = 1battledic[your_ip] = nameawait ws.send('yes')if white == name:_oldrt = None_whitegoin = 1battledic[your_ip] = nameawait ws.send('yes')# 开始play后等待状态睡眠,双方if _blackgoin == 1:while True:if your_ip not in battledic:_blackquit = 1if _blackquit == 1:await broadcast_all() # 给所有人广播名单break # 如果从play中退出来则breakawait asyncio.sleep(1)if _whitegoin == 1:while True:if your_ip not in battledic:_whitequit = 1if _whitequit == 1:await broadcast_all() # 给所有人广播名单breakawait asyncio.sleep(1)if quit_ == 1: # 暂定永久等待breakif wait == 10:wait = 0 # wait用于控制台显示频率用print(name, '正在等待')wait += 1await asyncio.sleep(0.3) # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<调整延迟时间以快速开始游戏else:print(name, '名字错误!!!')except KeyError:print('<<<<<<<<<<<<这个ip不存在')finally:try:online_wsdic.pop(your_ip)while True:_quit = 0if your_ip in waitdic:_quit = 1if _quit == 1:breakawait asyncio.sleep(0.1)waitdic.pop(your_ip)# 给所有人广播名单await broadcast_all()except KeyError:print('<<<<<<<<<<<<这个ip不存在')@bp1.websocket('/choose')
async def choose(request, ws):_list = [] # 制作等待列表ip_black = 0ip_white = 0black = await ws.recv() # 接收当前用户white = await ws.recv() # 接收他选择的对手for i, j in waitdic.items(): # for循环里最好用i, j_list.append(j)if black == j:ip_black = iif white == j:ip_white = iif _list.count(white) == 1 and _list.count(black) == 1 and white != 'null' and white and white != black:# 如果对方在等待列表里# 如果对手是存在的# 如果对手不是自己# 则绑定战斗双方await online_wsdic[ip_white].send('confirm')answer = await online_wsdic[ip_white].recv()if answer == 'yes':await online_wsdic[ip_black].send('pass')await online_wsdic[ip_white].send('pass')battleground[black] = whiteelif answer == 'no':await online_wsdic[ip_black].send('nopass')await online_wsdic[ip_white].send('nopass')else:print(1111)else:print(black)print(white)print('<--你的选择有问题')@bp1.websocket('/play')
async def play(request, ws):flag_black = Falseflag_white = False_onlyOnce_b = 0_onlyOnce_w = 0black = Noneip_black = Nonewhite = Noneip_white = Nonebothside = NoneTom = NoneJerry = Nonetry:endbattle = -1your_ip = request.ipyour_name = battledic[your_ip]battle_wsdic[your_ip] = ws # 把当前用户ws添加进来# 获取双方的名字for key, value in battleground.items():if your_name == key or your_name == value:black = key # 根据battledic获得双方的名字, black是主动点击方white = valuebothside = black + whitebattlinglist[bothside] = []# 开始下棋前的确认等待(添加等待)while True:for i, j in battle_wsdic.items():if battledic[i] == black and _onlyOnce_b == 0:flag_black = True_onlyOnce_b = 1if battledic[i] == white and _onlyOnce_w == 0:flag_white = True_onlyOnce_w = 1if flag_white and flag_black:print(your_name, '可以开始了')breakelse:print(your_name, '在等待对手确认')await asyncio.sleep(0.3) # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<调整延迟时间以快速开始游戏# 获得双方的ip,这部分只能放在添加等待的后面for i, j in battledic.items():if j == white: # 根据battlelist获得双方的ipip_white = iif j == black:ip_black = i# 将这个人从waitlist去除并广播waitdic.pop(your_ip)await broadcast_all()# 下棋部分Tom = battle_wsdic[ip_black] # 获得双方的ws对象Jerry = battle_wsdic[ip_white]while True:# tom先下棋rec_t = await Tom.recv()tom_play = dumps(rec_t)if tom_play:print(black + ' 下棋', ':', tom_play)battlinglist[bothside].append(tom_play) # 存放有效数据endbattle = check_win(tom_play, battlinglist[bothside]) # 判断tom这一步棋赢了没await Jerry.send(rec_t) # 传给jerry 前端传过来的数据,是str型# ******************黑方获胜*****************if endbattle == 1:print('黑色方' + black + '获胜')print('战斗数据:', battlinglist[bothside])db_insert(locations=battlinglist[bothside], win=endbattle) # 向数据库中写入该盘对句break# jerry下棋rec_j = await Jerry.recv()jerry_play = dumps(rec_j)if jerry_play:print(white + ' 下棋', ':', jerry_play) # 显示这个人下的那个子battlinglist[bothside].append(jerry_play)endbattle = check_win(jerry_play, battlinglist[bothside])await Tom.send(rec_j) # 传给Tom 前端传来的数据,是str型# ******************白方获胜*****************if endbattle == 0:print('白色方' + white + '获胜')print('战斗数据:', battlinglist[bothside])db_insert(locations=battlinglist[bothside], win=endbattle) # 向数据库中写入该盘对句breakfinally:try:battleground.pop(black) # 战斗列表去出这两个人battlinglist.pop(bothside) # 将这一盘的战斗数据清除battledic.pop(ip_black)battledic.pop(ip_white)battle_wsdic.pop(ip_black)battle_wsdic.pop(ip_white)waitdic[ip_black] = blackwaitdic[ip_white] = whiteexcept KeyError:print('<<<<<<<<<<<<<<)await Tom.send('battle complete!')await Jerry.send('battle complete!')async def broadcast_all():broadlist = []for i, j in waitdic.items():broadlist.append(j)rt = ','.join(str(i) for i in broadlist) # 将数组转换成字符串发送for i in online_wsdic.values(): # 给所有用户同时更新用户信息界面try: # 加一个try于是不阻塞了~~await i.send(rt)except:print('<<<<<<<<<<<<<<
五子棋的判断输赢部分
def dumps(_str):# 将前端发过来的str转换成list_list = _str.join(',0')k = _list[2: -2].split(',')rt = []for i in range(len(k)):rt.append(int(k[i]))return rt# 判断这颗棋子有没有赢
def check_win(data, battlinglist):print(battlinglist)z1 = [] # -方向棋子z2 = [] # |方向棋子z3 = [] # /方向棋子z4 = [] # \方向棋子flag = False # 5连标志符color = data[2] # 取出颜色x = data[0] # 取出xy = data[1] # 取出yfor i in battlinglist:xi = i[0] # 取出循环的xyi = i[1] # 取出循环的ycolori = i[2]# 只判断相同颜色if colori != color:continueif x == xi: # |z1.append(yi)flag = check_five(z1)if flag:print('z1:', z1)breakif y == yi: # ——z2.append(xi)flag = check_five(z2)if flag:print('z2:', z2)breakif xi - x == yi - y: # /z3.append(xi)flag = check_five(z3)if flag:print('z3:', z3)breakif xi - x == y - yi: # \z4.append(xi)flag = check_five(z4)if flag:print('z4:', z4)breakelse:continueif flag:return colorelse:return -1# 判断5连
def check_five(list1):list1.sort()count_one = []counter = []rt = Falsemm = 0for i in list1:if i == list1[0]:mm = icontinuecount_one.append(i - mm)mm = i# print(count_one)for i in range(len(count_one)-3):counter = count_one[i:i + 4]if counter.count(1) == 4:print('5连')rt = Truereturn rt
写这个demo的时候大部分时间都是在写conn. py,虽然看似逻辑简单,但是其中的大大小小的坑真的是无数。
希望自己日后能在总体布局和代码质量上有所突破
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
