【SDN课设】Ryu控制流量转发,虚拟地址ping,智能调控(智慧城市背景)

SDN课设,使用Ryu来控制,啃源码可真的累呀

Github源码在此

项目背景

​ 目前智慧城市越来越发展,但是随着各种网络设备以及传感器的应用,城市的网络负担也在加大。如果数据分析等操作都由城市的数据中心来做,那么一定网络流量不堪重负。雾计算,是一个好思想,然而,某些时候又需要整体进行调控,我们使用SDN的思想来实现。

​ 这是一个交通路口的信号灯及背后处理器的拓扑,我们都知道,可以通过当前路况来智能调整交通信号灯的时长,使得交通更加通畅。我们设想,在车流量不是很大的时候,摄像头10.1.1.100收集车流信息,可以由局域网中的MCU(10.3.3.150)做决策,调节其时长;然而在上下班高峰期,就需要将车流信息上传到数据处理中心(10.2.2.130),数据中心根据全城的交通情况做决策,这样就避免各个路口自己做决策结果冲突了。

​ 但是,一个摄像头显然不会有这么智能的功能,知道什么时候将流发给局域网的MCU还是远程的数据中心。我们就像,设置一个虚拟IP(10.1.1.77)与对应的虚拟MAC(66:66:66:66:66:66),路由器在收到这个目的地址为虚拟IP的包之后,根据Ryu的控制,选择将其发送给近处还是远处进行处理,而MCU和数据中心可以正常向摄像头发信息。这样以来,通过虚拟IP,对于摄像头来说就是透明的了,它只管发,不用管别的。

肝课设过程中的笔记

方法一

相当于将Ubuntu作为ovs的交换机了,好处:更加模拟现实环境,直接用linux系统。可能存在的问题:这个拓扑在清零的时候可能会有问题,比如arp表清除等(不能做到像mininet仿真的时候每次运行拓扑就从零开始。)

Ubuntu安装ovs配置

Ubuntu安装Open vSwitch

ryu RESTAPI

RYU核心源码解读:

RYU入门

RYU官方文档(其中有对模拟arp的介绍,并且可以查看guide1.dox)

RYU:OpenFlow协议源码

方法二

使用mininet搭建整个拓扑,ryu也是直接作为这个拓扑网络的控制器

优点:方便调试、更新,使用wireshark抓包,ping和tcpdump进行流量创建(这一点方法一也可以做到)

缺点:都是在一个虚拟机上进行的,互联互通有可能成为问题

Ryu+mininet+Wireshark

一个mininet+ryu逐步控制的例子

Mininet与真实网络连接

Mininet中host与外网通信

一个抓包例子

正式开始

ryu源码分析之packet类

ryupacket源码

ryu packet文档

结合这个例子学习构造包下发

逐级封装:

e = ethernet.ethernet(dst='ff:ff:ff:ff:ff:ff',src='08:60:6e:7f:74:e7',ethertype=ether.ETH_TYPE_ARP)
a = arp.arp(hwtype=1, proto=0x0800, hlen=6, plen=4, opcode=2,src_mac='08:60:6e:7f:74:e7', src_ip='192.0.2.1',dst_mac='00:00:00:00:00:00', dst_ip='192.0.2.2')
p = packet.Packet()
p.add_protocol(e)
p.add_protocol(a)

添加action事件下发包

actions = [parser.OFPActionOutput(port=port)]
out =parser.OFPPacketOut(datapath=datapath,buffer_id=ofproto.OFP_NO_BUFFER,in_port=ofproto.OFPP_CONTROLLER,actions=actions,data=data)
datapath.send_msg(out)

Ryu RESTFUL协议远程获取控制面信息

跨网段通信所需

OFPMatch 607行

OFPActinOutput 4682行

OFPInstructionActions 4550行

pkt = packet.Packet(msg.data)eth = pkt.get_protocols(ethernet.ethernet)[0]
arp_pkt = pkt.get_protocol(arp.arp)if eth:eth_dst = eth.dsteth_src = eth.src
if udp_pkt:print('***********收到虚拟udp包*********')match = parser.OFPMatch(eth_dst='66:66:66:66:66:66')print("match:", match)#kwargs = dict(eth_dst='66:66:66:66:66:66')#match1 = parser.OFPMatch(** kwargs)#print("match1:", match1)actions = [parser.OFPActionSetField(ipv4_dst='10.1.1.110'),parser.OFPActionSetField(eth_dst=src_mac),parser.OFPActionOutput(in_port)]# datapath = self._find_dp(int(switch))# assert datapath is not Noneinst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]# idle and hardtimeout set to 0,making the entry permanent# reference openflow specmod = datapath.ofproto_parser.OFPFlowMod(datapath=datapath,match=match,idle_timeout=0,hard_timeout=0,priority=32768,instructions=inst)datapath.send_msg(mod)

RYU官方文档中,给了基于RESTFUL协议的对外通信API,Ryu RESTFUL协议远程获取控制面信息,封装在了ryu.app.ofctl_rest类中,主要提供了以下几种操作:

1.获取ovs交换机信息

2.获取流信息

3.添加流、添加新的组

由于是基于RESTFUL API,因此我们可以利用python的requests模块很轻松的使用,其方法和使用一般的http方法并无不同。

为了获取RYU所管理的网桥信息,首先根据官方文档得url:“http://localhost:8080:stats/switchs”,使用GET方法,得到所有的网桥dpid,存入列表中。

def __init__(self):self.URL='http://localhost:8080'self.dpid=[]
def getall_sw(self):url=self.URL+'/stats/switches'data = requests.get(url)if data.status_code == 200:data_all = data.json()print(data_all)           for i in data_all:self.dpid.append(i)

在遍历这个列表,使用url:“http://localhost:8080:stats/flows”,再加上列表中的dpid,组合成得到这个dpid网桥的具体信息,通过GET方法,将得到的数据转化成json格式,再提取其中的action模式、流过包数量、流过字节数等字段,得到想要的输出信息。

def getall_sw_data(self):for i in self.dpid:url=self.URL+'/stats/flow/'+str(i)data = requests.get(url)if data.status_code == 200:data_all = data.json()[str(i)]action=data_all[0]['actions']packet_count=data_all[3]['packet_count']priority=data_all[6]['priority']byte_count=data_all[8]['byte_count']print("交换机{}的action模式为{}\n"\"流过的包为{}\n"\"流过的字节为{}\n"\"优先级为{}\n".format(i,action,packet_count,byte_count,priority))

整体的代码分析

按照如图的拓扑:

55行的代码

self.host_mac_to={}

是为了将每个设备的id和其MAC地址相对应,方便后面我们知道是哪个设备。

89行:

self.arp_table.update({VIP:VMAC})
self.arp_table.update({VIP2:VMAC})

将虚拟地址的IP和MAC提前准备好,这样才能够发arp回复。

当摄像头第一次向虚拟地址发包的时候,因为没有mac地址,会发arp请求(578行),我们会给个虚拟arp回复。

随后,摄像头拿到mac成帧,向外发包,通过439行:

# 发往虚拟ip
if pkt_eth.dst == VMAC:flag = 1print('!!!!!发往虚拟ip!!!!!')tsecond = time.asctime(time.localtime(time.time()))second = tsecond.split(' ')[3].split(':')[2]print(second, '!!GET TIME!!')if pkt_ipv4:if second <= '20':  # 前20s发往本地计算单元if pkt_ipv4.src == pc1_ip:print('前20s,pc1发往pc2')dstmac = pc2_macdstip = pc2_ipif pkt_ipv4.src == pc4_ip:print('前20s,pc4发往pc5')dstmac = pc5_macdstip = pc5_ipdata = msg.dataactions = [parser.OFPActionSetField(ipv4_dst=dstip),parser.OFPActionSetField(eth_dst=dstmac), parser.OFPActionOutput(port=2)]self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

就像前面写的,对包进行拆分、组装,更换目的IP,然后再从对应端口发出去。

为了模拟,我们假装每分钟的20s~40s,为上下班高峰期,此时会将报文转发到远程的服务器。

if second > '20' and second <= '40':  # 中间20s发往远程服务器if pkt_ipv4.src == pc1_ip:print('中间20s,pc1发往pc3')smac = gate1_macif pkt_ipv4.src == pc4_ip:print('中间20s,pc4发往pc3')smac = gate3_macactions = [parser.OFPActionOutput(port=gate_port)]data = msg.dataself.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)actions = [parser.OFPActionSetField(eth_src=smac),parser.OFPActionSetField(ipv4_dst=pc3_ip),parser.OFPActionSetField(eth_dst=gate2_mac),parser.OFPActionOutput(port=gate_port)]data = msg.datadatapath = self.datapaths[0]self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)actions = [parser.OFPActionSetField(eth_src=gate2_mac),parser.OFPActionSetField(ipv4_dst=pc3_ip),parser.OFPActionSetField(eth_dst=pc3_mac), parser.OFPActionOutput(port=1)]data = msg.datadatapath = self.datapaths[0]self.sendpaket(parser, ofproto, data, datapath, actions, buffer_id, in_port)

额外功能:

204行:

# ipv6 广播报文目的Mac开头都是33:33“
if '33:33' in dst_mac[:5]:# the controller has not flooded this packet beforeif (src_mac,dst_mac) not in self.flood_history[dpid]:# we remember this packetself.flood_history[dpid].append((src_mac,dst_mac))else:# the controller have flooded this packet before,we do nothing and returnreturn

防止ipv6的广播风暴。

为了能够检测整个网络的效果,还写了一个flask作为后端的web服务器,可以随时在线上查看网络的负载以及路由器的情况:

import requests
import json
class ryu_restful:def __init__(self):self.URL='http://localhost:8080'self.dpid=[]def getall_sw(self):url=self.URL+'/stats/switches'data = requests.get(url)if data.status_code == 200:data_all = data.json()print(data_all)           for i in data_all:self.dpid.append(i)def getall_sw_data(self):for i in self.dpid:url=self.URL+'/stats/flow/'+str(i)data = requests.get(url)if data.status_code == 200:data_all = data.json()[str(i)]action=data_all[0]['actions']packet_count=data_all[3]['packet_count']priority=data_all[6]['priority']byte_count=data_all[8]['byte_count']print("交换机{}的action模式为{}\n"\"流过的包为{}\n"\"流过的字节为{}\n"\"优先级为{}\n".format(i,action,packet_count,byte_count,priority))def add_flow(self):url=sel.URL+'/stats/flowentry/add'data= '{"dpid": 1,"cookie": 1,"cookie_mask": 1,"table_id": 0,"idle_timeout": 30,'\' "hard_timeout": 30, "priority": 11111,"flags": 1,"match":{"in_port":1}, "actions":[{"type":"OUTPUT","port": 2}]}'res=requests.post(url=url,data=data)print(res.text)def get_flow(self):for i in self.dpid:url=self.URL+'/stats/flow'+str(i)data=requests.get(url)if data.status_code==200:data_all=data.json()[str(i)][0]match=data_all['match']actions=data_all['actions']print('流{}的匹配表为{},动作为{}'.format(i,match,actions))if __name__ == "__main__":ryu=ryu_restful()ryu.getall_sw()ryu.getall_sw_data()ryu.get_flow()

最后的效果

向虚拟地址发UDP报文:

远程监测网络运行情况:


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部