首页> 云计算&大数据> 大数据之Spark技术

[文章]大数据之Spark技术

置顶帖 精帖 收藏
0 887 4

一、什么是Spark

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。

谈到大数据,相信大家对Hadoop和Spark这两个名字并不陌生。但我们往往对它们的理解只是提留在字面上,并没有对它们进行深入的思考,下面不妨跟我一块看下它们究竟有什么异同。

1、解决问题的层面不一样

首先,Hadoop和Apache Spark两者都是大数据框架,但是各自存在的目的不尽相同。Hadoop实质上更多是一个分布式数据基础设施: 它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,意味着您不需要购买和维护昂贵的服务器硬件。

同时,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。Spark,则是那么一个专门用来对那些分布式存储的大数据进行处理的工具,它并不会进行分布式数据的存储。

2、两者可合可分

Hadoop除了提供为大家所共识的HDFS分布式数据存储功能之外,还提供了叫做MapReduce的数据处理功能。所以这里我们完全可以抛开Spark,使用Hadoop自身的MapReduce来完成数据的处理。

相反,Spark也不是非要依附在Hadoop身上才能生存。但如上所述,毕竟它没有提供文件管理系统,所以,它必须和其他的分布式文件系统进行集成才能运作。这里我们可以选择Hadoop的HDFS,也可以选择其他的基于云的数据系统平台。但Spark默认来说还是被用在Hadoop上面的,毕竟,大家都认为它们的结合是最好的。


Spark在伯克利的数据分析软件栈BDAS中的位置

以下是对MapReduce的最简洁明了的解析,其中把人理解成计算机就好了:

我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

3、Spark数据处理速度秒杀MapReduce

Spark因为其处理数据的方式不一样,会比MapReduce快上很多。MapReduce是分步对数据进行处理的: ”从集群中读取数据,进行一次处理,将结果写到集群,从集群中读取更新后的数据,进行下一次的处理,将结果写到集群,等等…“ BoozAllen Hamilton的数据科学家Kirk Borne如此解析。

反观Spark,它会在内存中以接近“实时”的时间完成所有的数据分析:“从集群中读取数据,完成所有必须的分析处理,将结果写回集群,完成,” Born说道。Spark的批处理速度比MapReduce快近10倍,内存中的数据分析速度则快近100倍。

如果需要处理的数据和结果需求大部分情况下是静态的,且你也有耐心等待批处理的完成的话,MapReduce的处理方式也是完全可以接受的。

但如果你需要对流数据进行分析,比如那些来自于工厂的传感器收集回来的数据,又或者说你的应用是需要多重数据处理的,那么你也许更应该使用Spark进行处理。

大部分机器学习算法都是需要多重数据处理的。此外,通常会用到Spark的应用场景有以下方面:实时的市场活动,在线产品推荐,网络安全分析,机器日记监控等。

4、灾难恢复

两者的灾难恢复方式迥异,但是都很不错。因为Hadoop将每次处理后的数据都写入到磁盘上,所以其天生就能很有弹性的对系统错误进行处理。

Spark的数据对象存储在分布于数据集群中的叫做弹性分布式数据集(RDD:Resilient Distributed Dataset)中。“这些数据对象既可以放在内存,也可以放在磁盘,所以RDD同样也可以提供完成的灾难恢复功能,”Borne指出。



二、Spark生态系统

除了Spark核心API之外,Spark生态系统中还包括其他附加库,可以在大数据分析和机器学习领域提供更多的能力。


这些库包括:

•Spark Streaming: Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。


•Spark SQL: Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。


•Spark MLlib: MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。


•Spark GraphX: GraphX是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(ResilientDistributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了SparkRDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和一个经过优化的PregelAPI变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。


除了这些库以外,还有一些其他的库,如BlinkDB和Tachyon。

BlinkDB是一个近似查询引擎,用于在海量数据上执行交互式SQL查询。BlinkDB可以通过牺牲数据精度来提升查询响应时间。通过在数据样本上执行查询并展示包含有意义的错误线注解的结果,操作大数据集合。

Tachyon是一个以内存为中心的分布式文件系统,能够提供内存级别速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它将工作集文件缓存在内存中,从而避免到磁盘中加载需要经常读取的数据集。通过这一机制,不同的作业/查询和框架可以以内存级的速度访问缓存的文件。

此外,还有一些用于与其他产品集成的适配器,如Cassandra(Spark Cassandra 连接器)和R(SparkR)。Cassandra Connector可用于访问存储在Cassandra数据库中的数据并在这些数据上执行数据分析。

下图展示了在Spark生态系统中,这些不同的库之间的相互关联。


Spark框架中的库



三、Spark体系架构

Spark体系架构包括如下三个主要组件:

•数据存储


•API


•管理框架


接下来让我们详细了解一下这些组件。

1、数据存储:

Spark用HDFS文件系统存储数据。它可用于存储任何兼容于Hadoop的数据源,包括HDFS,HBase,Cassandra等。



2、API:

利用API,应用开发者可以用标准的API接口创建基于Spark的应用。Spark提供Scala,JavaPython三种程序设计语言的API。

下面是三种语言SparkAPI的网站链接。

•Scala API:http://spark.apache.org/docs/latest/api/scala/index.html


•Java:http://spark.apache.org/docs/latest/api/java/index.html


•Python:http://spark.apache.org/docs/latest/api/python/index.html




3、资源管理:

Spark既可以部署在一个单独的服务器也可以部署在像Mesos或YARN这样的分布式计算框架之上。

下图展示了Spark体系架构模型中的各个组件。


Spark体系架构

4、弹性分布式数据集(RDDs)

弹性分布式数据集(基于Matei的研究论文)或RDD是Spark框架中的核心概念。可以将RDD视作数据库中的一张表。其中可以保存任何类型的数据。Spark将数据存储在不同分区上的RDD之中。

RDD可以帮助重新安排计算并优化数据处理过程。

此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。

RDD是不可变的。你可以用变换(Transformation)修改RDD,但是这个变换所返回的是一个全新的RDD,而原有的RDD仍然保持不变。

RDD支持两种类型的操作:

•变换(Transformation):变换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。


•行动(Action):行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。行动操作包括:reduce,collect,count,first,take,countByKey以及foreach。




四、如何使用Apache Spark?

最近,我偶然看到一篇文章。文章中提到利用Twitter流式数据来检测地震。有趣的是,实验表明该技术可以比日本气象局更快地告知百姓地震的情况。即使他们在文章使用了不同的方法,但我认为这是一个很好的例子,它可以用来检验我们如何利用简化的片段代码而不使用粘接代码将Spark付诸实践。

首先,我们必须将与“地震”或者“抖动”有关的推文过滤出来。我们可以非常轻易地利用SparkStreaming来实现该目标:

TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") ||_.getText.contains("shaking"))

接下来我们可以对处理完的推文数据做一些语义分析,并判断是否能反映出当前的地震情况。比如,类似于“地震”或者“现在正在晃动!”的推文将被视为具有正效应,而类似于“参加地震会议。”或者“昨天的地震真恐怖。”则被视为无影响效应。文章作者利用支持向量机模型来实现该目标,我们将在此基础上利用流式数据版本的模型来实现。下文是利用MLlib生成的代码示例:

// We would prepare some earthquake tweet data and load it inLIBSVM format. val data = MLUtils.loadLibSVMFile(sc,"sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%). val splits =data.randomSplit(Array(0.6,0.4),seed = 11L) val training = split(0).cache() valtest = splits(1) // Run trainingalgorithm to build the model val numIterations = 100 val model =SVMWithSGD.train(training, numIterations) // Clear the default threshold. model.clearThreshold() // Compute raw scores on the test set. valscoreAndLabels = test.map{point => val score = model.predict(point.features) (score, point.label)} // Get evaluation metrics. val metric = newBinaryClassificationMetrics(scoreAndLabels) val auROC =metrics.areaUnderROC() println("Area under ROC =" + auROC)

如果我们关注模型的预测准确率,那么我们可以进一步对检测到地震做出反应。需要注意的是,对于包含地理信息的推文,我们还可以获取震源位置。利用这个信息,我们可以通过SparkSQL从现有的Hive table(储存需要接收地震提醒的用户信息)中提取出他们的邮箱地址并发送一封私人电子邮件:

//sc is an existing SparkContext. val sqlContext = neworg.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom functionsqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName,city, email") .collect().foreach(sendEmail)



ApacheSpark的其他应用

除检测地震情况外,Spark还有许多潜在的应用。以下是Spark在大数据中的部分应用:

1.在游戏领域中,从实时的潜在游戏事件中迅速地挖掘出有价值的模式可以创造出巨大的商业利益,比如用户返回率情况、如何制定定向广告以及如何自动调整游戏的复杂度等。


2.在电子商务领域中,实时交易数据将被传递到k均值算法或者ALS等协同过滤流算法中。这些运算结果将和顾客评论等非结构化数据结合起来,用于不断改进交易模式以适应新趋势的发展。


3.在金融或证券领域中,Spark堆栈技术可以被应用到信用诈骗和风险管控系统中。通过获取大量的历史数据和其他一些外泄数据以及一些连接/请求信息(IP地理信息或时间信息),我们可以取得非常好的模型结果。


总而言之,Spark帮助人们简化了处理大规模数据的步骤流程。不管是处理结构化还是非结构化数据,Spark将许多复杂的功能(比如机器学习算法和图算法)无缝地结合起来。Spark使得大量的从业者都可以进行大数据分析,让我们一探究竟吧!


云计算&大数据
最近热帖
{{item.Title}} {{item.ViewCount}}
近期热议
{{item.Title}} {{item.PostCount}}