python实现网页多人对战版五子棋(websocket)

联网版五子棋

前个星期websocket做了一个联机版五子棋,一路坑坑洼洼没有搞清楚原理就开始写,饶了好多弯路。现在完工了腾出时间来整理的同时再对websocket做一些总结,同时对那些边边角角的知识做记录。

能够异步通信,首先想到的当然是http轮询,但是学长要求用websocket实现,所以也没多想就用websocket呗,反正都是从0开始。

( 使用的是 < Sanic + websocket > )

Blueprint:
	Gobang||—— mysql		|    |—— __init__.py|	   |—— config.py		# 数据库配置文件|	   |—— create_table_ifnot.py	# 建表文件|	   |—— DB_api.py	# 数据库接口|—— play		# 判断输赢	|	   |—— __init__.py|	   |—— battle.py|—— view|	   |—— __init.py|	   |—— conn.py	# 与前端的接口|	   |—— conf.py	# 配置文件|—— test.py	# 测试接口用|—— usr.py	# 主程序入口
< usr. py >

在与前端测试的过程中发生了跨域问题,网上找了一种简单的办法,但仍不理解其原理。
关于跨域问题的理解(但是现在还没写,只有百度)

from sanic import Sanic, response
import asynciofrom view.conn import bp1app = Sanic(__name__)
app.blueprint(bp1)@app.route('/test')
async def test(request):return response.text('success')@app.middleware('response')		# 添加即可解决跨域问题
async def add(request, response):response.headers["Access-Control-Allow-Origin"] = '*'response.headers["Access-Control-Allow-Methods"] = "POST,GET,OPTIONS"response.headers["Access-Control-Expose-Headers"] = "Set-Cookie,token"response.headers["Access-Control-Allow-Headsanic_demo/main/usr.py:16ers"] = "Set-Cookie,token"response.headers["Access-Control-Allow-Credentials"] = "true"if __name__ == '__main__':app.run(debug=False, host='0.0.0.0', port=9001)
< conf. py >
battleground = {}    # 存放正在对战的双方名字-->you:oppobattledic = {}   # 存放参加战斗的玩家名字-->ip:name
battle_wsdic = {}   # battle的ws字典-->ip:ws
waitdic = {}   # 存放闲置玩家名字-->ip:name
online_wsdic = {}     # ws列表-->ip:wsaside_quit = 0  # 一方退出而另一方仍在游戏中的情况
< conn. py>

这里是这个项目的核心:对各个用户做分类,其中列表(字典)的处理尤为重要

设计的是两个状态:等待和对战

from sanic import Blueprintfrom view.conf import battledic, battleground, battle_wsdic, \online_wsdic, waitdic
from play.battle import *
from mysql.DB_api import db_insertbp1 = Blueprint('bp1')
battlinglist = {}  # bothside:battledata@bp1.websocket('/register')
async def register(request, ws):_quit = 0ip = request.iptry:waitdic.pop(ip)except:passwhile True:print('返回register')name = await ws.recv()_namelist = []for key, value in waitdic.items():_namelist.append(value)if _namelist.count(name) == 0 and name != 'null' and name:ip = request.ipwaitdic[ip] = name      # ip-名字await ws.send('positive')print('your name:', name)print('waitdic:', waitdic)else:await ws.send('negative')print('<<<<<<<<<<<<<<<<<输入的名字不合理')if _quit == 1:break@bp1.websocket('/waiting')
async def waiting(request, ws):your_ip = request.iptry:name = waitdic[your_ip]     # <<<<<<<<<<<<<<<<<<<<手机解锁唤醒的时候会有进入waiting的操作,如果waitdic中没有这个ip就会报错try:    # 获得这个人的在play里面的旧名字oldname = battledic[your_ip]print('一边重置')print('旧名字:', oldname)except:pass_namelist = []for value in waitdic.values():_namelist.append(value)if _namelist.count(name) == 1 and name != 'null' and name:online_wsdic[your_ip] = ws  # 将ip与ws绑定# 给所有人广播名单await  broadcast_all()quit_ = 0_oldrt = Nonewait = 0while True:_blackgoin = 0_whitegoin = 0_blackquit = 0_whitequit = 0for black, white in battleground.items():if black == name:_oldrt = None_blackgoin = 1battledic[your_ip] = nameawait ws.send('yes')if white == name:_oldrt = None_whitegoin = 1battledic[your_ip] = nameawait ws.send('yes')# 开始play后等待状态睡眠,双方if _blackgoin == 1:while True:if your_ip not in battledic:_blackquit = 1if _blackquit == 1:await  broadcast_all()  # 给所有人广播名单break   # 如果从play中退出来则breakawait asyncio.sleep(1)if _whitegoin == 1:while True:if your_ip not in battledic:_whitequit = 1if _whitequit == 1:await  broadcast_all()  # 给所有人广播名单breakawait asyncio.sleep(1)if quit_ == 1:      # 暂定永久等待breakif wait == 10:wait = 0    # wait用于控制台显示频率用print(name, '正在等待')wait += 1await asyncio.sleep(0.3)        # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<调整延迟时间以快速开始游戏else:print(name, '名字错误!!!')except KeyError:print('<<<<<<<<<<<<这个ip不存在')finally:try:online_wsdic.pop(your_ip)while True:_quit = 0if your_ip in waitdic:_quit = 1if _quit == 1:breakawait asyncio.sleep(0.1)waitdic.pop(your_ip)# 给所有人广播名单await  broadcast_all()except KeyError:print('<<<<<<<<<<<<这个ip不存在')@bp1.websocket('/choose')
async def choose(request, ws):_list = []   # 制作等待列表ip_black = 0ip_white = 0black = await ws.recv()  # 接收当前用户white = await ws.recv()  # 接收他选择的对手for i, j in waitdic.items():  # for循环里最好用i, j_list.append(j)if black == j:ip_black = iif white == j:ip_white = iif _list.count(white) == 1 and _list.count(black) == 1 and white != 'null' and white and white != black:# 如果对方在等待列表里# 如果对手是存在的# 如果对手不是自己# 则绑定战斗双方await online_wsdic[ip_white].send('confirm')answer = await online_wsdic[ip_white].recv()if answer == 'yes':await online_wsdic[ip_black].send('pass')await online_wsdic[ip_white].send('pass')battleground[black] = whiteelif answer == 'no':await online_wsdic[ip_black].send('nopass')await online_wsdic[ip_white].send('nopass')else:print(1111)else:print(black)print(white)print('<--你的选择有问题')@bp1.websocket('/play')
async def play(request, ws):flag_black = Falseflag_white = False_onlyOnce_b = 0_onlyOnce_w = 0black = Noneip_black = Nonewhite = Noneip_white = Nonebothside = NoneTom = NoneJerry = Nonetry:endbattle = -1your_ip = request.ipyour_name = battledic[your_ip]battle_wsdic[your_ip] = ws     # 把当前用户ws添加进来# 获取双方的名字for key, value in battleground.items():if your_name == key or your_name == value:black = key     # 根据battledic获得双方的名字, black是主动点击方white = valuebothside = black + whitebattlinglist[bothside] = []# 开始下棋前的确认等待(添加等待)while True:for i, j in battle_wsdic.items():if battledic[i] == black and _onlyOnce_b == 0:flag_black = True_onlyOnce_b = 1if battledic[i] == white and _onlyOnce_w == 0:flag_white = True_onlyOnce_w = 1if flag_white and flag_black:print(your_name, '可以开始了')breakelse:print(your_name, '在等待对手确认')await asyncio.sleep(0.3)        # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<调整延迟时间以快速开始游戏# 获得双方的ip,这部分只能放在添加等待的后面for i, j in battledic.items():if j == white:  # 根据battlelist获得双方的ipip_white = iif j == black:ip_black = i# 将这个人从waitlist去除并广播waitdic.pop(your_ip)await broadcast_all()# 下棋部分Tom = battle_wsdic[ip_black]    # 获得双方的ws对象Jerry = battle_wsdic[ip_white]while True:# tom先下棋rec_t = await Tom.recv()tom_play = dumps(rec_t)if tom_play:print(black + ' 下棋', ':', tom_play)battlinglist[bothside].append(tom_play)  # 存放有效数据endbattle = check_win(tom_play, battlinglist[bothside])  # 判断tom这一步棋赢了没await Jerry.send(rec_t)  # 传给jerry 前端传过来的数据,是str型# ******************黑方获胜*****************if endbattle == 1:print('黑色方' + black + '获胜')print('战斗数据:', battlinglist[bothside])db_insert(locations=battlinglist[bothside], win=endbattle)    # 向数据库中写入该盘对句break# jerry下棋rec_j = await Jerry.recv()jerry_play = dumps(rec_j)if jerry_play:print(white + ' 下棋', ':', jerry_play)    # 显示这个人下的那个子battlinglist[bothside].append(jerry_play)endbattle = check_win(jerry_play, battlinglist[bothside])await Tom.send(rec_j)  # 传给Tom 前端传来的数据,是str型# ******************白方获胜*****************if endbattle == 0:print('白色方' + white + '获胜')print('战斗数据:', battlinglist[bothside])db_insert(locations=battlinglist[bothside], win=endbattle)    # 向数据库中写入该盘对句breakfinally:try:battleground.pop(black)    # 战斗列表去出这两个人battlinglist.pop(bothside)  # 将这一盘的战斗数据清除battledic.pop(ip_black)battledic.pop(ip_white)battle_wsdic.pop(ip_black)battle_wsdic.pop(ip_white)waitdic[ip_black] = blackwaitdic[ip_white] = whiteexcept KeyError:print('<<<<<<<<<<<<<<)await Tom.send('battle complete!')await Jerry.send('battle complete!')async def broadcast_all():broadlist = []for i, j in waitdic.items():broadlist.append(j)rt = ','.join(str(i) for i in broadlist)  # 将数组转换成字符串发送for i in online_wsdic.values():  # 给所有用户同时更新用户信息界面try:  # 加一个try于是不阻塞了~~await i.send(rt)except:print('<<<<<<<<<<<<<<)continue

五子棋的判断输赢部分

def dumps(_str):# 将前端发过来的str转换成list_list = _str.join(',0')k = _list[2: -2].split(',')rt = []for i in range(len(k)):rt.append(int(k[i]))return rt# 判断这颗棋子有没有赢
def check_win(data, battlinglist):print(battlinglist)z1 = []     # -方向棋子z2 = []     # |方向棋子z3 = []     # /方向棋子z4 = []     # \方向棋子flag = False    # 5连标志符color = data[2]     # 取出颜色x = data[0]     # 取出xy = data[1]     # 取出yfor i in battlinglist:xi = i[0]   # 取出循环的xyi = i[1]   # 取出循环的ycolori = i[2]# 只判断相同颜色if colori != color:continueif x == xi:		# |z1.append(yi)flag = check_five(z1)if flag:print('z1:', z1)breakif y == yi:		# ——z2.append(xi)flag = check_five(z2)if flag:print('z2:', z2)breakif xi - x == yi - y:	# /z3.append(xi)flag = check_five(z3)if flag:print('z3:', z3)breakif xi - x == y - yi:	# \z4.append(xi)flag = check_five(z4)if flag:print('z4:', z4)breakelse:continueif flag:return colorelse:return -1# 判断5连
def check_five(list1):list1.sort()count_one = []counter = []rt = Falsemm = 0for i in list1:if i == list1[0]:mm = icontinuecount_one.append(i - mm)mm = i# print(count_one)for i in range(len(count_one)-3):counter = count_one[i:i + 4]if counter.count(1) == 4:print('5连')rt = Truereturn rt

写这个demo的时候大部分时间都是在写conn. py,虽然看似逻辑简单,但是其中的大大小小的坑真的是无数。

希望自己日后能在总体布局和代码质量上有所突破


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部