本文作者:admin

sparkhbase微服务(spark hbase)

admin 2023年01月05日 10:55:29 2

本文目录一览:

如何使用scala+spark读写hbase

如何使用scala+spark读写Hbase

软件版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0

公司有一些实时数据处理的项目sparkhbase微服务,存储用的是hbasesparkhbase微服务,提供实时的检索sparkhbase微服务,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。

接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。

关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。

整个流程如下:

(1)全量读取hbase表的数据

(2)做一系列的ETL

(3)把全量数据再写回hbase

核心代码如下:

//获取conf

val conf=HBaseConfiguration.create() //设置读取的表

conf.set(TableInputFormat.INPUT_TABLE,tableName) //设置写入的表

conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//创建sparkConf

val sparkConf=new SparkConf() //设置spark的任务名

sparkConf.setAppName("read and write for hbase ") //创建spark上下文

val sc=new SparkContext(sparkConf)

//为job指定输出格式和输出表名

val newAPIJobConfiguration1 = Job.getInstance(conf)

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)

newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

//全量读取hbase表

val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]

,classOf[ImmutableBytesWritable]

,classOf[Result]

)

//过滤空数据,然后对每一个记录做更新,并转换成写入的格式

val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)

//转换后的结果,再次做过滤

val save_rdd=final_rdd.filter(checkNull)

//最终在写回hbase表

save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)

sc.stop()

从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:

第一个:checkNotEmptyKs

作用:过滤掉空列簇的数据

def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true

}

第二个:forDatas

作用:读取每一条数据,做update后,在转化成写入操作

def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //获取Result

val put:Put=new Put(r.getRow) //声明put

val ks=Bytes.toBytes("ks") //读取指定列簇

val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala

map.foreach(kv={//遍历每一个rowkey下面的指定列簇的每一列的数据做转化

val kid= Bytes.toString(kv._1)//知识点id

var value=Bytes.toString(kv._2)//知识点的value值

value="修改后的value"

put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象

}

) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)

}

第三个:checkNull 作用:过滤最终结果里面的null数据

def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true

}

上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:

spark 读 hbase parquet 哪个快

spark读hbasesparkhbase微服务,生成task受所查询tablesparkhbase微服务的region个数限制sparkhbase微服务,任务数有限sparkhbase微服务,例如查询的40G数据,10G一个region,很可能就4~6个region,初始的task数就只有4~6个左右,RDD后续可以partition设置task数sparkhbase微服务

spark读parquet按默认的bolck个数生成task个数,例如128M一个bolck,差不多就是300多个task,初始载入情况就比hbase快,而且直接载入parquet文件到spark的内存,而hbase还需要同regionserver交互把数据传到spark的内存也是需要消耗时间的。

总体来说,读parquet更快

hadoop/spark/hbase集群 动态缩容/扩容

卸载节点一般不建议直接停掉,需要先将其排除在集群之外,在主节点上编辑配置文件:${HADOOP_HOME}/etc/hadoop/hdfs-site.xml

在datanode-deny.list中加入要卸载的节点名称,如:slave3

[图片上传失败...(image-268887-1589181513073)]

卸载节点后,刷新节点信息:

./bin/hdfs dfsdmin -refreshNodes会看到节点变成Dead和Decommissioned状态,如下图:

该命令执行后,同时会强制重新加载配置 ,在后台进行Block块的移动

[图片上传失败...(image-71d33c-1589181513074)]

卸载成功后,再停止节点:

./sbin/hadoop-daemon.sh stop datanode

./sbin/yarn-daemon.sh stop nodemanager

如果希望下次自动启动,修改集群中所有节点的slaves配置文件,具体目录 ${HADOOP_HOME}/etc/hadoop/slaves

首先准备一台hadoop节点,参考 hadoop、spark install

在hadoop子节点上,HADOOP_HOME目录下启动hdfs: ./sbin/hadoop-daemon.sh --config /usr/hadoop/hadoop-2.7.7/etc/hadoop/ start datanode

启动nodeManage(这个命令会确定其他子节点的nodeManage,可以把该节点的slaves文件中其他节点去掉): ./yarn-daemons.sh --config /usr/hadoop/hadoop-2.7.7/etc/hadoop/ start nodemanager

在主节点上刷新节点信息: ./bin/hdfs dfsadmin -refreshNodes;使用命令可以均衡数据:./sbin/start-balancer.sh

如下图,节点又重新加入了集群

[图片上传失败...(image-a892c6-1589181513074)]

如果希望下次自动启动,修改集群里机器的所有slaves配置文件,具体目录 ${HADOOP_HOME}/etc/hadoop/slaves

在想要去掉的节点上,SPARK_HOME 目录下执行 ./sbin/stop-slave.sh 即可去掉spark节点。这样操作可以临时去掉spark节点,如果想永久去掉节点,需要在主节点的SPARK_HOME/conf/slaves 文件中去掉子节点

如图,原有三个spark节点

[图片上传失败...(image-50e48c-1589181513074)]

执行./sbin/stop-slave.sh 后,一台节点的状态变成了DEAD

[图片上传失败...(image-c12af8-1589181513074)]

如果希望下次自动启动,修改集群里机器的所有slaves配置文件,具体目录 ${SPARK_HOME}/conf/slaves

首先准备一台spark节点,参考 hadoop、spark install

在spark子节点上,SPARK_HOME 目录下执行 ./sbin/ start-slave.sh spark://master:7077,该子节点即可加入mster集群。

如图,扩容后,集群增加了子节点

[图片上传失败...(image-35bb79-1589181513074)]

如果希望下次自动启动,修改集群里机器的所有slaves配置文件,具体目录 ${SPARK_HOME}/conf/slaves

在要卸载的子节点上,HBASE_HOME/bin目录下,执行 ./graceful_stop.sh 子节点的hostname,即可卸载子节点。

该命令会自动关闭Load Balancer,然后Assigned Region,之后会将该节点关闭。除此之外,你还可以查看remove的过程,已经assigned了多少个Region,还剩多少个Region,每个Region 的Assigned耗时。

最终要在hbase shell 中关闭balance_switch :balance_switch false,然后再开启:balance_switch true

首先准备一台hbase节点,参考 hbase(2.0.0) install

在新的子节点 HBASE_HOME 目录下,执行 ./bin/hbase-daemon.sh start regionserver 启动节点

在主节点上编辑HBASE_HOME/conf/regionservers文件,增加新的子节点

在新的子节点上,打开hbase shell,执行 balance_switch true 开启集群自动balance.

hbase 本来有两个节点,如下图:

[图片上传失败...(image-182c39-1589181513074)]

增加新节点后变成了三个:

[图片上传失败...(image-991e21-1589181513074)]

参考:

什么是hadoop,spark,hbase

这个就比较负责sparkhbase微服务sparkhbase微服务,可以用hadoop+hbase+spark/storm进行平台构建sparkhbase微服务,spark用于数据分析和处理、hbase用于将处理后sparkhbase微服务的数据保存、hadoop用于离线分析和原始数据存储,具体的还得结合应用场景

微服务容器平台面对大数据存储是怎么做的

整体而言,大数据平台从平台部署和数据分析过程可分为如下几步:

1、linux系统安装

一般使用开源版的Redhat系统--CentOS作为底层平台。为了提供稳定的硬件基础,在给硬盘做RAID和挂载数据存储节点的时,需要按情况配置。例如,可以选择给HDFS的namenode做RAID2以提高其稳定性,将数据存储与操作系统分别放置在不同硬盘上,以确保操作系统的正常运行。

2、分布式计算平台/组件安装

目前国内外的分布式系统的大多使用的是Hadoop系列开源系统。Hadoop的核心是HDFS,一个分布式的文件系统。在其基础上常用的组件有Yarn、Zookeeper、Hive、Hbase、Sqoop、Impala、ElasticSearch、Spark等。

先说下使用开源组件的优点:1)使用者众多,很多bug可以在网上找的答案(这往往是开发中最耗时的地方)。2)开源组件一般免费,学习和维护相对方便。3)开源组件一般会持续更新,提供必要的更新服务『当然还需要手动做更新操作』。4)因为代码开源,若出bug可自由对源码作修改维护。

再简略讲讲各组件的功能。分布式集群的资源管理器一般用Yarn,『全名是Yet Another Resource Negotiator』。常用的分布式数据数据『仓』库有Hive、Hbase。Hive可以用SQL查询『但效率略低』,Hbase可以快速『近实时』读取行。外部数据库导入导出需要用到Sqoop。Sqoop将数据从Oracle、MySQL等传统数据库导入Hive或Hbase。Zookeeper是提供数据同步服务,Yarn和Hbase需要它的支持。Impala是对hive的一个补充,可以实现高效的SQL查询。ElasticSearch是一个分布式的搜索引擎。针对分析,目前最火的是Spark『此处忽略其他,如基础的MapReduce 和 Flink』。Spark在core上面有ML lib,Spark Streaming、Spark QL和GraphX等库,可以满足几乎所有常见数据分析需求。

值得一提的是,上面提到的组件,如何将其有机结合起来,完成某个任务,不是一个简单的工作,可能会非常耗时。

3、数据导入

前面提到,数据导入的工具是Sqoop。用它可以将数据从文件或者传统数据库导入到分布式平台『一般主要导入到Hive,也可将数据导入到Hbase』。

4、数据分析

数据分析一般包括两个阶段:数据预处理和数据建模分析。

数据预处理是为后面的建模分析做准备,主要工作时从海量数据中提取可用特征,建立大宽表。这个过程可能会用到Hive SQL,Spark QL和Impala。

数据建模分析是针对预处理提取的特征/数据建模,得到想要的结果。如前面所提到的,这一块最好用的是Spark。常用的机器学习算法,如朴素贝叶斯、逻辑回归、决策树、神经网络、TFIDF、协同过滤等,都已经在ML lib里面,调用比较方便。

5、结果可视化及输出API

可视化一般式对结果或部分原始数据做展示。一般有两种情况,行熟悉展示,和列查找展示。在这里,要基于大数据平台做展示,会需要用到ElasticSearch和Hbase。Hbase提供快速『ms级别』的行查找。 ElasticSearch可以实现列索引,提供快速列查找。

平台搭建主要问题:

1、稳定性 Stability

理论上来说,稳定性是分布式系统最大的优势,因为它可以通过多台机器做数据及程序运行备份以确保系统稳定。但也由于大数据平台部署于多台机器上,配置不合适,也可能成为最大的问题。 曾经遇到的一个问题是Hbase经常挂掉,主要原因是采购的硬盘质量较差。硬盘损坏有时会到导致Hbase同步出现问题,因而导致Hbase服务停止。由于硬盘质量较差,隔三差五会出现服务停止现象,耗费大量时间。结论:大数据平台相对于超算确实廉价,但是配置还是必须高于家用电脑的。

2、可扩展性 Scalability

如何快速扩展已有大数据平台,在其基础上扩充新的机器是云计算等领域应用的关键问题。在实际2B的应用中,有时需要增减机器来满足新的需求。如何在保留原有功能的情况下,快速扩充平台是实际应用中的常见问题。

上述是自己项目实践的总结。整个平台搭建过程耗时耗力,非一两个人可以完成。一个小团队要真正做到这些也需要耗费很长时间。

目前国内和国际上已有多家公司提供大数据平台搭建服务,国外有名的公司有Cloudera,Hortonworks,MapR等,国内也有华为、明略数据、星环等。另外有些公司如明略数据等还提供一体化的解决方案,寻求这些公司合作对 于入门级的大数据企业或没有大数据分析能力的企业来说是最好的解决途径。

对于一些本身体量较小或者目前数据量积累较少的公司,个人认为没有必要搭建这一套系统,暂时先租用AWS和阿里云就够了。对于数据量大,但数据分析需求较简单的公司,可以直接买Tableau,Splunk,HP Vertica,或者IBM DB2等软件或服务即可。

-

阅读
分享