python-流行病学调查报告内容提取、梳理

  在各类传染病疫情应急处理过程中,快速、准确、高效的抓取流调报告中的相关内容,分析接触关系,是各项防控措施制定的理论基础。
  在参与的疫情防控中,现场流调人员会收集感染者、密接者的流行病学信息,反馈到疫情指挥中心后由相关人员汇总、整理和分析,在整理分析的过程中我遇到了两种实际场景:
  一、在若干收集到的流掉报告,目的是提取流调对象的接触信息,整理流调对象之间的相互接触关系。
  实际过程中,当流调对象较少,可以直接人工提取,整理接触信息是较为简单的工作。但是随着流调对象的增加,每增加一个,都需要在所有的流调报告中搜寻一遍,整理接触信息(注:除了流调报告之外,还有流调对象的id文件。流调报告中包括密接信息,但是因为没有确诊感染,因此这些密接者并没有流调报告,但是在确诊之后,会进行详细的流行病学调查),工作量会成倍增加。因此想到可以用python进行提取和整理。
  整体思路是:第一步,建立已经收集到的所有流调报告的字典库;第二步,根据id文件确定每个流调对象出现在每个流调报告中的次数;第三步,根据第二步的结果建立关系对子,判定标准为如果甲出现在乙的流调报告中,则判定甲乙有关系;第四部,将第三步的关系对子扩展成关系链,我的思路是以一个点开始,遍历整个关系对子,一个一个的添加。

import pandas as pd
import os
from docx import Document
import numpy as np# 读取word文档内容
def word_read(docx_name):file_text = []file = Document(docx_name)for para in file.paragraphs:file_text.append(para.text)table_text = []file = Document(docx_name)for tables in file.tables:for i in range(0, len(tables.rows)):for j in range(0, len(tables.columns)):table_text.append(str(tables.cell(i, j).text))return file_text + table_text# 建立所有文件字典库
dir_list = os.listdir('D:\\xxx\\xxx\\xxx')dictionary = {}
warn_mess = []
for docx_name in dir_list:file_name = 'D:\\xxx\\xxx\\xxx\\' + docx_nametry:Word_Text = word_read(file_name)except Exception as e:warn_mess.append(str('未识别文档:' + str(e) + file_name))dictionary[docx_name] = Word_Text# 进度条completed = int(dir_list.index(docx_name) * 25 / len(dir_list)) * "■"uncompleted = int((len(dir_list) - dir_list.index(docx_name)) * 25 / len(dir_list)) * "□"schedule = (dir_list.index(docx_name) / len(dir_list)) * 100print("\r{}{}{:.2f}%".format(completed, uncompleted, schedule), end="")for i in warn_mess:print(str(warn_mess.index(i) + 1) + i)# 判断病例是否出现在流调报告里面,若在,返回病例名字和文件名
data = pd.read_csv("xxx.csv", encoding='gbk')
disease_names = data['姓名']name = []
report = []
count = []
for disease_name in disease_names:disease_name = disease_name.replace(' ', '')for key in dictionary.keys():text = ''  # 所有文本text = text.join(dictionary[key])text = text.replace(' ', '')if disease_name in text:name.append(disease_name)report.append(key)count.append(text.count(disease_name))# 提取流调文件名中的病例姓名
correspond_name = []
for name_report in report:for name_disease in disease_names:tem_correspond_name = []text = ''if name_report.count(name_disease.replace(' ', '')) > 0:tem_correspond_name.append(name_disease)breakcorrespond_name.append(text.join(tem_correspond_name))connection_output = pd.DataFrame({'name': name, 'correspond_name': correspond_name, 'report': report, 'count': count, })
connection_output = connection_output.loc[(connection_output['name'] != connection_output['correspond_name'])]connection_output.to_csv('connection_output.csv', encoding='gbk', index=False)# 去重函数
def dup_remove(aim_list):news_li = []for i in aim_list:if i not in news_li:news_li.append(i)return news_liconnection_lib = []
for i in range(connection_output.shape[0]):connection_lib.append(list(connection_output['name'])[i] + '-' + list(connection_output['correspond_name'])[i])cluster_output = {}
for i in disease_names:cluster_output[i] = [i]try:for name in disease_names:tem_list = [name]for m in range(len(connection_lib)):for j in range(m, len(connection_lib)):if len(connection_lib[j]) > 0:if connection_lib[j].split('-')[0] in ''.join(cluster_output[name]) or connection_lib[j].split('-')[1] in \''.join(cluster_output[name]):tem_list.append(connection_lib[j].split('-')[0])tem_list.append(connection_lib[j].split('-')[1])connection_lib[j] = ''cluster_output[name] = dup_remove(tem_list)
except Exception as e:print(e)df = pd.DataFrame.from_dict(cluster_output, orient='index')
df.to_csv('test.csv', encoding='gbk')

  当然这么做的缺点也很明显,就是难以精确地建立一二三代病例关系网,因为关系对子没有上下级特征,建立的关系链也是无方向的。因为疫情处理迅速、及时,没有更大规模的病例产生,因此没有数据支撑代码继续完善。我的完善设想是根据流调报告模板,采用正则提取的方式,找到上下级关系,再利用“NetWorkx”库建立网络关系图。

  二、第二个场景是在流调报告中找到有高风险地区旅居经历的流调对象。
  这个场景就相对简单了,在第一个场景中也有用到类似的操作,基本步骤是建立流调报告字典库,然后遍历值,如果存在则返回键名即可。

import os
from docx import Documentdef word_read(docx_name):file_text = []file = Document(docx_name)for para in file.paragraphs:file_text.append(para.text)table_text = []file = Document(docx_name)for tables in file.tables:for i in range(0, len(tables.rows)):for j in range(0, len(tables.columns)):table_text.append(tables.cell(i, j).text)return file_text + table_text# word_read('D:\\xxx\\xxx\\xxx\\' + os.listdir('D:\\xxx\\xxx\\xxx')[1])# 建立所有文件字典库
dir_list = os.listdir('D:\\xxx\\xxx\\xxx')dictionary = {}
warn_mess = []
for docx_name in dir_list:file_name = 'D:\\xxx\\xxx\\xxx\\' + docx_nametry:word_text = word_read(file_name)except Exception as e:warn_mess.append(str('未识别文档:' + str(e) + file_name))dictionary[docx_name] = word_text# 进度条completed = int(dir_list.index(docx_name) * 25 / len(dir_list)) * "■"uncompleted = int((len(dir_list) - dir_list.index(docx_name)) * 25 / len(dir_list)) * "□"schedule = (dir_list.index(docx_name) / len(dir_list)) * 100print("\r{}{}{:.2f}%".format(completed, uncompleted, schedule), end="")# 判断是否出现在流调报告里面,若在,返回文字和文件名name = []
report = []
count = []def text_found(text):text = text.replace(' ', '')for key in dictionary.keys():word_text = ''  # 所有文本word_text = word_text.join(dictionary[key])word_text = word_text.replace(' ', '')if text in word_text:# name.append(text)# report.append(key)# count.append(word_text.count(text))print(text + '    在流调报告:' + key + '    中出现    ' + str(word_text.count(text)) + '次')text_found('xxx')

  总结,在这两个场景中用到的几个操作:读取word文件(包括word文件中表格的读取)、建立和遍历字典、去重和进度条。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部