数据血缘通常是指数据产生的链路，其采集主要通过自动解析（存储过程、SQL、ETL 过程等文件）结合人工收集的方式实现。本文不涉及数据血缘如何获取，只针对如何通过 Python 操作 MongoDB 并可视化数据血缘关系提供一些思路。
首先通过 pymongo 连接本地 MongoDB 数据库，并插入测试数据（每条记录的 belongsto 字段保存其上级节点的 name）：
import pymongo

# Connect to the local MongoDB instance; use database "world", collection "areas".
myclient = pymongo.MongoClient("mongodb://127.0.0.1:27017/")
mydb = myclient["world"]
mycol = mydb["areas"]

# Test data: "belongsto" holds the "name" of the parent node.
# The root node ("Asia") has no "belongsto" field.
mylist = [
    {"_id": 1, "name": "Asia"},
    {"_id": 2, "name": "China", "belongsto": "Asia"},
    {"_id": 3, "name": "ZheJiang", "belongsto": "China"},
    {"_id": 4, "name": "HangZhou", "belongsto": "ZheJiang"},
    {"_id": 5, "name": "NingBo", "belongsto": "ZheJiang"},
    {"_id": 6, "name": "Xihu", "belongsto": "HangZhou"},
]

# The _id values are fixed, so a plain insert_many() fails with a duplicate-key
# error when this snippet is run a second time. Upserting each document by _id
# gives the same result on an empty collection and is safe to re-run.
mycol.bulk_write(
    [pymongo.ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in mylist]
)

for doc in mycol.find():
    print(doc)
输出结果:
{'_id': 1, 'name': 'Asia'}
{'_id': 2, 'name': 'China', 'belongsto': 'Asia'}
{'_id': 3, 'name': 'ZheJiang', 'belongsto': 'China'}
{'_id': 4, 'name': 'HangZhou', 'belongsto': 'ZheJiang'}
{'_id': 5, 'name': 'NingBo', 'belongsto': 'ZheJiang'}
{'_id': 6, 'name': 'Xihu', 'belongsto': 'HangZhou'}

使用 $graphLookup 递归查询 name='Xihu' 这个节点的所有上级节点（父节点、父节点的父节点……直至根节点）：
# Recursively collect every ancestor of the node named 'Xihu'.
# $match runs first so that $graphLookup only traverses the hierarchy for the
# document we actually want. In the original order ($graphLookup, then $match),
# the recursive lookup ran for every document in the collection and most of the
# results were discarded.
pipeline = [
    {'$match': {'name': 'Xihu'}},
    {'$graphLookup': {
        'from': "areas",
        'startWith': "$belongsto",        # start from the node's direct parent
        'connectFromField': "belongsto",  # keep following each match's belongsto ...
        'connectToField': "name",         # ... matched against documents' name
        'as': "belongHierarchy",          # all ancestors end up in this array
    }},
]
# NOTE: MongoDB does not guarantee the order of the documents in the
# "belongHierarchy" array.
for doc in mycol.aggregate(pipeline):
    print(doc)
输出结果:
{'_id': 6, 'name': 'Xihu', 'belongsto': 'HangZhou', 'belongHierarchy':
 [{'_id': 1, 'name': 'Asia'},
  {'_id': 2, 'name': 'China', 'belongsto': 'Asia'},
  {'_id': 3, 'name': 'ZheJiang', 'belongsto': 'China'},
  {'_id': 4, 'name': 'HangZhou', 'belongsto': 'ZheJiang'}]}
解析输出结果，用 networkx 构建有向图（子节点指向父节点），并可视化展示节点关系：
import networkx as nx
import matplotlib.pyplot as plt

rs = list(mycol.aggregate(pipeline))


def get_relation(rs):
    """Build a directed child -> parent graph from $graphLookup results.

    Args:
        rs: Documents returned by the aggregation above. Each has a 'name',
            an optional 'belongsto' (parent name; absent on the root), and a
            'belongHierarchy' list of ancestor documents of the same shape.

    Returns:
        A networkx.DiGraph with one edge per (name -> belongsto) pair found
        in the documents or in their ancestor lists.
    """
    G = nx.DiGraph()
    for node in rs:
        # Always add the node itself. Previously, a node without 'belongsto'
        # (e.g. the root) raised KeyError, which a bare `except: pass` swallowed,
        # so an isolated root never appeared in the graph.
        G.add_node(node['name'])
        parent = node.get('belongsto')
        if parent is not None:
            G.add_edge(node['name'], parent)
        # Ancestors let a single matched node (e.g. only 'Xihu') still
        # produce the full chain up to the root.
        for item in node.get('belongHierarchy', []):
            ancestor_parent = item.get('belongsto')
            if ancestor_parent is not None:
                G.add_edge(item['name'], ancestor_parent)
    return G


G = get_relation(rs)
nx.draw(G, with_labels=True, font_weight='bold')
plt.show()
去掉 $match 阶段，展示 areas 这个 collection 中所有的节点：
# Same $graphLookup as before, but without the $match stage, so every document
# in "areas" gets its ancestor chain and the whole hierarchy is drawn.
pipeline = [{'$graphLookup': {
    'from': "areas",
    'startWith': "$belongsto",
    'connectFromField': "belongsto",
    'connectToField': "name",
    'as': "belongHierarchy",
}}]
rs = list(mycol.aggregate(pipeline))

# Reuse get_relation() from the previous snippet instead of redefining an
# identical copy of it here.
G = get_relation(rs)
nx.draw(G, with_labels=True, font_weight='bold')
plt.show()