叮当机器人人脸识别插件开发
玩转了叮当语音机器人之后,想做个人脸识别功能,能把用户的人脸存在云端数据库,有点类似于指纹解锁的功能。
首先在dingdang文件夹里有一个dingdang.py文件,它是整个系统的启动文件。打开它有一个app = Dingdang() 函数。
这就是对dingdang这个类的实例化,之后有个dingdang.run() 这句代码会创造一个新的线程。
所以我们可以利用这一点,做一个类似于登录的“门卫”线程。
在dingdang.py的最末尾加上这样两句话:
if __name__=='__main__':dingdang_main()
因为我们是开机启动的,所以最好写一个脚本文件。比如AutoRestart.sh (.sh文件都是脚本文件)
#!/bin/bash
sleep 5s
while true
dopython2.7 /home/nvidia/AI_Robot/twilight/multi_thread/main.py
done
怎么样是不是很简单呢,因为这就是bash语言的特点。这是一个循环,意思是不断的运行main.py.
之后的话,就正式开始我们main.py的编写了。既然是多线程,就不得不实现线程互斥了:mutex=threading.Lock() 。在此基础上 进入人脸识别函数。 为了方便,我们将所有函数封装到类face_compare和face_detect里面。
在这之前,我们至少要好好看看百度API调用格式。官方是这样的:
import urllib, urllib2, sys
import ssl# client_id 为官网获取的AK, client_secret 为官网获取的SK
host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=【官网获取的AK】&client_secret=【官网获取的SK】'
request = urllib2.Request(host)
request.add_header('Content-Type', 'application/json; charset=UTF-8')
response = urllib2.urlopen(request)
content = response.read()
if (content):print(content)
简单的说明一下就是我们要向百度的服务器发送一个请求,发送的必须是base64编码的图片,然后服务器会返回一个json文件,包含了我们需要的信息。ok下面我们开始着手编写我们的函数了。
实际情况是我们的USB摄像头经常可能被占用,这种情况下我们就需要解除占用,调用camera.release()。
解除占用后,先写入一张图片作为测试。之后我们先安装aip模块,它使得api调用更加简单。再根据返回的人脸相似度确定这是不是人脸。评价标准是message['results']['score']是否大于75。
pip install aip
之后为客户端实例化一个类。self.client=AipFace(self.APP_ID,self.API_KEY,self.SECRET_KEY) 。再调用其中的match函数实现匹配。
#摄像头拍摄人脸和存好人脸对比,CameraID为opencv调用摄像头编号def Face_Compare(self,path,CameraID=0):#从人脸库获取目标人脸图片的FaceToken#Data_Face=self.GetUserPic(groupID,userID)#FaceToken=Data_Face['result']['face_list'][0]['face_token']if gv.camera_cover==True:#这是一个全局变量,与其他文件共享raise Exception('摄像头占用中')gv.camera_cover=True#占用中camera=cv2.VideoCapture(CameraID)#开启摄像头print 'start shot'ret,ShotPic=camera.read()#读取摄像头数据camera.release()#释放摄像头gv.camera_cover=False#未占用print 'shot OK'cv2.imwrite('./ShotBuffe2.jpg',ShotPic)#ShotPic=self.load_img('./ShotBuffer.jpg')message=self.client.match([{'image':self.load_img('./beckham1.jpg'),'image_type':self.imageType},{'image':self.load_img(path),'image_type':self.imageType,}])print message#返回人脸相似度得分if message['result']==None:print 'no person'else:score=message['result']['score']if score>75:print 'the same person'else:print ' not the same'
下面是人脸相关的完整代码,包含了人脸检测,人脸识别以及用户注册的函数:
# -*- coding: utf-8 -*-#人脸有关代码from aip import AipFace
import numpy as np
import base64
import cv2
from AI_Robot.twilight.usb_camera.calibration import *
from AI_Robot.Durant.dingdang.dingdang import *
from AI_Robot.twilight.multi_thread import global_variable as gv#人脸检测
class face_detect(object):def __init__(self):self.APP_ID='11506121'self.API_KEY='你申请的APPKEY'self.SECRET_KEY='你申请的SECRETKEY'self.client=AipFace(self.APP_ID,self.API_KEY,self.SECRET_KEY)self.imageType = "BASE64"#加载图片供传送给aip使用def load_img(self,path):with open(path,'rb') as fp:return base64.b64encode(fp.read())#返回检测到的人脸数量def Message_FaceNum(self,message):return message['result']['face_num']#返回人脸位置信息def Message_FacePosition(self,message):face_list=message['result']['face_list'][0]['location']return face_list['left'],face_list['top'],face_list['width'],face_list['height'],face_list['rotation']#返回人脸识别正确率def Message_Probability(self,message):return message['result']['face_list'][0]['face_probability']#返回年龄def Message_Age(self,message):return message['result']['face_list'][0]['age']#返回美丑打分def Message_Beauty(self,message):return message['result']['face_list'][0]['beauty']#返回表情def Message_Expression(self,message):return message['result']['face_list'][0]['expression']['type']#返回性别def Message_Sex(self,message):return message['result']['face_list'][0]['gender']['type']#返回年龄美丑和表情def Message_ABE(self,message):beauty=self.Message_Beauty(message)age=self.Message_Age(message)emotion=self.Message_Expression(message)return beauty,age,emotion#人脸检测,标记出人脸位置,max_num为最大允许检测数,face_type为照片类型,默认生活照,
def face_detect(self,face,max_num=1,face_type='LIVE',face_field='age,beauty,expression,gender'):#face=self.load_img(path)options={}options["max_face_num"]=max_numoptions['face_field']=face_fieldoptions['face_type']=face_typemessage=self.client.detect(face,self.imageType,options)return message#face_detect主函数def face_detect_main(self,cameraID=1):if gv.camera_cover==True:raise Exception('摄像头占用中')gv.camera_cover=Truecamera=cv2.VideoCapture(cameraID)robot=Dingdang()robot.mic.say(u'年龄评估拍照即将开始,请做好准备')ret,pic=camera.read()camera.release()gv.camera_cover=Falsecv2.imwrite('./ImgBuffer.jpg',pic)img=self.load_img('./ImgBuffer.jpg')message=self.face_detect(img)if message['result']==None:robot.mic.say(u'拍摄质量不合,请重拍')else:age=self.Message_Age(message)robot.mic.say(u'您的年龄大约是'+str(age)+'岁')#人脸比对
class face_compare(object):def __init__(self):self.APP_ID='11506121'self.API_KEY='你申请的APIKEY'self.SECRET_KEY='你申请的SECRETKEY'self.client=AipFace(self.APP_ID,self.API_KEY,self.SECRET_KEY)self.imageType = "BASE64"self.quality_control='LOW'self.liveness_control='NONE'#加载图片供传送给aip使用def load_img(self,path):with open(path,'rb') as fp:return base64.b64encode(fp.read())#用户注册def UserRegister(self,ImgPath,groupID,userID):options={}Img=self.load_img(ImgPath)options['quality_control']=self.quality_controloptions['liveness_control']=self.liveness_controlstatus=self.client.addUser(Img,self.imageType,groupID,userID,options)print statusreturn status[u'result']#用户注册主函数def UserRegister_main(self,cameraID=1,groupID='group1'):user_list=self.client.getGroupUsers(groupID)#print user_listuser_len=len(user_list[u'result'][u'user_id_list'])if gv.camera_cover==True:raise Exception('摄像头占用中')gv.camera_cover=Truecamera=cv2.VideoCapture(cameraID)robot=Dingdang()robot.mic.say(u'即将进行用户注册拍摄,请准备')ret,pic=camera.read()camera.release()gv.camera_cover=Falsecv2.imwrite('./ShotBuffer.jpg',pic)img=self.load_img('./ShotBuffer.jpg')status=self.UserRegister('./ShotBuffer.jpg',groupID,'user'+str(user_len+1))if status==None:robot.mic.say(u'照片拍摄不合格,注册失败') else:robot.mic.say(u'注册成功')#用于获取一个已注册用户的人脸图片信息def GetUserPic(self,userID,groupID='group1'):userPic=self.client.faceGetlist(userID,groupID)return userPic#摄像头拍摄人脸和存好人脸对比,CameraID为opencv调用摄像头编号def Face_Compare(self,path,CameraID=0):#从人脸库获取目标人脸图片的FaceToken#Data_Face=self.GetUserPic(groupID,userID)#FaceToken=Data_Face['result']['face_list'][0]['face_token']if gv.camera_cover==True:raise Exception('摄像头占用中')gv.camera_cover=Truecamera=cv2.VideoCapture(CameraID)print 'start shot'ret,ShotPic=camera.read()camera.release()gv.camera_cover=Falseprint 'shot OK'cv2.imwrite('./ShotBuffe2.jpg',ShotPic)#ShotPic=self.load_img('./ShotBuffer.jpg')message=self.client.match([{'image':self.load_img('./beckham1.jpg'),'image_type':self.imageType},{'image':self.load_img(path),'image_type':self.imageType,}])print message#返回人脸相似度得分if message['result']==None:print 'no person'else:score=message['result']['score']if score>75:print 'the same person'else:print ' not the same'#将摄像头呀拍到的图片和数据库中人脸进行对比,UserID!=None则对指定人进行搜索def Face_Search(self,GroupID='group1',UserID=None,max_num=1,CameraID=0):if gv.camera_cover==True:raise Exception('摄像头占用中')gv.camera_cover=Truecamera=cv2.VideoCapture(CameraID)print 'start shot'ret,ShotPic=camera.read()print 'Shot OK'camera.release()gv.camera_cover=False#ShotPic=SoloShot(0)#print ShotPic.shapecv2.imwrite('./ShotBuffer.jpg',ShotPic)options={}options['quality_control']=self.quality_controloptions['liveness_control']=self.liveness_controloptions['max_user_num']=max_numif UserID!=None:options[user_id]=UserIDmessage=self.client.search(self.load_img('./ShotBuffer.jpg'),self.imageType,GroupID,options)if message['result']==None:print message['error_msg']return Falseelse:score= message['result']['user_list'][0]['score']if score>75:return Trueelse:return Falseif __name__=='__main__':#a=face_detect()#message=a.face_detect('./man2.jpg')#beauty,age,emotion=a.Message_ABE(message)#print 'beauty: ',beauty#print 'age: ',age#print 'emotion: ',emotiona=face_compare()#a.Face_Search()a.UserRegister_main()
下面是main.py了。系统启动以后进入一个认证循环,只有用户通过验证以后才能启动系统。 先开了一个进程robot = dingdang()以便初始化麦克风等,之后认证循环通过以后要及时删去之前的进程 del robot .再用下面这三行代码进行新线程创建和调度。
threads.append(Dingdang_Robot())
threads[1].setDaemon(True)#设置守护者进程
threads[1].start()
主线程A中,创建了子线程B,并且在主线程A中调用了B. setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.这就是setDaemon方法的含义,这基本和join是相反的。此外,还有个要特别注意的:必须在start() 方法调用之前设置,如果不设置为守护线程,程序会被无限挂起。而join是A创建B以后等待B执行完再运行。
#coding:utf-8import threading
import time
import wave
import osfrom AI_Robot.Durant.dingdang.client.dingdangpath import *
from AI_Robot.Durant.dingdang.dingdang import *from AI_Robot.twilight.baidu.face import *
from AI_Robot.twilight.multi_thread import global_variable as gv#dingdang机器人线程
class Dingdang_Robot(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):dingdang_main()if __name__=='__main__':camera=cv2.VideoCapture(0)camera.release()time.sleep(0.1)camera=cv2.VideoCapture(1)camera.release()#time.sleep(10)threads=[]#global mutexmutex=threading.Lock() #语音提示robot=Dingdang() face_com=face_compare()#Robot_mic=Mic()persona='DINGDANG'if 'robot_name' in robot.config:persona=robot.config['robot_name']#设置进程结束线程全部终止threads[0].setDaemon(True)#启动线程threads[0].start()time.sleep(1)#检查线程是否全部启动成功for t in threads:#print t.isAlive()if t.isAlive()!=True:os._exit(0)robot.mic.say(u'启动成功,请进行人脸认证')while True:#break #跳过人脸认证print '认证循环'threshold,transcribed=robot.mic.passiveListen(persona)#print 'transcribed:',transcribedif transcribed=='DINGDANG':words=robot.mic.activeListenToAllOptions(threshold)if words is None or words==[]:continueprint 'words:',words[0]if '拍摄' in words[0]:robot.mic.say(u'即将拍照,请准备')status=face_com.Face_Search()if status==True:robot.mic.say(u'人脸识别认证成功')breakelif status==False:robot.mic.say(u'认证失败,请重拍')else:robot.mic.say(u'请先通过人脸认证')transcribed=Nonedel robotthreads.append(Dingdang_Robot())threads[1].setDaemon(True)threads[1].start()time.sleep(0.5)while True:#print threads[0].isAlive()#print threads[0].isAlive()'''if threads[0].isAlive()!=True:os._exit(0)'''time.sleep(10)
以上就是人脸识别的全部代码。欢迎交流 durant1997@foxmail.com
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
