Notes on DataHub data lineage
Before writing this, I went through the following write-ups:
Parsing FineBI datasets with sqllineage and importing the lineage into DataHub (DawsonSally, CSDN)
How to generate DataHub datasets and lineage from Hive SQL (DawsonSally, CSDN)
Parsing column-level SQL lineage with sqllineage and submitting it to DataHub (LCriska, CSDN)
An implementation plan for metadata lineage management based on DataHub (只会写demo的程序猿, CSDN)
A one-stop guide to the DataHub metadata platform, from installation to usage (unfinished) (省略号的搬运工, CSDN)
# Iterate over a MySQL mapping table and import table-level lineage into DataHub
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter
import pymysql  # MySQL driver behind SQLAlchemy's mysql+pymysql URL
import pandas as pd
from sqlalchemy import create_engine

con_engine = create_engine('mysql+pymysql://root:123456@localhost:3306/xueyuan')
# sheet1 maps each MySQL source table to its ODS target table; strip stray spaces
sql_ = "select replace(mysql,' ','') mysql, replace(ods,' ','') ods from sheet1 where mysql is not null and ods is not null;"
df = pd.read_sql_query(sql_, con_engine)
ods = df['mysql'].tolist()  # upstream (MySQL) table names
dwd = df['ods'].tolist()    # downstream (ODS) table names

# Create the emitter once, outside the loop
emitter = DatahubRestEmitter("http://xxx:18080")

for upstream, downstream in zip(ods, dwd):
    print(upstream, downstream)
    lineage_mce = builder.make_lineage_mce(
        [
            # Several upstream sources can be listed here
            builder.make_dataset_urn("mysql", "industrial_chain_enterprise_project_125." + upstream),  # Upstream
        ],
        builder.make_dataset_urn("hive", "hudi_ods." + downstream),  # Downstream
    )
    # Emit metadata!
    emitter.emit_mce(lineage_mce)
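Each emit_mce call is a synchronous HTTP request to the GMS server, so before looping over a large mapping table it can be worth checking connectivity once up front. A minimal sketch, reusing the same placeholder address; test_connection() is the REST emitter's built-in health check:

from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://xxx:18080")
# Raises an exception if the GMS endpoint is unreachable or incompatible
emitter.test_connection()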
# Manual test: import table-level lineage for one pair of tables into DataHub
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

upstream = 't_zx_company_xxx'
downstream = 'ods_company_xxx'

lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("mysql", "xxxx." + upstream),  # Upstream
    ],
    builder.make_dataset_urn("hive", "hudi_ods." + downstream),  # Downstream
)

emitter = DatahubRestEmitter("http://xxxxx:18080")
# Emit metadata!
emitter.emit_mce(lineage_mce)
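make_lineage_mce is the older MCE-based helper. The same table-level edge can also be written through the MCP API that the column-level scripts below use; a sketch under that assumption, reusing the placeholder tables and server from the snippet above:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    Upstream,
    UpstreamLineage,
)

upstream_lineage = UpstreamLineage(
    upstreams=[
        Upstream(
            dataset=builder.make_dataset_urn("mysql", "xxxx.t_zx_company_xxx"),
            type=DatasetLineageType.TRANSFORMED,
        )
    ]
)
mcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_dataset_urn("hive", "hudi_ods.ods_company_xxx"),
    aspect=upstream_lineage,
)
DatahubRestEmitter("http://xxxxx:18080").emit_mcp(mcp)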
# Parse SQL with sqllineage and import column-level lineage into DataHub
from sqllineage.runner import LineageRunner
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
import sys

# Dataset URN for a table (platform = hive)
def datasetUrn(tableName):
    return builder.make_dataset_urn("hive", tableName)

# Schema-field URN for a table + column
def fieldUrn(tableName, fieldName):
    return builder.make_schema_field_urn(datasetUrn(tableName), fieldName)

# # Path to the target SQL file, if you prefer reading it from disk:
# sqlFilePath = sys.argv[1]
# sqlFile = open(sqlFilePath, mode='r', encoding='utf-8')

sql = """
-- your SQL goes here
"""

# Parse the SQL lineage
result = LineageRunner(sql)
print(result)

# Downstream (target) table name from the SQL
targetTableName = str(result.target_tables()[0])

# Column-level lineage
lineage = result.get_column_lineage()
# Print the column-level lineage for inspection
result.print_column_lineage()

# Field-level lineage list
fineGrainedLineageList = []
# Upstream list, used for conflict checking
upStreamsList = []

# Each lineage path is a tuple of columns: the last element is the downstream
# table/field, all preceding elements are upstream tables/fields
for columnTuples in lineage:
    upStreamStrList = []    # upstream field URNs
    downStreamStrList = []  # downstream field URNs
    for i, column in enumerate(columnTuples):
        fieldName = str(column.raw_name)
        tableName = str(column).replace('.' + fieldName, '')
        if i == len(columnTuples) - 1:
            # Last element: downstream table and field
            print('downstream table: ' + tableName + '  downstream field: ' + fieldName)
            downStreamStrList.append(fieldUrn(tableName, fieldName))
        else:
            print('upstream table: ' + tableName + '  upstream field: ' + fieldName)
            upStreamStrList.append(fieldUrn(tableName, fieldName))
            # Used to check whether the upstream lineage conflicts
            upStreamsList.append(Upstream(dataset=datasetUrn(tableName), type=DatasetLineageType.TRANSFORMED))
    fineGrainedLineage = FineGrainedLineage(
        # upstreams holds schema-field URNs, so the type is FIELD_SET (not DATASET)
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=upStreamStrList,
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=downStreamStrList,
    )
    fineGrainedLineageList.append(fineGrainedLineage)

fieldLineages = UpstreamLineage(upstreams=upStreamsList, fineGrainedLineages=fineGrainedLineageList)
lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn(targetTableName),  # downstream table
    aspect=fieldLineages,
)

if __name__ == '__main__':
    # Call the DataHub REST API (datahub gms server)
    emitter = DatahubRestEmitter('http://xxxxx:18080')
    # Emit metadata!
    emitter.emit_mcp(lineageMcp)
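To make the tuple handling concrete, this is roughly what sqllineage hands back for a toy statement (a minimal illustration; the table names are made up):

from sqllineage.runner import LineageRunner

demo_sql = "INSERT INTO dwd.orders SELECT id, amount FROM ods.orders"
for path in LineageRunner(demo_sql).get_column_lineage():
    # Each path is a tuple of Column objects: sources first, target last
    print(" -> ".join(str(col) for col in path))
# Expected output, roughly:
# ods.orders.id -> dwd.orders.id
# ods.orders.amount -> dwd.orders.amount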
# Manual test: import column-level lineage into DataHub
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
from datahub.metadata.schema_classes import ChangeTypeClass

def datasetUrn(tbl):
    return builder.make_dataset_urn("hive", tbl)

def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)

fineGrainedLineages = [
    # Field set to field: two upstream fields feed one downstream field
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("hive.bda_csm_part_main_evt_test_1", "费用单号"),
                   fldUrn("hive.bda_csm_part_main_evt_test_3", "费用单号")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("hive.bda_csm_part_main_evt_test", "费用单号")],
    ),
    # Field to field, with a confidence score and a transform operation
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("hive.bda_csm_part_main_evt_test_2", "费用类型")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("hive.bda_csm_part_main_evt_test", "费用类型")],
        confidenceScore=0.8,
        transformOperation="myfunc",
    ),
    # Field set to field set
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[fldUrn("hive.bda_csm_part_main_evt_test_2", "单据金额"),
                   fldUrn("hive.bda_csm_part_main_evt_test_2", "结算金额"),
                   fldUrn("hive.bda_csm_part_main_evt_test_3", "费用单号")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
        downstreams=[fldUrn("hive.bda_csm_part_main_evt_test", "单据金额"),
                     fldUrn("hive.bda_csm_part_main_evt_test", "结算金额")],
        confidenceScore=0.7,
    ),
    # Whole upstream dataset to one downstream field
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.DATASET,
        upstreams=[datasetUrn("hive.bda_csm_part_main_evt_test_3")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("hive.bda_csm_part_main_evt_test", "合理金额")],
    ),
]

upstream = Upstream(dataset=datasetUrn("hive.bda_csm_part_main_evt_test_1"), type=DatasetLineageType.TRANSFORMED)
fieldLineages = UpstreamLineage(upstreams=[upstream], fineGrainedLineages=fineGrainedLineages)

lineageMcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=datasetUrn("hive.bda_csm_part_main_evt_test"),
    aspectName="upstreamLineage",
    aspect=fieldLineages,
)

# Create an emitter to the GMS REST API
emitter = DatahubRestEmitter("http://1xxxx:18080")
# Emit metadata!
emitter.emit_mcp(lineageMcp)
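One detail worth noting: this wrapper spells out entityType, changeType, and aspectName, while the sqllineage script above passes only entityUrn and aspect. Recent versions of the acryl-datahub SDK infer those three fields from the aspect, so on a newer SDK the wrapper can, as far as I can tell, be shortened to:

lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn("hive.bda_csm_part_main_evt_test"),
    aspect=fieldLineages,
)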
My company uses Spark to write the processed data into Hudi, but when I tried this approach I found that the SQL was not parsed at all. If anyone understands why, please share; to be continued.
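One avenue I have not fully verified yet: sqllineage's default parsing targets ANSI SQL, and newer releases (1.4+, if I remember correctly) accept a sqlfluff dialect argument, so Spark SQL constructs commonly used with Hudi (such as MERGE INTO) might only parse once the dialect is set. A hedged sketch, with a made-up statement:

from sqllineage.runner import LineageRunner

spark_sql = """
MERGE INTO hudi_ods.ods_company_xxx AS t
USING staging_company_xxx AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name
"""
# The dialect argument is only accepted by newer sqllineage versions
result = LineageRunner(spark_sql, dialect="sparksql")
print(result.source_tables(), result.target_tables())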
