CN110505307B - 一种网间交通流数据的交换方法及系统 - Google Patents
一种网间交通流数据的交换方法及系统 Download PDFInfo
- Publication number
- CN110505307B CN110505307B CN201910811244.0A CN201910811244A CN110505307B CN 110505307 B CN110505307 B CN 110505307B CN 201910811244 A CN201910811244 A CN 201910811244A CN 110505307 B CN110505307 B CN 110505307B
- Authority
- CN
- China
- Prior art keywords
- traffic flow
- flow data
- file
- data
- ftp
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000000034 method Methods 0.000 title claims abstract description 39
- 238000004891 communication Methods 0.000 claims abstract description 45
- 238000005516 engineering process Methods 0.000 claims abstract description 11
- 238000012544 monitoring process Methods 0.000 claims abstract description 9
- 230000001360 synchronised effect Effects 0.000 claims description 14
- 230000008569 process Effects 0.000 claims description 10
- 238000012545 processing Methods 0.000 claims description 9
- 238000011161 development Methods 0.000 claims description 6
- 238000012986 modification Methods 0.000 claims description 6
- 230000004048 modification Effects 0.000 claims description 6
- 238000006243 chemical reaction Methods 0.000 claims description 5
- 238000007405 data analysis Methods 0.000 claims description 5
- 238000012806 monitoring device Methods 0.000 claims description 4
- 230000009466 transformation Effects 0.000 claims description 2
- 238000004806 packaging method and process Methods 0.000 claims 2
- 239000000872 buffer Substances 0.000 claims 1
- 230000004907 flux Effects 0.000 claims 1
- 230000001052 transient effect Effects 0.000 claims 1
- 230000005540 biological transmission Effects 0.000 abstract description 9
- 238000004364 calculation method Methods 0.000 abstract description 8
- 230000008878 coupling Effects 0.000 abstract description 5
- 238000010168 coupling process Methods 0.000 abstract description 5
- 238000005859 coupling reaction Methods 0.000 abstract description 5
- 238000010586 diagram Methods 0.000 description 5
- 230000008859 change Effects 0.000 description 3
- 238000013461 design Methods 0.000 description 2
- 238000004458 analytical method Methods 0.000 description 1
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/14—Details of searching files based on file metadata
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/16—File or folder operations, e.g. details of user interfaces specifically adapted to file systems
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/10—File systems; File servers
- G06F16/17—Details of further file system functions
- G06F16/178—Techniques for file synchronisation in file systems
- G06F16/1794—Details of file format conversion
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/06—Protocols specially adapted for file transfer, e.g. file transfer protocol [FTP]
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/12—Protocols specially adapted for proprietary or special-purpose networking environments, e.g. medical networks, sensor networks, networks in vehicles or remote metering networks
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Data Mining & Analysis (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Signal Processing (AREA)
- Computer Networks & Wireless Communication (AREA)
- Health & Medical Sciences (AREA)
- Computing Systems (AREA)
- General Health & Medical Sciences (AREA)
- Medical Informatics (AREA)
- Library & Information Science (AREA)
- Human Computer Interaction (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
- Data Exchanges In Wide-Area Networks (AREA)
Abstract
本发明提供一种网间交通流数据的交换方法,其可以占用较小的带宽,实现延时低、可靠性高的网络传输,且系统间的耦合性很低,易于扩展和维护。同时本发明也公开了实现一种网间交通流数据的交换方法的系统。其包括:S1:信号控制系统获取道路交通监控设备抓取的阶段交通流数据;S2:将信号控制系统传入的阶段交通流数据,从xml格式转换成json格式;S3:将json格式的阶段交通流数据存储成文本文件,发送到FTP同步目录中;S4:公安信息通信网边界接入平台定期扫描FTP同步目录;S5:在公安网内部,把位于接收端FTP目录中的文本格式的数据解析成json格式的数据,送入kafka队列;S6:公安交通指挥平台采用流式计算技术从kafka队列中消费数据,进行后续计算和操作。
Description
技术领域
本发明涉及智能交通控制技术领域,具体为一种网间交通流数据的交换方法及系统。
背景技术
智能交通管理中需要实时监测城市交通运行态势。如图1所示,为了保障公安交通指挥平台4能够正常开展业务,运行在公安网的公安交通指挥平台4,需要实时获得每个信号控制路口设置的道路交通监控设备1抓取的阶段交通流数据;阶段交通流数据是在一个信号周期中的基本时间单元中的检测到的路口各车道的交通流信息,包含:过车车辆数、当量小汽车数、平均车头时距、饱和度、排队长度、占有率等信息。
道路交通监控设备1位于公安网外部的视频专网,公安交通指挥平台4位于公安网,为了确保数据的保密性,两个网络之间数据交换只能通过公安信息通信网边界接入平台3进行。
现有技术中,通过在视频专网中设置信号控制系统2,信号控制系统2获取道路交通监控设备1抓取的阶段交通流数据,然后依靠公安信息通信网边界接入平台3数据库的同步模块进行数据同步,把阶段交通流数据接入位于公安网内部的公安交通指挥平台4。但信号控制系统2采集的阶段交通流数据具有频率高、数据量大的特点,对传输网络性能要求高,随着交管系统规模的扩展,系统成本不断升高。另外,公安信息通信网边界接入平台3中,数据库同步模块存在延时较高、配置复杂、系统间耦合性高等问题;一旦接入的阶段交通流数据格式、种类、频率等等需求需要调整,则需要修改公安信息通信网边界接入平台3源数据表、目标数据表等多个配置,修改工作较大,不适合现有的智能交通管理的实际需求。
发明内容
为了解决现有的数据交换方法中存在的对传输网络性能要求高、数据需求变化时无法快速调整等问题,本发明提供一种网间交通流数据的交换方法,其可以占用较小的带宽,实现延时低、可靠性高的网络传输,且系统间的耦合性很低,易于扩展和维护。同时本发明也公开了实现一种网间交通流数据的交换方法的系统。
本发明的技术方案是这样的:一种网间交通流数据的交换方法,其包括以下步骤:
S1:信号控制系统获取道路交通监控设备抓取的阶段交通流数据;
其特征在于,其还包括下面的步骤:
S2:将所述信号控制系统传入的所述阶段交通流数据,从xml格式转换成json格式;
S3:将json格式的所述阶段交通流数据存储成文本文件;
设置一个时间阈值TH,且以TH秒/次的频度发送到设置在视频专网的FTP同步目录中;
S4:公安信息通信网边界接入平台定期扫描所述FTP同步目录;
以TH秒/每次的频度,将文本格式的所述阶段交通流数据以FTP协议同步到位于公安网内部的接收端FTP目录中;
S5:在公安网内部,把位于所述接收端FTP目录中的文本格式的所述阶段交通流数据解析成json格式的数据,送入kafka队列;
S6:公安交通指挥平台采用流式计算技术从所述kafka队列中消费数据,进行后续计算和操作。
其进一步特征在于:
步骤S3中,将json格式的所述阶段交通流存储成文本文件的时候,包括如下步骤:
a1:确认系统中已经存在的文本文件的个数N;
a2:如果 N = 0,则新建一个文本文件;
否则,存储文本文件之前确认本次所述阶段交通流数据的接收时间T是否大于所述时间阈值TH:
T = 本次所述阶段交通流数据接收完毕的时间 - 当前系统时间;
a3:如果T大于等于TH,则新建一个文本文件;
否则,如果T小于TH,则在原来的文本文件中,将数据追加写入下面的一行;
步骤S3中,所述时间阈值TH 设为1秒;
步骤S3中,将json格式的所述阶段交通流数据存储成文本文件,存储的文本文件的文件名格式为:xxxx年-xx月-xx日-xx小时-xx分钟-xx秒.log;
步骤S5中,在公安网内部,设置阶段交通流数据接收端,所述阶段交通流数据接收端为基于Flume 的扩展并进行二次开发的服务器;所述阶段交通流数据接收端定期扫描所述接收端FTP目录中的文本格式的所述阶段交通流数据,且进行解析成json格式,送入所述kafka队列,其包括如下步骤:
b1:所述阶段交通流数据接收端定期扫描所述接收端FTP目录,获取所述接收端FTP目录下的所有文本格式的所述阶段交通流数据的元信息,所述元信息包括:文件名和文件大小;
b2:把所有文件的所述元信息按照文件名排序,形成文件临时列表;
b3:从所述文件临时列表中,依次取出每个文件的所述元信息,在本地的已处理文件信息集合的数据进行检索比对;
所述已处理文件信息集合包括:所有已经处理过的阶段交通数据的所述元信息;
b4:如果比对结果为:集合中包含该文件,代表该文件已被处理过,无需对其进行后续处理;
如果比对结果为:集合中不存在该文件,代表改文件没有被处理过需要进行后续处理,执行步骤b5;
b5:把需要进行后续处理的文本格式的所述阶段交通流数据,基于FTP协议,依次拉取到所述阶段交通流数据接收端本地;
b6:对该文件按行解析,且把每行数据解析并封装成一个event;
b7:对每个event进行解析,还原成json格式的所述阶段交通流数据;
b8:把json格式的所述阶段交通流数据发送到所述kafka队列中;
步骤b3中,所述文件临时列表中的每个文件的所述元信息,与所述已处理文件信息集合中的数据进行检索比对的过程,包括如下步骤:
b3-1:取出每个文件所述元信息中的文件名作为待检索文件名,在所述已处理文件信息集合的数据中检索;
b3-2:如果所述已处理文件信息集合中不包含所述待检索文件名,则,所述比对结果设置为:集合中不存在该文件,结束本次检索比对;
否则,在所述已处理文件信息集合中,找出所有的与所述待检索文件文件名相同的文件,用这些同名文件的所述元信息构成同名文件列表;
b3-3:取出所述待检索文件名对应的文件大小,在所述同名文件列表中检索;
如果所述同名文件列表中存在相同的文件大小,则所述比对结果设置为:集合中包含该文件,结束本次检索比对;
否则,所述比对结果设置为:集合中不存在该文件,结束本次检索比对。
一种网间交通流数据的交换系统,其包括位于视频网络的信号控制系统,所述信号控制系统从道路交通监控设备获取阶段交通流数据,所述视频网络的中的数据通过公安信息通信网边界接入平台接入到位于公安专网中的公安交通指挥平台中,其特征在于,其还包括:
位于所述视频网络中,设置于所述信号控制系统、所述公安信息通信网边界接入平台之间的阶段交通流数据发送端;以及位于所述公安专网中,设置于所述公安信息通信网边界接入平台、所述公安交通指挥平台之间的阶段交通流数据接收端;
所述阶段交通流数据发送端通过socket通信连接所述所述信号控制系统,接收所述阶段交通流数据,并将数据保存为文本文件格式的数据后,放入位于所述阶段交通流数据发送端内部的FTP同步目录中;所述阶段交通流数据发送端基于FTP通信协议,同步所述阶段交通流数据至所述公安信息通信网边界接入平台;
所述阶段交通流数据接收端为基于Flume 的扩展并进行二次开发的服务器,其与所述公安信息通信网边界接入平台之间设置前置机;所述公安信息通信网边界接入平台定期扫描阶段位于所述阶段交通流数据发送端中的所述FTP同步目录,将里面的所述阶段交通流数据同步至设置于所述前置机中的接收端FTP目录中;所述阶段交通流数据接收端定期扫描所述接收端FTP目录,将未处理的所述阶段交通流数据接收至本地,解析后传输至kafka队列;所述公安交通指挥平台采用流式计算技术从所述kafka队列中消费数据,进行后续计算和操作。
所述阶段交通流数据发送端包括:socket通信子模块、数据解析转化子模块、文本文件写入子模块、文本文件发送子模块;
所述socket通信子模块:负责与所述信号控制系统建立连接,并且接收所述信号控制系统发过来的所述阶段交通流数据;
所述数据解析转化子模块:将所述阶段交通流数据从xml格式进行解析转化成json格式的字符串;
所述文本文件写入子模块:将json格式的所述阶段交通流数据存储成文本文件格式;所述文本文件写入子模块根据时间阈值TH进行判断,把每次接收的json格式的所述阶段交通流数据新建为一个文本文件、或者把数据追加在原有的文本文件中;
所述文本文件发送子模块:其为一个独立线程,会定时扫描在其监控目录下的文本文件信息,将文件的最后修改时间在时间阈值TH以前的所有文本文件,按照文件名顺序依次发送至所述FTP同步目录;
所述阶段交通流数据接收端包括:FTP客户端子模块、Flume FTP接收器子模块、Flume通道子模块、Flume接收器子模块;
所述FTP客户端子模块:与位于所述前置机中的所述接收端FTP目录建立通信连接;
所述Flume ftp接收器子模块:基于Flume接收器的自定义实现的模块,判断所述接收端FTP目录下文本文件中未处理的文件,然后通过所述FTP客户端子模块依次拉取所述接收端FTP目录下的所有为处理的文本文件,按行解析包装成event格式发送至所述Flume通道子模块;
所述Flume通道子模块:是一种短暂的存储容器,将从所述Flume ftp接收器处接收到的event格式的数据缓存起来;
所述Flume 接收器子模块:是一个独立线程,不间断的接收所述Flume 通道子模块中的event格式的数据,将其解析成json格式的所述阶段交通流数据,发送至外部所述kafka队列中。
本发明提供的一种网间交通流数据的交换方法,通过FTP文件同步的方式把视频专网中的数据,通过公安信息通信网边界接入平台同步到公安网中,FTP文件同步方式不但可靠性高,且延时低,提高了传输效率;同步数据的时候,把阶段交通流数据从xml格式转换成json格式,json格式的数据,数据格式比较简单、易于读写,且属于压缩数据格式,进行同步时占用带宽小,对传输网络性能要求降低,进而降低了系统成本;在公安网内部,通过流式计算技术和kafka队列技术结合进行数据计算,不但更适用于大数据的实时计算需求,且一旦系统对阶段交通流数据的格式、数据类型等等数据方面有什么变化的需求,也无需重新配置数据库,和调整与之对应的公安信息通信网边界接入平台中的相关配置,减少了因数据需求变化而引起的修改工作,降低了系统间的耦合性,使系统更易于扩展和维护。
附图说明
图1为现有技术中网间数据交换系统的结构示意图;
图2为本发明的网间交通流数据交换系统的结构示意图;
图3为阶段交通流数据发送端模块构成结构示意图;
图4为阶段交通流数据接收端模块构成结构示意图;
图5为阶段交通流数据接收端发送数据到Kafka队列的流程示意图。
具体实施方式
如图2所示,本发明一种网间交通流数据的交换系统,其包括位于视频网络的信号控制系统2,信号控制系统2从道路交通监控设1备获取阶段交通流数据,视频网络的中的数据通过公安信息通信网边界接入平台3接入到位于公安专网中的公安交通指挥平台4中,其还包括:
位于视频网络中,设置于信号控制系统2、公安信息通信网边界接入平台3之间的阶段交通流数据发送端5;以及位于公安专网中,设置于公安信息通信网边界接入平台3、公安交通指挥平台4之间的阶段交通流数据接收端9;
阶段交通流数据发送端5通过socket通信连接信号控制系统2,接收信号控制系统2传输过来的阶段交通流数据,并将数据保存为文本文件格式的数据后,放入位于阶段交通流数据发送端5内部的FTP同步目录6中;阶段交通流数据发送端5基于FTP通信协议,同步阶段交通流数据至公安信息通信网边界接入平台3;
阶段交通流数据接收端9为基于Flume 的扩展并进行二次开发的服务器,其与公安信息通信网边界接入平台3之间设置前置机7;公安信息通信网边界接入平台3定期扫描阶段位于阶段交通流数据发送端5中的FTP同步目录6,将里面的阶段交通流数据同步至设置于前置机7中的接收端FTP目录8中;阶段交通流数据接收端9定期扫描接收端FTP目录8,将未处理的阶段交通流数据接收至本地,解析后传输至kafka队列10;公安交通指挥平台4采用流式计算技术从kafka队列10中消费数据,进行后续计算和操作。
一种网间交通流数据的交换方法,其包括以下步骤:
S1:信号控制系统2获取道路交通监控设备1抓取的阶段交通流数据,同步到阶段交通流数据发送端5中;
S2:在阶段交通流数据发送端5中,基于数据解析转化子模块,将信号控制系统2传入的阶段交通流数据,从xml格式转换成json格式;
S3:在文本文件写入子模块中,将json格式的阶段交通流数据存储成文本文件;
设置一个时间阈值TH,且以TH秒/次的频度发送到设置在视频专网的FTP同步目录6中;
S4:公安信息通信网边界接入平台3定期扫描FTP同步目录6;
通过文本文件发送子模块,以TH秒/每次的频度,将文本格式的阶段交通流数据以FTP协议同步到位于公安网内部的接收端FTP目录8中;
S5:在公安网内部,把位于接收端FTP目录8中的文本格式的阶段交通流数据,通过阶段交通流数据接收端9中的Flume 接收器子模块,从文本文件格式的数据解析成json格式的数据,送入kafka队列;
S6:公安交通指挥平台4采用流式计算技术从kafka队列10中消费数据,进行后续计算和操作。
本发明在公安网内采用FLUME采集并解析同步过来的阶段交通流数据,更适合交管系统的数据复杂性、实时性技术特点的技术需要,使整个系统具有高可靠性和可恢复性的特点。通过阶段交通流数据发送端和阶段交通流数据接收端的配合使用,能够在极低的延时下保证数据接收处理的顺序,解决了现有技术中,因为视频专网和公安内网的数据同步技术只能处理批式数据的问题。
使用kafka队列实现大数据的实时计算,在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这种设置允许用户独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。如果在使用过程中,根据现实需求的变化,现有的数据有变化,只需要数据消费端,比如公安交通指挥平台4修改处理过程,而信号控制系统2、阶段交通流数据发送端5、公安信息通信网边界接入平台3、阶段交通流数据接收端9这些数据交换用子系统都无需做修改。如果有新类型的数据,只需要在kafka上新增一个队列,数据消费端,比如公安交通指挥平台4,从这个新增队列消费数据;同时阶段交通流数据接收端能够将ftp文件还原成流式数据,保证发往kafka队列的数据顺序与视频专网内信号控制系统发送的数据顺序一致,即可实现对数据变化的对应。即,在本发明的技术方案中,因数据变化而引起的修改工作量大大降低,本发明的技术方案更易于维护和扩展。
如图3所示,阶段交通流数据发送端5包括:socket通信子模块、数据解析转化子模块、文本文件写入子模块、文本文件发送子模块、日志子模块;
socket通信子模块:负责与信号控制系统建立连接,并且接收信号控制系统发过来的阶段交通流数据;
数据解析转化子模块:将阶段交通流数据从xml格式进行解析转化成json格式的字符串;
文本文件写入子模块:将json格式的阶段交通流数据存储成文本文件格式;文本文件写入子模块根据时间阈值TH进行判断,把每次接收的json格式的阶段交通流数据新建为一个文本文件、或者把数据追加在原有的文本文件中;文本文件写入子模块中将json格式的阶段交通流数据存储成文本文件的时候,包括如下步骤:
a1:确认系统中已经存在的文本文件的个数N;
a2:如果 N = 0,则新建一个文本文件;
否则,在存储文本文件之前,确认本次阶段交通流数据的接收时间T是否大于时间阈值TH;时间阈值TH 设为1秒:
T = 本次阶段交通流数据接收完毕的时间 - 当前系统时间;
a3:如果T大于等于TH,则新建一个文本文件;
否则,如果T小于TH,则在原来的文本文件中,将数据追加写入下面的一行;
文本文件的文件名格式为:xxxx年-xx月-xx日-xx小时-xx分钟-xx秒.log;
文本文件发送子模块:其为一个独立线程,会定时扫描在其监控目录下的文本文件信息,将文件的最后修改时间在述时间阈值TH以前的所有文本文件,按照文件名顺序依次发送至FTP同步目录;
日志子模块:记录阶段交通流数据发送端5中各个模块运行情况。
文本文件发送子模块设置为独立的一个线程,保证文本文件发送子模块与其他子模块的低耦合性,不会因为故障影响其他模块的运行;同时,按照文件名顺序发送至FTP同步目录,保证数据顺序与信号控制系统发来的顺序一致,防止后到的数据先同步到公安内网中,影响后续交通控制中判断分析的结果。
如图4所示,阶段交通流数据接收端9包括:FTP客户端子模块、Flume FTP接收器子模块、Flume通道子模块、Flume接收器子模块、日志子模块;
FTP客户端子模块:与位于前置机中的接收端FTP目录建立通信连接;
Flume ftp接收器子模块:基于Flume接收器的自定义实现的模块,判断接收端FTP目录下文本文件中未处理的文件,然后通过FTP客户端子模块依次拉取接收端FTP目录下的所有为处理的文本文件,按行解析包装成event格式发送至Flume 通道子模块;
Flume通道子模块:是一种短暂的存储容器,将从Flume ftp接收器处接收到的event格式的数据缓存起来;
Flume 接收器子模块:是一个独立线程,不间断的接收Flume 通道子模块中的event格式的数据,将其解析成json格式的阶段交通流数据,发送至外部kafka队列中;
日志子模块:记录阶段交通流数据接收端中各个模块运行情况。
在公安网内部,阶段交通流数据接收端9为基于Flume 的扩展并进行二次开发的服务器;根据具体情况设置一个扫描周期,阶段交通流数据接收端9定期扫描接收端FTP目录8中的文本格式的阶段交通流数据,且进行解析成json格式,送入kafka队列,如图5所示,其具体流程包括如下步骤:
b1:阶段交通流数据接收端9通过FTP客户端子模块与接收端FTP目录8建立连接之后,Flume ftp接收器子模块会定期扫描接收端FTP目录8,通过FTP客户端子模块获取接收端FTP目录8下的所有文本格式的阶段交通流数据的元信息,元信息包括:文件名和文件大小;
b2:把所有文件的元信息按照文件名排序,形成文件临时列表;按照文件名排序为的是保证数据顺序保证与信号控制系统发来的顺序一致;
b3:Flume ftp接收器子模块在拉取文本文件之前,从文件临时列表中,依次取出每个文件的元信息,在本地的已处理文件信息集合的数据进行检索比对;
已处理文件信息集合包括:所有已经处理过的阶段交通数据的元信息;
检索比对的过程,包括如下步骤:
b3-1:取出每个文件元信息中的文件名作为待检索文件名,在已处理文件信息集合的数据中检索;
b3-2:如果已处理文件信息集合中不包含待检索文件名,则,比对结果设置为:集合中不存在该文件,结束本次检索比对;
否则,在已处理文件信息集合中,找出所有的与待检索文件文件名相同的文件,用这些同名文件的元信息构成同名文件列表;
b3-3:取出待检索文件名对应的文件大小,在同名文件列表中检索;
如果同名文件列表中存在相同的文件大小,则比对结果设置为:集合中包含该文件,结束本次检索比对;
否则,比对结果设置为:集合中不存在该文件,结束本次检索比对;
b4:如果比对结果为:集合中包含该文件,代表该文件已被处理过,无需对其进行后续处理;
如果比对结果为:集合中不存在该文件,代表改文件没有被处理过需要进行后续处理,执行步骤b5;
b5:Flume ftp接收器子模块把需要进行后续处理的文本格式的阶段交通流数据,基于FTP协议,依次拉取到阶段交通流数据接收端9本地;
b6:对该文件按行解析,且把每行数据解析并封装成一个event,暂存在Flume通道子模块中;
b7:Flume 接收器子模块不间断的对暂存在在Flume通道子模块中的每个event进行解析,还原成json格式的阶段交通流数据;
b8:Flume 接收器子模块把json格式的阶段交通流数据发送到kafka队列10中。
在步骤b3中先只拉取文件的元信息,通过对元信息的检索比对找到未处理文件,再在步骤b5中,把未处理文件整体数据拉到阶段交通流数据接收端9本地;先传输元信息后传输未处理文件的数据的设计方式,减少了网络通信的数据量,保证低延时性;且先通过元信息检索比对找出未处理文件的设计,避免了复杂的逻辑判断,易于实现,且从计算量和计算速度上来讲,更进一步降低了系统的计算量,更适合实时系统对时效性的要求。在公安网内部,基于FLUME技术采集同步过来的阶段交通流数据文件后,同样会将每个文件的名称和大小记录,即每个文件的元信息数据,保存进本地文件,用以保证当FLUME不可用或者重启后,不会重复采集,保证数据的一致性。
Claims (8)
1.一种网间交通流数据的交换方法,其包括以下步骤:
S1:信号控制系统获取道路交通监控设备抓取的阶段交通流数据;
其特征在于,其还包括下面的步骤:
S2:将所述信号控制系统传入的所述阶段交通流数据,从xml格式转换成json格式;
S3:将json格式的所述阶段交通流数据存储成文本文件;
设置一个时间阈值TH,且以TH秒/次的频度发送到设置在视频专网的FTP同步目录中;
S4:公安信息通信网边界接入平台定期扫描所述FTP同步目录;
以TH秒/每次的频度,将文本格式的所述阶段交通流数据以FTP协议同步到位于公安网内部的接收端FTP目录中;
S5:在公安网内部,把位于所述接收端FTP目录中的文本格式的所述阶段交通流数据解析成json格式的数据,送入kafka队列;
S6:公安交通指挥平台采用流式计算技术从所述kafka队列中消费数据,进行后续计算和操作;
步骤S3中,将json格式的所述阶段交通流存储成文本文件的时候,包括如下步骤:
a1:确认系统中已经存在的文本文件的个数N;
a2:如果 N = 0,则新建一个文本文件;
否则,存储文本文件之前确认本次所述阶段交通流数据的接收时间T是否大于所述时间阈值TH:
T = 本次所述阶段交通流数据接收完毕的时间 - 当前系统时间;
a3:如果T大于等于TH,则新建一个文本文件;
否则,如果T小于TH,则在原来的文本文件中,将数据追加写入下面的一行。
2.根据权利要求1所述一种网间交通流数据的交换方法,其特征在于:步骤S3中,所述时间阈值TH 设为1秒。
3.根据权利要求1所述一种网间交通流数据的交换方法,其特征在于:步骤S3中,将json格式的所述阶段交通流数据存储成文本文件,存储的文本文件的文件名格式为:xxxx年-xx月-xx日-xx小时-xx分钟-xx秒.log。
4.根据权利要求1所述一种网间交通流数据的交换方法,其特征在于:步骤S5中,在公安网内部,设置阶段交通流数据接收端,所述阶段交通流数据接收端为基于Flume 的扩展并进行二次开发的服务器;所述阶段交通流数据接收端定期扫描所述接收端FTP目录中的文本格式的所述阶段交通流数据,且进行解析成json格式,送入所述kafka队列,其包括如下步骤:
b1:所述阶段交通流数据接收端定期扫描所述接收端FTP目录,获取所述接收端FTP目录下的所有文本格式的所述阶段交通流数据的元信息,所述元信息包括:文件名和文件大小;
b2:把所有文件的所述元信息按照文件名排序,形成文件临时列表;
b3:从所述文件临时列表中,依次取出每个文件的所述元信息,在本地的已处理文件信息集合的数据进行检索比对;
所述已处理文件信息集合包括:所有已经处理过的阶段交通数据的所述元信息;
b4:如果比对结果为:集合中包含该文件,代表该文件已被处理过,无需对其进行后续处理;
如果比对结果为:集合中不存在该文件,代表改文件没有被处理过需要进行后续处理,执行步骤b5;
b5:把需要进行后续处理的文本格式的所述阶段交通流数据,基于FTP协议,依次拉取到所述阶段交通流数据接收端本地;
b6:对该文件按行解析,且把每行数据解析并封装成一个event;
b7:对每个event进行解析,还原成json格式的所述阶段交通流数据;
b8:把json格式的所述阶段交通流数据发送到所述kafka队列中。
5.根据权利要求4所述一种网间交通流数据的交换方法,其特征在于:步骤b3中,所述文件临时列表中的每个文件的所述元信息,与所述已处理文件信息集合中的数据进行检索比对的过程,包括如下步骤:
b3-1:取出每个文件所述元信息中的文件名作为待检索文件名,在所述已处理文件信息集合的数据中检索;
b3-2:如果所述已处理文件信息集合中不包含所述待检索文件名,则,所述比对结果设置为:集合中不存在该文件,结束本次检索比对;
否则,在所述已处理文件信息集合中,找出所有的与所述待检索文件文件名相同的文件,用这些同名文件的所述元信息构成同名文件列表;
b3-3:取出所述待检索文件名对应的文件大小,在所述同名文件列表中检索;
如果所述同名文件列表中存在相同的文件大小,则所述比对结果设置为:集合中包含该文件,结束本次检索比对;
否则,所述比对结果设置为:集合中不存在该文件,结束本次检索比对。
6.基于权利要求1所述一种网间交通流数据的交换方法,实现网间交通流数据的交换系统,其包括位于视频网络的信号控制系统,所述信号控制系统从道路交通监控设备获取阶段交通流数据,所述视频网络的中的数据通过公安信息通信网边界接入平台接入到位于公安专网中的公安交通指挥平台中,其特征在于,其还包括:
位于所述视频网络中,设置于所述信号控制系统、所述公安信息通信网边界接入平台之间的阶段交通流数据发送端;以及位于所述公安专网中,设置于所述公安信息通信网边界接入平台、所述公安交通指挥平台之间的阶段交通流数据接收端;
所述阶段交通流数据发送端通过socket通信连接所述所述信号控制系统,接收所述阶段交通流数据,并将数据保存为文本文件格式的数据后,放入位于所述阶段交通流数据发送端内部的FTP同步目录中;所述阶段交通流数据发送端基于FTP通信协议,同步所述阶段交通流数据至所述公安信息通信网边界接入平台;
所述阶段交通流数据接收端为基于Flume 的扩展并进行二次开发的服务器,其与所述公安信息通信网边界接入平台之间设置前置机;所述公安信息通信网边界接入平台定期扫描阶段位于所述阶段交通流数据发送端中的所述FTP同步目录,将里面的所述阶段交通流数据同步至设置于所述前置机中的接收端FTP目录中;所述阶段交通流数据接收端定期扫描所述接收端FTP目录,将未处理的所述阶段交通流数据接收至本地,解析后传输至kafka队列;所述公安交通指挥平台采用流式计算技术从所述kafka队列中消费数据,进行后续计算和操作。
7.根据权利要求6所述一种网间交通流数据的交换系统,其特征在于:所述阶段交通流数据发送端包括:socket通信子模块、数据解析转化子模块、文本文件写入子模块、文本文件发送子模块;
所述socket通信子模块:负责与所述信号控制系统建立连接,并且接收所述信号控制系统发过来的所述阶段交通流数据;
所述数据解析转化子模块:将所述阶段交通流数据从xml格式进行解析转化成json格式的字符串;
所述文本文件写入子模块:将json格式的所述阶段交通流数据存储成文本文件格式;所述文本文件写入子模块根据时间阈值TH进行判断,把每次接收的json格式的所述阶段交通流数据新建为一个文本文件、或者把数据追加在原有的文本文件中;
所述文本文件发送子模块:其为一个独立线程,会定时扫描在其监控目录下的文本文件信息,将文件的最后修改时间在时间阈值TH以前的所有文本文件,按照文件名顺序依次发送至所述FTP同步目录。
8.根据权利要求6所述一种网间交通流数据的交换系统,其特征在于:所述阶段交通流数据接收端包括:FTP客户端子模块、Flume FTP接收器子模块、Flume通道子模块、Flume接收器子模块;
所述FTP客户端子模块:与位于所述前置机中的所述接收端FTP目录建立通信连接;
所述Flume ftp接收器子模块:基于Flume接收器的自定义实现的模块,判断所述接收端FTP目录下文本文件中未处理的文件,然后通过所述FTP客户端子模块依次拉取所述接收端FTP目录下的所有为处理的文本文件,按行解析包装成event格式发送至所述Flume 通道子模块;
所述Flume通道子模块:是一种短暂的存储容器,将从所述Flume ftp接收器处接收到的event格式的数据缓存起来;
所述Flume 接收器子模块:是一个独立线程,不间断的接收所述Flume 通道子模块中的event格式的数据,将其解析成json格式的所述阶段交通流数据,发送至外部所述kafka队列中。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910811244.0A CN110505307B (zh) | 2019-08-30 | 2019-08-30 | 一种网间交通流数据的交换方法及系统 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910811244.0A CN110505307B (zh) | 2019-08-30 | 2019-08-30 | 一种网间交通流数据的交换方法及系统 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN110505307A CN110505307A (zh) | 2019-11-26 |
CN110505307B true CN110505307B (zh) | 2022-04-26 |
Family
ID=68590603
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201910811244.0A Active CN110505307B (zh) | 2019-08-30 | 2019-08-30 | 一种网间交通流数据的交换方法及系统 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN110505307B (zh) |
Families Citing this family (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113507530B (zh) * | 2021-09-09 | 2022-01-04 | 深圳市安软慧视科技有限公司 | 数据转发方法、相关系统和设备及存储介质 |
CN113901271A (zh) * | 2021-12-10 | 2022-01-07 | 飞狐信息技术(天津)有限公司 | 基于电子表格配置系统的方法、装置、存储介质和设备 |
CN114546972A (zh) * | 2022-02-23 | 2022-05-27 | 罗普特科技集团股份有限公司 | 一种全生命周期运维系统的三网业务数据同步方法与系统 |
CN115695920A (zh) * | 2022-09-30 | 2023-02-03 | 武汉兴图新科电子股份有限公司 | 一种基于Flume的rtsp摄像头视频流接入方法 |
CN115794735B (zh) * | 2022-11-18 | 2023-08-29 | 中远海运散货运输有限公司 | 电子海图数据更新的文件转换方法、系统、设备及介质 |
Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN101094026A (zh) * | 2006-06-19 | 2007-12-26 | 上海全成通信技术有限公司 | 大量数据同步、传输和入数据库的方法 |
CN105530125A (zh) * | 2015-12-12 | 2016-04-27 | 公安部交通管理科学研究所 | 一种网间数据交换系统 |
CN106600969A (zh) * | 2016-12-19 | 2017-04-26 | 安徽百诚慧通科技有限公司 | 一种智慧化道路综合防控方法及系统 |
CN107967348A (zh) * | 2017-12-13 | 2018-04-27 | 武汉烽火众智数字技术有限责任公司 | 基于多网数据融合应用提高视频侦查效率的系统及其方法 |
CN109462592A (zh) * | 2018-11-20 | 2019-03-12 | 北京旷视科技有限公司 | 数据共享方法、装置、设备及存储介质 |
Family Cites Families (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
GB2518257A (en) * | 2013-09-13 | 2015-03-18 | Vodafone Ip Licensing Ltd | Methods and systems for operating a secure mobile device |
CA3059788A1 (en) * | 2017-05-18 | 2018-11-22 | Expanse, Inc. | Correlation-driven threat assessment and remediation |
-
2019
- 2019-08-30 CN CN201910811244.0A patent/CN110505307B/zh active Active
Patent Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN101094026A (zh) * | 2006-06-19 | 2007-12-26 | 上海全成通信技术有限公司 | 大量数据同步、传输和入数据库的方法 |
CN105530125A (zh) * | 2015-12-12 | 2016-04-27 | 公安部交通管理科学研究所 | 一种网间数据交换系统 |
CN106600969A (zh) * | 2016-12-19 | 2017-04-26 | 安徽百诚慧通科技有限公司 | 一种智慧化道路综合防控方法及系统 |
CN107967348A (zh) * | 2017-12-13 | 2018-04-27 | 武汉烽火众智数字技术有限责任公司 | 基于多网数据融合应用提高视频侦查效率的系统及其方法 |
CN109462592A (zh) * | 2018-11-20 | 2019-03-12 | 北京旷视科技有限公司 | 数据共享方法、装置、设备及存储介质 |
Non-Patent Citations (6)
Title |
---|
GIS-T在广州市智能交通管理指挥的应用研究;钟永康等;《广东公安科技》;20100330(第01期);全文 * |
Smart Cities: A Survey on Data Management, Security, and Enabling Technologies;Ammar Gharaibeh;《IEEE》;20170807;全文 * |
保定市智能交通综合管理平台系统的设计与实现;徐铁强;《中国优秀硕士学位论文全文数据库信息科技辑》;20130531;全文 * |
基于云计算的机动车缉查布控联网系统设计与应用;徐晓东;《中国优秀硕士学位论文全文数据库工程科技Ⅱ辑》;20170331;第1-5章 * |
基于代理服务的公安交通管理云平台接口实现;邵志骅等;《中国公共安全(学术版)》;20150915(第03期);全文 * |
基于大数据的智能交通资源中心体系建设研究;吴小刚等;《智能城市》;20190514(第09期);全文 * |
Also Published As
Publication number | Publication date |
---|---|
CN110505307A (zh) | 2019-11-26 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN110505307B (zh) | 一种网间交通流数据的交换方法及系统 | |
CN106452819B (zh) | 数据采集系统及数据采集方法 | |
CN105512201A (zh) | 数据收集和加工方法及装置 | |
CN111046100B (zh) | 一种关系型数据库到非关系型数据库的同步方法和系统 | |
CN103095819A (zh) | 推送数据信息的方法及数据信息推送系统 | |
CN112988916B (zh) | 针对Clickhouse的全量和增量同步方法、设备和存储介质 | |
CN111400405A (zh) | 一种基于分布式的监控视频数据并行处理系统及方法 | |
CN117573619A (zh) | 光学遥感数据流式分段分景编目方法、装置、设备及介质 | |
CN113114968A (zh) | 一种视频处理方法、装置、设备及存储介质 | |
CN106412513A (zh) | 视频处理系统及处理方法 | |
CN113872814A (zh) | 内容分发网络的信息处理方法、装置和系统 | |
CN112835978A (zh) | 一种数据存储方法、装置及计算机设备 | |
CN115277723B (zh) | 边缘采集历史模块基于缓冲事件的断点续传方法及系统 | |
CN108683643B (zh) | 一种基于流式处理的数据脱敏系统及其脱敏方法 | |
CN105763484A (zh) | 基于流组合压缩的信令流汇聚装置及其方法 | |
WO2023231723A1 (zh) | 流媒体数据处理方法及系统 | |
CN110691164A (zh) | 一种基于手机终端实现应急广播的方法及系统 | |
CN104735097A (zh) | 信息的收集方法和系统 | |
CN116185298A (zh) | 一种日志分布式存储的方法 | |
CN104507107B (zh) | 一种信令数据的预处理方法 | |
Barth et al. | A Modernized Architecture for the Post Mortem System at CERN | |
CN116800991B (zh) | 一种基于边缘计算设备的视频推流方法和系统 | |
CN112363835B (zh) | 一种基于网络大数据的智能资源调整方法及系统 | |
CN111143280B (zh) | 一种数据调度方法、系统、装置及存储介质 | |
CN115361262B (zh) | 一种传输设备性能文件ftp上报的实现方法和系统 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |