Spark Tutorial (2): Introduction to Spark Streaming
1. How do you use the APIs provided by DStream to operate on a data stream in real time?
2. How do you understand the flow of execution after Spark Streaming starts?
3. How do you create a StreamingContext object?
As big data has grown, so have the demands placed on processing it. In certain domains (such as real-time recommendation and user-behavior analysis), traditional batch frameworks like MapReduce can no longer meet the need for low latency, and a batch of streaming, real-time computation frameworks has emerged as a result. This article introduces one such streaming framework.
What is Spark Streaming
As part of the UC Berkeley cloud-computing software stack, Spark Streaming is an application framework built on top of Spark. It uses Spark's underlying engine as its execution foundation and builds the DStream abstraction on top of it. With the APIs provided by DStream, users can perform operations such as count, join, and aggregate on data streams in real time.
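As a hedged sketch of what those operations look like (a preview of the API the next section walks through): hosts, ports, and record formats below are placeholder assumptions, and the package names follow the pre-0.8-style imports used in the word-count example later in this article.
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// Hedged sketch: two keyed DStreams built from socket sources, with count /
// aggregate / join applied to each batch.
object DStreamOpsSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "DStreamOpsSketch", Seconds(1))

    val pageViews = ssc.socketTextStream("localhost", 9999)
                       .map(line => (line.split(" ")(0), 1))       // (page, 1)
    val pageOwners = ssc.socketTextStream("localhost", 9998)
                        .map(line => (line.split(" ")(0), line))   // (page, raw record)

    val recordsPerBatch = pageViews.count()            // records that arrived in each batch
    val viewsPerPage    = pageViews.reduceByKey(_ + _) // aggregate within each batch
    val joinedByPage    = pageViews.join(pageOwners)   // join the two streams on their key

    recordsPerBatch.print()
    viewsPerPage.print()
    joinedByPage.print()
    ssc.start()
  }
}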
A Spark Streaming application is very similar to a Spark application; it consists of a driver program that runs the user's main function and continuously executes various parallel operations on input streams of data. The main abstraction Spark Streaming provides is a discretized stream (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transforming existing DStreams using parallel operators like map, reduce, and window.
How to Use Spark Streaming
As an application framework built on Spark, Spark Streaming inherits Spark's programming style, so users who already know Spark can pick it up quickly. The following word-count example shows how Spark Streaming is used:
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
...
// Create the context and set up a network input stream to receive from a host:port
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream(args(1), args(2).toInt)
// Split the lines into words, count them, and print some of the counts on the master
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
1. Create a StreamingContext object
Just as Spark requires a SparkContext, a Spark Streaming program starts by creating a StreamingContext. Its parameters are essentially the same as SparkContext's: the master URL and an application name (NetworkWordCount above). Note the additional parameter Seconds(1): Spark Streaming needs a batch interval for processing data. With 1 s, as in the example, Spark Streaming processes data in 1-second windows. This value should be tuned to the application's latency requirements and the cluster's processing capacity.
2. Create an InputDStream
Like a Spout in Storm, Spark Streaming must be told where its data comes from. In the example, socketTextStream makes Spark Streaming read data from a socket connection. Spark Streaming supports many other sources as well, including kafkaStream, flumeStream, fileStream, and networkStream; a couple of them are sketched below.
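A hedged sketch of an alternative source, reusing the ssc from the word-count example above; the directory path is a placeholder.
// textFileStream watches a directory and turns each new file that appears there
// into part of the stream.
val fileLines = ssc.textFileStream("hdfs://namenode:9000/streaming/input")
fileLines.print()
// Kafka and Flume sources were exposed as ssc.kafkaStream(...) and
// ssc.flumeStream(...) in this era of the API (later moved into separate
// modules); their parameters are omitted here because they vary by version.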
3. Operate on the DStream
On the DStream obtained from the source, users can apply all kinds of operations. The example above is a typical word-count flow: the data received from the source within the current batch window is first split into words, the words are then mapped and counted MapReduce-style, and finally print() outputs the results.
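Beyond these per-batch operations, the same counting can be done over a sliding window. A hedged sketch, reusing words from the example above and assuming a 30-second window that slides every 10 seconds:
// Word counts aggregated over a 30 s window recomputed every 10 s.
val windowedCounts = words.map(x => (x, 1))
                          .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()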
4. Start Spark Streaming
Everything up to this point only builds the execution plan: the program has not actually connected to the data source, nor has it touched any data. Only when ssc.start() is called does the program really carry out the operations that were set up.
With this rough picture of how Spark Streaming is used, let's look at the code behind it.
Spark Streaming Source Code Analysis
StreamingContext
Spark Streaming exposes its public interface through StreamingContext; users build Spark Streaming applications with the APIs it provides.
StreamingContext maintains a SparkContext instance internally and performs RDD operations through it.
When a StreamingContext is instantiated, a batchDuration must be specified; it defines the repeat interval of Spark Streaming's recurring job.
StreamingContext offers several different interfaces for creating DStreams from various data sources.
StreamingContext provides APIs for starting and stopping the streaming job.
Spark Streaming is built on top of Spark: it wraps Spark's RDDs and layers on top of them a streaming representation of data, the DStream:
A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data. DStreams can either be created from live data (such as data from HDFS, Kafka or Flume) or can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.
The main internal structure of DStream is as follows:
abstract class DStream[T: ClassManifest] (
    @transient protected[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  initLogging()

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates a RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates a RDD for the given time */
  /** The core function of DStream: every subclass must implement compute().
      Each kind of DStream implements its own behaviour here, and the result
      of the computation is the generated RDD. */
  def compute (validTime: Time): Option[RDD[T]]

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated, marked as protected[streaming] so that testsuites can access it
  /** The RDD HashMap maintained by every DStream. A DStream essentially wraps a set
      of RDDs keyed by Time, and operations on the DStream are internally mapped to
      operations on those RDDs. */
  @transient
  protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

  // Time zero for the DStream
  protected[streaming] var zeroTime: Time = null

  // Duration for which the DStream will remember each RDD created
  protected[streaming] var rememberDuration: Duration = null

  // Storage level of the RDDs in the stream
  protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  protected[streaming] val mustCheckpoint = false
  protected[streaming] var checkpointDuration: Duration = null
  protected[streaming] val checkpointData = new DStreamCheckpointData(this)

  // Reference to whole DStream graph
  /** Every DStream is registered with a DStreamGraph, which drives the execution of
      all DStreams and all of their dependencies. */
  protected[streaming] var graph: DStreamGraph = null

  protected[streaming] def isInitialized = (zeroTime != null)

  // Duration for which the DStream requires its parent DStream to remember each RDD created
  protected[streaming] def parentRememberDuration = rememberDuration

  ...
Internally, a DStream maintains a time-indexed collection of RDDs; transformations and output operations on a DStream are translated into transformations and output operations on those RDDs.
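To make the three abstract members above concrete, here is a hypothetical, heavily simplified subclass sketch. The class, its constructor arguments, and the package layout are illustrative assumptions rather than code from the Spark source; real input sources extend InputDStream, and exact packages differ between releases.
import spark.RDD
import spark.streaming.{DStream, Duration, StreamingContext, Time}

// Hypothetical sketch only: a DStream that emits one constant record per batch.
class ConstantDStream(ssc_ : StreamingContext, value: String, interval: Duration)
  extends DStream[String](ssc_) {

  // How often this DStream produces a new RDD.
  override def slideDuration: Duration = interval

  // A source DStream has no parent DStreams.
  override def dependencies: List[DStream[_]] = List()

  // Called once per batch; returns the RDD for this time slot (or None).
  override def compute(validTime: Time): Option[RDD[String]] =
    Some(ssc_.sparkContext.makeRDD(Seq(value)))
}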
Let's look at how a computation on a DStream is mapped onto computations on RDDs.
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // If this DStream was not initialized (i.e., zeroTime not set), then do it
  // If RDD was already generated, then retrieve it from HashMap
  generatedRDDs.get(time) match {
    // If an RDD was already generated and is being reused, then
    // probably all RDDs in this DStream will be reused and hence should be cached
    case Some(oldRDD) => Some(oldRDD)
    // if RDD was not generated, and if the time is valid
    // (based on sliding time of this DStream), then generate the RDD
    case None => {
      if (isTimeValid(time)) {
        /** For each computation, the DStream calls the compute() implemented by its
            subclass to produce a new RDD. */
        compute(time) match {
          case Some(newRDD) =>
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
            }
            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
            }
            /** The newly generated RDD is put into the HashMap. */
            generatedRDDs.put(time, newRDD)
            Some(newRDD)
          case None =>
            None
        }
      } else {
        None
      }
    }
  }
}
getOrCompute() is invoked for every job that is submitted, through generateJob():
protected[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
Job & Scheduler
As the DStream code shows, generateJob() makes a DStream compute or transform itself via getOrCompute(). So when does Spark Streaming call generateJob()?
When a StreamingContext is instantiated, the user must set batchDuration, which defines the repeat interval of the recurring job. Every time a batchDuration elapses, a new job is generated to compute the DStreams. This can be seen in the Scheduler code:
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
/** Spark Streaming creates a RecurringTimer inside the Scheduler. Its period is the
    batchDuration set by the user; on every tick it calls the Scheduler's generateJobs(). */
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => generateJobs(new Time(longTime)))
generateJobs() is shown below: the Scheduler's generateJobs() calls DStreamGraph's generateJobs() and runs each resulting job through the JobManager.
def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  logInfo("\n-----------------------------------------------------\n")
  graph.generateJobs(time).foreach(jobManager.runJob)
  latestTime = time
  doCheckpoint(time)
}
In DStreamGraph, generateJobs() looks like this:
def generateJobs(time: Time): Seq[Job] = {
  this.synchronized {
    logInfo("Generating jobs for time " + time)
    val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    logInfo("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
}
generateJob() is called on every outputStream to transform or compute its DStream. Computing an output depends on computing its dependencies, so in the end all dependencies are computed, yielding the final outputStream.
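A hedged illustration of what makes a DStream one of those outputStreams in the first place: registering an output operation on it. The snippet reuses wordCounts from the earlier word-count example; the foreach form is an assumption about this era of the API (it was renamed foreachRDD in later releases).
// Hedged sketch. print() is a built-in output operation; foreach runs arbitrary
// code on the RDD of every batch. Either call registers wordCounts as an output
// stream in the DStreamGraph, so generateJobs() emits a job for it on every batch.
wordCounts.print()
wordCounts.foreach((rdd: spark.RDD[(String, Int)]) =>
  println("records in this batch: " + rdd.count()))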
All of these operations are executed only after StreamingContext's start function is called.
def start() {
  if (checkpointDir != null && checkpointDuration == null && graph != null) {
    checkpointDuration = graph.batchDuration
  }
  validate()
  /** StreamingContext registers and starts all input streams. */
  val networkInputStreams = graph.getInputStreams().filter(s => s match {
      case n: NetworkInputDStream[_] => true
      case _ => false
    }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
  if (networkInputStreams.length > 0) {
    // Start the network input tracker (must start before receivers)
    networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
    networkInputTracker.start()
  }
  Thread.sleep(1000)
  // Start the scheduler that drives the streaming computation
  scheduler = new Scheduler(this)
  scheduler.start()
}
That should give a basic picture of how Spark Streaming is used and how it is structured internally. We close this article with a flowchart of what happens after Spark Streaming starts.
[Figure: flowchart of Spark Streaming after startup]
Apache Spark Standalone Installation Tutorial
This example installs Spark in a Windows environment. First download Spark, unpack it, and build it; when the build completes you will see output like this:
C:\Develop\Source\Spark\spark-0.8.0-incubating>sbt\sbt.cmd assembly
[info] Done packaging.
[info] Packaging C:\Develop\Source\Spark\spark-0.8.0-incubating\examples\target\
scala-2.9.3\spark-examples-assembly-0.8.0-incubating.jar ...
[info] Done packaging.
[success] Total time: 1265 s, completed
Running the Spark Shell
Start Spark as follows:
C:\Develop\Source\Spark\spark-0.8.0-incubating>spark-shell.cmd
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
      /_/
13/11/04 21:45:18 INFO ui.SparkUI: Started Spark Web UI at http://haumea:4040
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.
The Spark web UI (at the address shown above) can now be opened in a browser.
Run the following commands in the shell:
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> textFile.count()
res1: Long = 111
scala> textFile.first()
res2: String = # Apache Spark
scala> textFile.foreach(println(_))
After running these commands, the completed jobs show up in the web UI.
Now run the following commands:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14
scala> linesWithSpark.foreach(println(_))
# Apache Spark
You can find the latest Spark documentation, including a programming
Spark requires Scala 2.9.3 (Scala 2.10 is not yet supported). The project is
Spark and its example programs, run:
Once you've built Spark, the easiest way to start using it is the shell:
Spark also comes with several sample programs in the `examples` directory.
./run-example org.apache.spark.examples.SparkLR local[2]
All of the Spark samples take a `<master>` parameter that is the cluster URL
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
Hadoop, you must build Spark against the same version that your cluster runs.
when building Spark.
When developing a Spark application, specify the Hadoop version by adding the
in the online documentation for an overview on how to configure Spark.
Apache Spark is an effort undergoing incubation at The Apache Software
## Contributing to Spark
scala> exit
Next, create a TextCount directory with the following files under it:
TextCount/src/main/scala/TextCountApp.scala
TextCount/count.sbt
TextCount/src/main/scala/TextCountApp.scala contains:
/*** TextCountApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TextCountApp {
  def main(args: Array[String]) {
    val logFile = "C:/Develop/Source/Spark/spark-0.8.0-incubating/README.md"
    val sc = new SparkContext("local", "TextCountApp", "C:/Develop/Source/Spark/spark-0.8.0-incubating",
      List("target/scala-2.9.3/count-project_2.9.3-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    val numSparks = logData.filter(line => line.contains("Spark")).count()
    println("Lines with a: %s, Lines with b: %s, Lines with Spark: %s".format(numAs, numBs, numSparks))
  }
}
The four arguments of the SparkContext constructor mean the following (see the annotated restatement after the list):
First argument: the master URL (local, i.e. single machine)
Second argument: the application name
Third argument: the Spark installation directory
Fourth argument: the jars the application depends on
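For readability, the same constructor call is restated below with each argument annotated; it mirrors the code above, and only the comments are new.
import org.apache.spark.SparkContext

// Annotated restatement of the constructor call used in TextCountApp above.
val sc = new SparkContext(
  "local",                                                // 1. master URL (local, single machine)
  "TextCountApp",                                         // 2. application name
  "C:/Develop/Source/Spark/spark-0.8.0-incubating",       // 3. Spark installation directory
  List("target/scala-2.9.3/count-project_2.9.3-1.0.jar")  // 4. jars the application depends on
)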
Create TextCount/count.sbt:
name := "Count Project"
version := "1.0"
scalaVersion := "2.9.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.8.0-incubating"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
Run the following command to compile the project and generate the JAR file:
C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd package
[info] Packaging C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount\target\scala-2.9.3\count-project_2.9.3-1.0.jar ...
[info] Done packaging.
[success] Total time: 7 s, completed
Now run the application; the result looks like this:
C:\Develop\Source\Spark\spark-0.8.0-incubating\TextCount>..\sbt\sbt.cmd run
[info] Set current project to Count Project (in build file:/C:/Develop/Source/Spark/spark-0.8.0-incubating/TextCount/)
[info] Running TextCountApp
13/11/04 22:33:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/04 22:33:27 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/04 22:33:27 INFO mapred.FileInputFormat: Total input paths to process : 1
Lines with a: 66, Lines with b: 35, Lines with Spark: 15   // execution result
[success] Total time: 6 s, completed
That's it: Spark now runs in standalone, single-machine mode.