CN111177271B - kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 - Google Patents
kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 Download PDFInfo
- Publication number
- CN111177271B CN111177271B CN201911417388.4A CN201911417388A CN111177271B CN 111177271 B CN111177271 B CN 111177271B CN 201911417388 A CN201911417388 A CN 201911417388A CN 111177271 B CN111177271 B CN 111177271B
- Authority
- CN
- China
- Prior art keywords
- data
- kafka
- type
- hdfs
- data type
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/28—Databases characterised by their database models, e.g. relational or object models
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Databases & Information Systems (AREA)
- Theoretical Computer Science (AREA)
- Data Mining & Analysis (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computing Systems (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明提供了一种kafka数据持久化到hdfs的数据存储方法、装置、计算机设备和存储介质。其中,该方法应用于单机版流式存储,包括:利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应FixedThreadPool线程池中的一个工作线程;根据kafka topic解析结果确定存储目标hive表;消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据。通过本发明,可提高kafka数据持久化到hdfs的灵活性,同时降低了资源的占用率。
Description
技术领域
本发明涉及大数据存储技术领域,尤其涉及一种kafka数据持久化到hdfs的数据存储方法、装置、计算机设备。
背景技术
在现有的kafka数据持久化到hdfs技术中,存在几种较为常规的处理方法,例如,基于spark-sql流式存储的技术实现、基于flume技术实现等方案。
基于spark-sql流式存储技术在实现时,会生成很多碎片文件,为缓解存储生成的碎片文件需要较多的运行资源;并且,spark-sql流式存储运行还需要依赖spark大数据开源组件的部署,占用较多使用资源。
基于flume技术在实现时,因flume运行环境过度依赖预先的静态配置文件,使得不能灵活的控制需要存储的kafka topic以及目标hive表;并且,flume技术方案不能灵活的支持对更为复杂的数据类型(如array、map等)的转换。
针对现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题,目前尚未提出有效的解决方案。
发明内容
本发明的目的是提供一种kafka数据持久化到hdfs的数据存储方法、装置、计算机设备和存储介质,用于解决现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题。
为实现上述目的,根据本发明实施例的一个方面,提供了一种kafka数据持久化到hdfs的数据存储方法。
该kafka数据持久化到hdfs的数据存储方法应用于单机版流式存储,包括:利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应FixedThreadPool线程池中的一个工作线程;根据kafka topic解析结果确定存储目标hive表;消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据。
进一步地,消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据,包括:利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象。
进一步地,利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象,包括:获取到hive的FieldSchema集合信息;通过FieldSchema集合信息转换出spark sql中的StructType;遍历StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
进一步地,上述field的类型至少包括如下类型之一:基本数据类型、struct数据类型、map数据类型、array数据类型。
进一步地,在field的类型为基本数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:使用fastjson工具包的TypeUtils的cast方法把源数据转换成目标数据类型的变量值。
在field的类型为struct数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历struct数据类型中的所有子元素以获取其数据类型和变量值,再使用fastjson工具包的TypeUtils的cast方法将子元素源数据转换成目标数据类型的变量值;
在field的类型为map数据类型时,针对field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历map数据类型中的所有key value pair以获取其所有key和value的数据类型及变量值,再使用fastjson工具包的TypeUtils的cast方法分别把map的key和value源数据转换成目标数据类型的变量值;
在field的类型为array数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历array数据类型中的数组元素的公共数据类型及数组,再变量所有数组变量值并且使用fastjson工具包的TypeUtils的cast方法把数组元素源数据转换成目标数据类型的变量值。
进一步地,FixedThreadPool线程池还包括定时任务线程,用于在开启定时任务线程时,通过间隔时间控制kafka数据持久化到hdfs的数据存储过程中生成文件的容量大小。
进一步地,在消费kafka topic数据时,在向hive表关联的hdfs目录写入kafka数据之前,方法还包括:判断是否命中缓存;若命中缓存,则在缓存中获取writer上下文;若未命中缓存,初始化writer上下文,并进行缓存。
为实现上述目的,根据本发明实施例的另一个方面,提供了一种kafka数据持久化到hdfs的数据存储装置。
该kafka数据持久化到hdfs的数据存储装置包括:解析模块,用于利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应FixedThreadPool线程池中的一个work线程;确定模块,用于根据kafka topic解析结果确定存储目标hive表;存储模块,用于消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据。
进一步地,存储模块利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象。
进一步地,存储模块还包括:获取单元,用于获取到hive的FieldSchema集合信息;第一转换单元,用于通过FieldSchema集合信息转换出spark sql中的StructType;第二转换单元,用于遍历StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
进一步地,上述field的类型至少包括如下类型之一:基本数据类型、struct数据类型、map数据类型、array数据类型。
进一步地,第二转换单元包括:第一转换子单元,用于在field的类型为基本数据类型时,使用fastjson工具包的TypeUtils的cast装置把源数据转换成目标数据类型的变量值;第二转换子单元,用于在field的类型为struct数据类型时,遍历struct数据类型中的所有子元素以获取其数据类型和变量值,再使用fastjson工具包的TypeUtils的cast装置将子元素源数据转换成目标数据类型的变量值;第三转换子单元,用于在field的类型为map数据类型时,遍历map数据类型中的所有key value pair以获取其所有key和value的数据类型及变量值,再使用fastjson工具包的TypeUtils的cast装置分别把map的key和value源数据转换成目标数据类型的变量值;第四转换子单元,用于在field的类型为array数据类型时,遍历array数据类型中的数组元素的公共数据类型及数组,再变量所有数组变量值并且使用fastjson工具包的TypeUtils的cast装置把数组元素源数据转换成目标数据类型的变量值。
进一步地,解析模块包括定时控制单元,用于在kafka数据持久化到hdfs的数据存储过程中,在开启FixedThreadPool线程池的定时任务线程时,通过间隔时间控制kafka数据持久化到hdfs的数据存储过程中生成文件的容量大小。
进一步地,存储模块还包括:缓存单元,用于在消费kafka topic数据时,在向hive表关联的hdfs目录写入kafka数据之前,判断是否命中缓存;若命中缓存,则在缓存中获取writer上下文;若未命中缓存,初始化writer上下文,并进行缓存。
为实现上述目的,根据本发明实施例的另一个方面,提供了一种计算机设备,包括存储器、处理器以及存储在存储器上并可在处理器上运行的计算机程序,该处理器执行计算机程序时实现上述kafka数据持久化到hdfs的数据存储方法的步骤。
根据本发明实施例的一个方面,为实现上述目的,本发明还提供计算机可读存储介质,其上存储有计算机程序,该计算机程序被处理器执行时实现上述kafka数据持久化到hdfs的数据存储方法的步骤。
本发明提供的kafka数据持久化到hdfs的数据存储方法、装置、计算机设备和存储介质,应用于单机版流式存储,不依赖于其他开源组件,部署轻量快捷,同时,在数据处理过程中,利用FixedThreadPool线程池动态kafka topic解析,不依赖静态的配置文件,通过动态分析配置实现灵活控制数据读取和对目标表的写入,最终完成kafka数据持久化到hdfs。本发明提供的技术方案有效地解决了现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题,提高kafka数据持久化到hdfs的灵活性,同时降低了资源的占用率。
附图说明
通过阅读下文优选实施方式的详细描述,各种其他的优点和益处对于本领域普通技术人员将变得清楚明了。附图仅用于示出优选实施方式的目的,而并不认为是对本发明的限制。而且在整个附图中,用相同的参考符号表示相同的部件。在附图中:
图1为本发明实施例一提供的kafka数据持久化到hdfs的数据存储方法的一种可选的流程图;
图2为本发明实施例一提供的kafka数据持久化到hdfs的数据存储方法的另一种可选的流程图;
图3为本发明实施例二提供的kafka数据持久化到hdfs的数据存储装置的一种可选的结构框图;
图4为本发明实施例三提供的计算机设备的硬件结构图。
具体实施方式
为了使本发明的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本发明进行进一步详细说明。应当理解,此处所描述的具体实施例仅用以解释本发明,并不用于限定本发明。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
现有基于spark-sql流式存储技术在实现时生成很多碎片文件,为缓解存储生成的碎片文件需要较多的运行资源;并且,spark-sql流式存储运行还需要依赖spark大数据开源组件的部署,占用较多使用资源。针对于此,本发明提供了一种在处理存储环节不依赖其他开源组件,部署轻量快捷的实施方式;
现有基于flume技术在实现时,因flume运行环境过度依赖预先的静态配置文件,使得不能灵活的控制需要存储的kafka topic以及目标hive表;针对于此,本发明提供了一种启动运行不依赖静态的配置文件,可动态分析配置来灵活控制数据读取和对目标表的写入的实施方式。
此外,申请人还发现,现有以parquet数据格式关联hive表存在如下技术痛点:
1)在以parquet数据格式落地到存储介质hdfs时,flume技术方案必须依赖avro作为中间schema转换环节,针对于此,本发明提供了一种可直接灵活控制JSONObject与spark-sql的StructType之间的类型转换,不需要avro来显式给出数据schema信息,实现JSONObject与parquet之间正常流转的存储过程的实施方式。
2)现有flume技术方案不能灵活的支持对更为复杂的数据类型(如array、map等)的转换,针对于此,本发明提供了一种基于spark-sql的ParquetWriteSupport类型转换技术实现了对各类复杂嵌套数据类型的写入支持的实施方式。
关于本发明提供的kafka数据持久化到hdfs的数据存储方法、装置、计算机设备和存储介质的具体实施例,将在下文中详细描述。
实施例一
本发明实施例提供了一种kafka数据持久化到hdfs的数据存储方法,该kafka数据持久化到hdfs的数据存储方法可应用于单机版流式存储的数据处理框架下,通过该方法,可提高kafka数据持久化到hdfs的灵活性,同时降低资源的占用率。具体地,图1为本发明实施例一提供的kafka数据持久化到hdfs的数据存储方法的一种可选的流程图,如图1所示,该实施例提供的kafka数据持久化到hdfs的数据存储方法包括如下的步骤S101至步骤S103。
步骤S101:利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应FixedThreadPool线程池中的一个工作线程;
具体实现时,本发明使用FixedThreadPool线程池动态kafka topic解析,为提高处理性能,将本方案的系统设计为多线程并发运行环境,针对每个topic开启一个线程,所有线程托管给固定大小的线程池,方便线程管理。此外,线程管理时的策略还对系统运行所需要的上下文进行合理规划cache key后缓存,以大大降低在大数据量下的频繁对象初始化开销。
其中,作为一种优选的实施方式,本实施例中的FixedThreadPool线程池还设置定时任务线程,用于在开启定时任务线程时,通过间隔时间控制kafka数据持久化到hdfs的数据存储过程中生成文件的容量大小。本发明通过调整定时任务线程的间隔运行时间来控制写入hdfs的文件大小,进而加大间隔时间以解决写入碎片文件问题。
本发明实施方式为轻量级单机部署,灵活动态的把kafka topic数据流式持久化到hdfs并关联hive表(包括分区表和非分区表)中,解决写入hdfs的碎片文件过多问题,以尽量少的使用资源来达到不低于1万eps(event per second)的处理性能。
此外,实施本kafka数据持久化到hdfs的数据存储方法对应的系统可以以jar包形式部署,于jvm中启动运行,部署方式简单,部署环境适用性广,只需支持JVM即可部署。
步骤S102:根据kafka topic解析结果确定存储目标hive表;
通过topic动态分析出存储目标hive表。在本实施例中,本方案的系统可主动探测进行动态kafka topic解析,在topic解析后根据解析结果动态分析出存储目标hive表。
步骤S103:消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据。
请结合参考图2,图2中示出上述kafka数据持久化到hdfs的数据存储方法的另一种流程示意图,在开始执行后,FixedThreadPool线程池可动态kafka topic解析,每个topic一个work线程,之后,通过topic动态分析出存储目标hive表。在分析出存储目标hive表后,消费kafka topic数据。
此处作为一种优选的实施方式,还引入缓存机制,先判断是否命中缓存,若判断结果为是,则在缓存中获取writer上下文,若判断结果为否,则初始化writer上下文,并缓存。然后向hive表关联的hdfs目录写入数据。此处需要说明的是,hive表包括分区表和非分区表,若为分区表时,需动态追加分区元数据以关联hive。本实施例中的线程对系统运行所需要的上下文进行合理规划cache key后缓存,以大大降低在大数据量下的频繁对象初始化开销。
在本实施例的另一个可选的实施方式中,还对上述方案进行了优化,参见图2,FixedThreadPool线程池还设置定时任务线程,在启动定时任务线程时,利用间隔时间确定生成文件的容量大小。本发明通过调整定时任务线程的间隔运行时间来控制写入hdfs的文件大小,进而加大间隔时间以解决写入碎片文件问题。
此处需要说明的是,关于定时任务线程的执行时序在图2中仅仅是一种示意性的说明,本领域技术人员可以根据其自身设计,选择其自身所需的时序执行。
在本实施例的另一个可选的实施方式中,还对上述方案进行了进一步的优化,参见图2,kafka数据持久化到hdfs的数据存储方法中还增加停止信号捕捉器的处理逻辑,用于在捕获到停止信号后释放系统占用资源。通过设置信号捕捉器的处理逻辑,使得kafka数据持久化到hdfs的数据存储方法中停止时序变得可控,保证kafka数据持久化到hdfs的数据存储顺利进行的同时,也提升了方案的完整性。
在本实施例的另一个可选的实施方式中,还提供了消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据过程中一种细粒度的控制逻辑,具体来说,消费kafkatopic数据,向hive表关联的hdfs目录写入kafka数据,包括:利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象。
申请人发现,在以parquet数据格式落地到存储介质hdfs时,必须依赖avro作为中间schema转换环节,较为复杂,针对此,本发明定制类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象,可直接灵活控制JSONObject与spark-sql的StructType之间的类型转换,而不再需要avro来显式给出数据schema信息,进而实现JSONObject与parquet之间正常流转的存储过程。提高灵活性。
在实施本kafka数据持久化到hdfs的数据存储方法对应的系统中的writer可支持的有ParquetWriter、TextWriter、ORCWriter三类。ParquetWriter中使用的WriterSupport采用spark-sql包中的ParquetWriteSupport扩展对象实现,定制类型转换器实现JSONObject和ParquetWriteSupport所需的InternalRow的数据类型转换,以支持对各类嵌套复杂hive数据类型的存储写入,并以此脱离了必须利用avro获取中间schema的转换依据过程依赖。
具体实现时,本实施例还提供了一种利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象的方法,包括如下步骤:获取到hive的FieldSchema集合信息;通过FieldSchema集合信息转换出spark sql中的StructType;遍历StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
优选地,上述field的类型至少包括如下类型之一:基本数据类型、struct数据类型、map数据类型、array数据类型。其中,上述struct数据类型、map数据类型、array数据类型均为复杂数据类型,关于是否属于复杂数据类型,可以根据本领域人员的经验进行判断,也可以通过预设复杂数据类型筛选条件或者设置复杂数据类型对应的集合的方式来实现。在设置复杂数据类型条件者设置复杂数据类型对应的集合后,该复杂数据类型对应清楚的保护范围。
具体实现时,本实施例中还分别对针对上述不同field的类型,提供了对应处理逻辑,具体来说,在field的类型为基本数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型的处理逻辑包括:使用fastjson工具包的TypeUtils的cast方法把源数据转换成目标数据类型的变量值。
在field的类型为struct数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历struct数据类型中的所有子元素以获取其数据类型和变量值,再使用fastjson工具包的TypeUtils的cast方法将子元素源数据转换成目标数据类型的变量值;
在field的类型为map数据类型时,针对field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历map数据类型中的所有key value pair以获取其所有key和value的数据类型及变量值,再使用fastjson工具包的TypeUtils的cast方法分别把map的key和value源数据转换成目标数据类型的变量值;
在field的类型为array数据类型时,针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历array数据类型中的数组元素的公共数据类型及数组,再变量所有数组变量值并且使用fastjson工具包的TypeUtils的cast方法把数组元素源数据转换成目标数据类型的变量值。
本实施例中提供的kafka数据持久化到hdfs的数据存储方法,应用于单机版流式存储,不依赖于其他开源组件,部署轻量快捷,同时,在数据处理过程中,利用FixedThreadPool线程池动态kafka topic解析,不依赖静态的配置文件,通过动态分析配置实现灵活控制数据读取和对目标表的写入,最终完成kafka数据持久化到hdfs。本发明提供的技术方案有效地解决了现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题,提高kafka数据持久化到hdfs的灵活性,同时降低了资源的占用率。
实施例二
对应于上述实施例一,本发明实施例二提供了一种Kafka数据持久化到hdfs的数据存储装置,相关技术特征和对应的技术效果可参考上述实施例一,在此处不再赘述。图3为本发明实施例二提供的kafka数据持久化到hdfs的数据存储装置的一种优选的结构框图,如图3所示,该kafka数据持久化到hdfs的数据存储装置包括:解析模块301,用于利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应FixedThreadPool线程池中的一个work线程;确定模块302,用于根据kafka topic解析结果确定存储目标hive表;存储模块303,用于消费kafka topic数据,向hive表关联的hdfs目录写入kafka数据。
进一步地,存储模块利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象。
进一步地,存储模块还包括:获取单元,用于获取到hive的FieldSchema集合信息;第一转换单元,用于通过FieldSchema集合信息转换出spark sql中的StructType;第二转换单元,用于遍历StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
进一步地,上述field的类型至少包括如下类型之一:基本数据类型、struct数据类型、map数据类型、array数据类型。
进一步地,第二转换单元包括:第一转换子单元,用于在field的类型为基本数据类型时,使用fastjson工具包的TypeUtils的cast装置把源数据转换成目标数据类型的变量值;第二转换子单元,用于在field的类型为struct数据类型时,遍历struct数据类型中的所有子元素以获取其数据类型和变量值,再使用fastjson工具包的TypeUtils的cast装置将子元素源数据转换成目标数据类型的变量值;第三转换子单元,用于在field的类型为map数据类型时,遍历map数据类型中的所有key value pair以获取其所有key和value的数据类型及变量值,再使用fastjson工具包的TypeUtils的cast装置分别把map的key和value源数据转换成目标数据类型的变量值;第四转换子单元,用于在field的类型为array数据类型时,遍历array数据类型中的数组元素的公共数据类型及数组,再变量所有数组变量值并且使用fastjson工具包的TypeUtils的cast装置把数组元素源数据转换成目标数据类型的变量值。
进一步地,解析模块包括定时控制单元,用于在kafka数据持久化到hdfs的数据存储过程中,在开启FixedThreadPool线程池的定时任务线程时,通过间隔时间控制kafka数据持久化到hdfs的数据存储过程中生成文件的容量大小。
进一步地,存储模块还包括:缓存单元,用于在消费kafka topic数据时,在向hive表关联的hdfs目录写入kafka数据之前,判断是否命中缓存;若命中缓存,则在缓存中获取writer上下文;若未命中缓存,初始化writer上下文,并进行缓存。
本实施例中提供的kafka数据持久化到hdfs的数据存储装置,应用于单机版流式存储,不依赖于其他开源组件,部署轻量快捷,同时,在数据处理过程中,利用FixedThreadPool线程池动态kafka topic解析,不依赖静态的配置文件,通过动态分析配置实现灵活控制数据读取和对目标表的写入,最终完成kafka数据持久化到hdfs。本发明提供的技术方案有效地解决了现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题,提高kafka数据持久化到hdfs的灵活性,同时降低了资源的占用率。
实施例三
本实施例三还提供一种计算机设备,如可以执行程序的智能手机、平板电脑、笔记本电脑、台式计算机、机架式服务器、刀片式服务器、塔式服务器或机柜式服务器(包括独立的服务器,或者多个服务器所组成的服务器集群)等。如图4所示,本实施例的计算机设备01至少包括但不限于:可通过系统总线相互通信连接的存储器011、处理器012,如图4所示。需要指出的是,图4仅示出了具有组件存储器011和处理器012的计算机设备01,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。
本实施例中,存储器011(即可读存储介质)包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,存储器011可以是计算机设备01的内部存储单元,例如该计算机设备01的硬盘或内存。在另一些实施例中,存储器011也可以是计算机设备01的外部存储设备,例如该计算机设备01上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(Flash Card)等。当然,存储器011还可以既包括计算机设备01的内部存储单元也包括其外部存储设备。本实施例中,存储器011通常用于存储安装于计算机设备01的操作系统和各类应用软件,例如实施例二的kafka数据持久化到hdfs的数据存储装置的程序代码等。此外,存储器011还可以用于暂时地存储已经输出或者将要输出的各类数据。
处理器012在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器012通常用于控制计算机设备01的总体操作。本实施例中,处理器012用于运行存储器011中存储的程序代码或者处理数据,例如kafka数据持久化到hdfs的数据存储方法等。
实施例四
本实施例四还提供一种计算机可读存储介质,如闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘、服务器、App应用商城等等,其上存储有计算机程序,程序被处理器执行时实现相应功能。本实施例的计算机可读存储介质用于kafka数据持久化到hdfs的数据存储装置,被处理器执行时实现实施例一的kafka数据持久化到hdfs的数据存储方法。
从以上描述中可以看出,本发明上述实施例提供的kafka数据持久化到hdfs的数据存储方法、装置、计算机设备和存储介质,应用于单机版流式存储,不依赖于其他开源组件,部署轻量快捷,同时,在数据处理过程中,利用FixedThreadPool线程池动态kafkatopic解析,不依赖静态的配置文件,通过动态分析配置实现灵活控制数据读取和对目标表的写入,最终完成kafka数据持久化到hdfs。本发明提供的技术方案有效地解决了现有技术中实现kafka数据持久化到hdfs过程占用较多使用资源且灵活性差的问题,提高kafka数据持久化到hdfs的灵活性,同时降低了资源的占用率。
需要说明的是,在本文中,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者装置不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者装置所固有的要素。在没有更多限制的情况下,由语句“包括一个……”限定的要素,并不排除在包括该要素的过程、方法、物品或者装置中还存在另外的相同要素。
上述本发明实施例序号仅仅为了描述,不代表实施例的优劣。
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。
以上仅为本发明的优选实施例,并非因此限制本发明的专利范围,凡是利用本发明说明书及附图内容所作的等效结构或等效流程变换,或直接或间接运用在其他相关的技术领域,均同理包括在本发明的专利保护范围内。
Claims (8)
1.一种kafka数据持久化到hdfs的数据存储方法,其特征在于,所述方法应用于单机版流式存储,包括:
利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应所述FixedThreadPool线程池中的一个工作线程;
根据所述kafka topic解析结果确定存储目标hive表;
消费所述kafka topic数据,向所述hive表关联的hdfs目录写入kafka数据,包括:利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象,具体为:
获取到hive的FieldSchema集合信息;
通过所述FieldSchema集合信息转换出spark-sql中的StructType;
遍历所述StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
2.根据权利要求1所述的kafka数据持久化到hdfs的数据存储方法,其特征在于,所述field的类型至少包括如下类型之一:基本数据类型、struct数据类型、map数据类型、array数据类型。
3.根据权利要求2所述的kafka数据持久化到hdfs的数据存储方法,其特征在于:
在所述field的类型为基本数据类型时,所述针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:使用fastjson工具包的TypeUtils的cast方法把源数据转换成目标数据类型的变量值;
在所述field的类型为struct数据类型时,所述针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历struct数据类型中的所有子元素以获取其数据类型和变量值,再使用fastjson工具包的TypeUtils的cast方法将子元素源数据转换成目标数据类型的变量值;
在所述field的类型为map数据类型时,所述针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历map数据类型中的所有key value pair以获取其所有key和value的数据类型及变量值,再使用fastjson工具包的TypeUtils的cast方法分别把map的key和value源数据转换成目标数据类型的变量值;
在所述field的类型为array数据类型时,所述针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型,包括:遍历array数据类型中的数组元素的公共数据类型及数组,再变量所有数组变量值并且使用fastjson工具包的TypeUtils的cast方法把数组元素源数据转换成目标数据类型的变量值。
4.根据权利要求1所述的kafka数据持久化到hdfs的数据存储方法,其特征在于,所述FixedThreadPool线程池还包括定时任务线程,用于在开启所述定时任务线程时,通过间隔时间控制kafka数据持久化到hdfs的数据存储过程中生成文件的容量大小。
5.根据权利要求1所述的kafka数据持久化到hdfs的数据存储方法,其特征在于,在消费所述kafka topic数据时,在所述向所述hive表关联的hdfs目录写入kafka数据之前,所述方法还包括:
判断是否命中缓存;
若命中缓存,则在缓存中获取writer上下文;
若未命中缓存,初始化writer上下文,并进行缓存。
6.一种kafka数据持久化到hdfs的数据存储装置,其特征在于,所述装置应用于单机版流式存储,包括:
解析模块,用于利用FixedThreadPool线程池对kafka数据进行kafka topic解析,其中,每个topic对应所述FixedThreadPool线程池中的一个work线程;
确定模块,用于根据所述kafka topic解析结果确定存储目标hive表;
存储模块,用于消费所述kafka topic数据,向所述hive表关联的hdfs目录写入kafka数据,包括:利用预先定制的类型转换器将kafka数据持久化到hdfs过程中的JSONObject转换成ParquetWriteSupport所需的InternalRow对象,具体为:
获取到hive的FieldSchema集合信息;
通过所述FieldSchema集合信息转换出spark-sql中的StructType;
遍历所述StructType中的字段field,并针对不同field的类型对JSONObject中的该字段值进行类型转换,转换的目标数据类型为spark-sql的InternalRow对象所需要的数据类型。
7.一种计算机设备,包括存储器、处理器以及存储在存储器上并可在处理器上运行的计算机程序,其特征在于,所述处理器执行所述计算机程序时实现权利要求1至5任一项所述方法的步骤。
8.一种计算机可读存储介质,其上存储有计算机程序,其特征在于:所述计算机程序被处理器执行时实现权利要求1至5任一项所述方法的步骤。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201911417388.4A CN111177271B (zh) | 2019-12-31 | 2019-12-31 | kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201911417388.4A CN111177271B (zh) | 2019-12-31 | 2019-12-31 | kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN111177271A CN111177271A (zh) | 2020-05-19 |
CN111177271B true CN111177271B (zh) | 2023-11-10 |
Family
ID=70656010
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201911417388.4A Active CN111177271B (zh) | 2019-12-31 | 2019-12-31 | kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN111177271B (zh) |
Families Citing this family (6)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112433909A (zh) * | 2020-11-03 | 2021-03-02 | 中国南方电网有限责任公司 | 一种基于kafka的实时监控数据的处理方法 |
CN112527801A (zh) * | 2020-12-21 | 2021-03-19 | 中国人民银行清算总中心 | 关系型数据库与大数据系统间的数据同步方法及系统 |
CN113094342A (zh) * | 2021-04-02 | 2021-07-09 | 上海中通吉网络技术有限公司 | 数据持久化的方法、装置及设备、存储介质 |
CN113590667A (zh) * | 2021-05-31 | 2021-11-02 | 深圳感臻科技有限公司 | 一种基于Spark Streaming的实时数据更新及管理方法 |
CN114153620B (zh) * | 2022-02-08 | 2022-05-24 | 上海柯林布瑞信息技术有限公司 | Hudi运行环境资源优化分配方法及装置 |
CN116910016B (zh) * | 2023-09-14 | 2024-06-11 | 交通运输部北海航海保障中心天津通信中心 | 一种ais数据处理方法 |
Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN108519987A (zh) * | 2018-02-24 | 2018-09-11 | 国家计算机网络与信息安全管理中心 | 一种数据持久化方法和装置 |
CN109902126A (zh) * | 2019-02-18 | 2019-06-18 | 国家计算机网络与信息安全管理中心 | 支持hive自动分区的加载系统及其实现方法 |
Family Cites Families (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US10198298B2 (en) * | 2015-09-16 | 2019-02-05 | Salesforce.Com, Inc. | Handling multiple task sequences in a stream processing framework |
US9965330B2 (en) * | 2015-09-18 | 2018-05-08 | Salesforce.Com, Inc. | Maintaining throughput of a stream processing framework while increasing processing load |
-
2019
- 2019-12-31 CN CN201911417388.4A patent/CN111177271B/zh active Active
Patent Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN108519987A (zh) * | 2018-02-24 | 2018-09-11 | 国家计算机网络与信息安全管理中心 | 一种数据持久化方法和装置 |
CN109902126A (zh) * | 2019-02-18 | 2019-06-18 | 国家计算机网络与信息安全管理中心 | 支持hive自动分区的加载系统及其实现方法 |
Non-Patent Citations (1)
Title |
---|
范家杰 ; 田熙清 ; 郑博 ; .基于流式计算的DPI数据处理方案及实践.移动通信.2018,(01),全文. * |
Also Published As
Publication number | Publication date |
---|---|
CN111177271A (zh) | 2020-05-19 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN111177271B (zh) | kafka数据持久化到hdfs的数据存储方法、装置、计算机设备 | |
US11960441B2 (en) | Retention management for data streams | |
US11340803B2 (en) | Method for configuring resources, electronic device and computer program product | |
US9058212B2 (en) | Combining memory pages having identical content | |
JP5744707B2 (ja) | メモリ使用量照会ガバナのためのコンピュータ実装方法、コンピュータ・プログラム、およびシステム(メモリ使用量照会ガバナ) | |
CN111324427B (zh) | 一种基于dsp的任务调度方法及装置 | |
US8583608B2 (en) | Maximum allowable runtime query governor | |
US10649905B2 (en) | Method and apparatus for storing data | |
CN108459913B (zh) | 数据并行处理方法、装置及服务器 | |
US9836516B2 (en) | Parallel scanners for log based replication | |
CN110737388A (zh) | 数据预读方法、客户端、服务器以及文件系统 | |
US20120331235A1 (en) | Memory management apparatus, memory management method, control program, and recording medium | |
CN109582649B (zh) | 一种元数据存储方法、装置、设备及可读存储介质 | |
US9582340B2 (en) | File lock | |
GB2545058A (en) | Flash memory management | |
US9946496B2 (en) | SSD with non-blocking flush command | |
CN105353987A (zh) | 一种文件处理方法及装置 | |
US20170004086A1 (en) | Cache management method for optimizing read performance of distributed file system | |
US8782306B2 (en) | Low-contention update buffer queuing for large systems | |
CN111177032A (zh) | 缓存空间申请方法、系统、装置及计算机可读存储介质 | |
CN105426522A (zh) | 一种文件系统性能统计方法与系统 | |
US7783849B2 (en) | Using trusted user space pages as kernel data pages | |
US8341368B2 (en) | Automatic reallocation of structured external storage structures | |
US9684525B2 (en) | Apparatus for configuring operating system and method therefor | |
US10860472B2 (en) | Dynamically deallocating memory pool subinstances |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
CB02 | Change of applicant information |
Address after: Room 332, 3 / F, Building 102, 28 xinjiekouwei street, Xicheng District, Beijing 100088 Applicant after: QAX Technology Group Inc. Applicant after: Qianxin Wangshen information technology (Beijing) Co.,Ltd. Address before: Room 332, 3 / F, Building 102, 28 xinjiekouwei street, Xicheng District, Beijing 100088 Applicant before: QAX Technology Group Inc. Applicant before: LEGENDSEC INFORMATION TECHNOLOGY (BEIJING) Inc. |
|
CB02 | Change of applicant information | ||
GR01 | Patent grant | ||
GR01 | Patent grant |