28.10.15

Command line to compile Zeppelin for Spark 1.5.1

mvn clean package -Pspark-1.5 -Dspark.version=1.5.1 -Dhadoop.version=2.7.1 -Phadoop-2.6 -Pyarn -DskipTests -Ppyspark -Pbuild-distr
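The -Pbuild-distr profile should leave a distribution package somewhere under zeppelin-distribution/target (my understanding; I have not re-checked the exact path here), which can be unpacked and started with:

bin/zeppelin-daemon.sh start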

13.8.15

Run-example from Spark in HDP 2.3.0

[spring-xd@14514aea325a ~]$ run-example SparkPi 10
/usr/bin/run-example: line 24: /usr/bin/load-spark-env.sh: No such file or directory
Failed to find Spark examples assembly in /usr/lib or /usr/examples/target
You need to build Spark before running this program

So I needed to run it like this instead:

$ /usr/hdp/2.3.0.0-2557/spark/bin/run-example SparkPi 10
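The stub scripts under /usr/bin apparently cannot locate the rest of the Spark installation. A possible workaround (a sketch, assuming the same HDP 2.3.0 layout as above) is to point SPARK_HOME at the full distribution and put its bin directory first on the PATH:

export SPARK_HOME=/usr/hdp/2.3.0.0-2557/spark
export PATH=$SPARK_HOME/bin:$PATH
run-example SparkPi 10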


11.8.15

A possible Isilon HDFS issue with Hadoop 2.7.1

A simple hadoop fs -cat command may throw the following exception:

java.lang.IllegalStateException: Must not use direct buffers with InputStream API
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:737)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:793)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:853)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:896)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:697)
at java.io.DataInputStream.readShort(DataInputStream.java:312)
at org.apache.hadoop.fs.shell.Display$Text.getInputStream(Display.java:136)
at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:102)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:317)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:289)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:271)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:255)
at org.apache.hadoop.fs.shell.Command.processRawArguments(Command.java:201)
at org.apache.hadoop.fs.shell.Command.run(Command.java:165)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:287)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)

at org.apache.hadoop.fs.FsShell.main(FsShell.java:340)


While looking into this article:

http://blog.sequenceiq.com/blog/2014/03/07/read-from-hdfs/

I found a possible solution and it works for me:

        <property>
                <name>dfs.client.use.legacy.blockreader</name>
                <value>true</value>
        </property>
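I put this property into hdfs-site.xml on the client side; as far as I can tell it is a pure client-side setting. For a one-off test it should also be possible to pass it as a generic option (FsShell runs through ToolRunner), though I have not verified this form:

hadoop fs -D dfs.client.use.legacy.blockreader=true -cat /path/to/some/file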

7.8.15

Spring XD File Endpoint

src/main/java/org/springframework/integration/file/config
src/main/java/org/springframework/integration/file/event
src/main/java/org/springframework/integration/file/filters
src/main/java/org/springframework/integration/file/locking
src/main/java/org/springframework/integration/file/remote
src/main/java/org/springframework/integration/file/remote/gateway
src/main/java/org/springframework/integration/file/remote/handler
src/main/java/org/springframework/integration/file/remote/session
src/main/java/org/springframework/integration/file/remote/synchronizer
src/main/java/org/springframework/integration/file/splitter
src/main/java/org/springframework/integration/file/support
src/main/java/org/springframework/integration/file/tail
src/main/java/org/springframework/integration/file/transformer


  • config
    • Parser
      • FileInboundChannelAdapterParser
      • FileOutboundChannelAdapterParser
      • FileOutboundGatewayParser
      • FileParserUtils
      • FileSplitterParser
      • FileTailInboundChannelAdapterParser
      • FileToByteArrayTransformerParser
      • FileToStringTransformerParser
      • RemoteFileOutboundChannelAdapterParser

                  • event
                  • filters
                  • locking
                  • remote
                    • gateway
                    • handler
                    • session
                    • synchronizer
                  • splitter
                  • support
                  • tail
                  • transformer


                  Spring XD


                   The above can serve as a reference for installing a whole cluster. Right now, though, I just want to focus on Spring XD configuration and development.


                   This introduction is interesting. I read it before, but it did not leave a deep impression and by now I have mostly forgotten it. So I still need hands-on practice to make it concrete.

                     • Download with wget
                     • Install with rpm
                     • Then run the two services via service: spring-xd-admin and spring-xd-container

                   Note that the JDK also needs to be installed.

                   Once it is installed, I can follow the Quick Start to see how to use it.

                   Basically you can experiment up to deploying and destroying a ticktock stream (--deploy / --destroy); after that it is mostly cluster-related material. A sketch of the ticktock commands follows below.
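                   A rough sketch of those XD shell commands (from memory of the quick start, so treat the exact syntax as approximate):

                   xd:> stream create --name ticktock --definition "time | log" --deploy
                   xd:> stream destroy --name ticktock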

                   Of course I could jump straight into the cluster setup, but since what I am after is a job scheduler, I think it makes sense to learn Spring XD tasklet programming first.

                   Spring XD is a unified, distributed, and extensible service for data ingestion, real time analytics, batch processing, and data export. The foundations of XD's architecture are based on the over 100+ man years of work that have gone into the Spring Batch, Integration and Data projects. Building upon these projects, Spring XD provides servers and a configuration DSL that you can immediately use to start processing data. You do not need to build an application yourself from a collection of jars to start using Spring XD.

                    • Runtime Architecture
                   The key components in Spring XD are the XD Admin and XD Container Servers. Using a high-level DSL, you post the description of the required processing task to the Admin server over HTTP. The Admin server then maps the processing tasks into processing modules. A module is a unit of execution and is implemented as a Spring ApplicationContext. A distributed runtime is provided that will assign modules to execute across multiple XD Container servers. A single XD Container server can run multiple modules. When using the single node runtime, all modules are run in a single XD Container and the XD Admin is run in the same process.

                    • DIRT Runtime
                  A distributed runtime, called Distributed Integration Runtime, aka DIRT, will distribute the processing tasks across multiple XD Container instances. The XD Admin server breaks up a processing task into  individual module definitions and assigns each module to a container instance using ZooKeeper. Each container listens for module definitions to which it has been assigned and deploys the module, creating a Spring ApplicationContext to run it.

                   [Image: distributed node]

                   Modules share data by passing messages using a configured messaging middleware (Rabbit, Redis, or Local for single node). To reduce the number of hops across messaging middleware between them, multiple modules may be composed into larger deployment units that act as a single module.

                   Important concepts:

                  • Container Server Architecture
                    • Streams
                      • Streams define how event driven data is collected, processed, and stored or forwarded. For example, a stream might collect syslog data, filter, and store it in HDFS.
                    • Jobs
                      • Jobs define how coarse grained and time consuming batch processing steps are orchestrated, for example a job could be defined to coordinate performing HDFS operations and the subsequent execution of multiple MapReduce processing tasks.
                    • Taps
                      • Taps are used to process data in a non-invasive way as data is being processed by a Stream or a Job. Much like wiretaps used on telephones, a Tap on a Stream lets you consume data at any point along the Stream's processing pipeline. The behavior of the original stream is unaffected by the presence of the Tap.
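                   For example, a tap on a stream named ticktock could be created in the XD shell roughly like this (a sketch based on the tap:stream: source syntax; not taken from the original notes):

                   xd:> stream create --name ticktap --definition "tap:stream:ticktock > log" --deploy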

                   [Image: tap jobs streams]


                  The programming model for processing streams in Spring XD is based on the well known Enterprise Integration Patterns as implemented by components in the Spring Integration project. The programming model was designed so that it is easy to test components.

                  • Stream Deployment
                   The Container Server listens for module deployment events initiated from the Admin Server via ZooKeeper. When the container node handles a module deployment event, it connects the module's input and output channels to the data bus used to transport messages during stream processing. In a single node configuration, the data bus uses in-memory direct channels. In a distributed configuration, the data bus communications are backed by the configured transport middleware. Redis and Rabbit are both provided with the Spring XD distribution, but other transports are envisioned for future releases.

                   [Image: anatomyOfAStreamSingleNode]



                   [Image: anatomyOfAStreamV3]


                  21.6.15

                  Play with Play Framework within Docker

                  1. Installation

                  https://www.playframework.com/documentation/2.4.x/Installing

                   I ran a Java 8 Docker container and copied typesafe-activator-1.3.5.zip into the target container. Thanks to boot2docker, the /Users directory on the Mac is already mounted into the boot2docker host, so I could simply do a cp between my Mac host filesystem and the container filesystem.
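                   Roughly what that looked like (image name, container name, and paths here are illustrative assumptions, not copied from my shell history):

                   docker run -it --name play -v /Users/me/Downloads:/mnt java:8 /bin/bash
                   # then, inside the container:
                   cp /mnt/typesafe-activator-1.3.5.zip /opt/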

                  2. Port forwarding

                  https://github.com/boot2docker/boot2docker/blob/master/doc/WORKAROUNDS.md

                  According to this link, I ran the following command to forward port 8888 on my Mac to my target container's port 8888.

                   First I needed to find out the IP address of the target container:

                  boot2docker ssh docker inspect <container name> | grep Address

                          "MacAddress": "",
                          "GlobalIPv6Address": "",
                          "IPAddress": "172.17.0.42",
                          "LinkLocalIPv6Address": "fe80::42:acff:fe11:2a",

                          "MacAddress": "02:42:ac:11:00:2a",

                  172.17.0.42 was the one I needed.

                  boot2docker ssh -vnNTL 8888:172.17.0.42:8888

                  Done

                  3. Start activator and have it listen on 0.0.0.0:8888 instead of 127.0.0.1:8888


                  According to this post:

                   $ activator -Dhttp.address=0.0.0.0 -Dhttp.port=8888 ui


                  Experiencing ZooKeeper in Docker

                  Build a Docker Image for ZK Server

                   Installing a ZK server is simple, and so is building a Docker image for it. You can simply use the Dockerfile from the following GitHub project:


                   I created a very simple one just for running a standalone ZK server:


                  It is good enough for developing zk clients.

                  Using nc Command from a Client Container

                  The simplest way to try a zk Server is to use the "nc" command.

                   Step 1: Run a container with --link to the ZK server

                  docker run -t -i --name zkc --link zk:zk ubuntu:14.04 /bin/bash

                   Step 2: Use the four-letter-word commands to test the ZK server

                  http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html#sc_zkCommands
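                   For example, "ruok" should answer with "imok", and "stat" dumps basic server statistics (zk is the link alias from Step 1; install netcat in the client container first if it is missing):

                   echo ruok | nc zk 2181
                   echo stat | nc zk 2181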

                  Using zkCli.sh from Client Containers

                   Although the four-letter-word commands are convenient, they are only good for quickly testing the server. zkCli.sh is the shell command-line tool for exploring the server further.

                   Step 1: Install the ZK binary

                  curl -fsSL http://mirrors.sonic.net/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz | tar -C /opt -xz 

                   But you can actually just run the ZK server image itself as a client container.

                   $ ./bin/zkCli.sh -server zk:2181

                  [zk: zk:2181(CONNECTED) 0] \h
                  ZooKeeper -server host:port cmd args
                  connect host:port
                  get path [watch]
                  ls path [watch]
                  set path data [version]
                  rmr path
                  delquota [-n|-b] path
                  quit 
                  printwatches on|off
                  create [-s] [-e] path data acl
                  stat path [watch]
                  close 
                  ls2 path [watch]
                  history 
                  listquota path
                  setAcl path acl
                  getAcl path
                  sync path
                  redo cmdno
                  addauth scheme auth
                  delete path [version]
                  setquota -n|-b val path
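                   A quick sample session to confirm reads and writes work (the znode name and data are arbitrary; output omitted):

                   create /demo hello
                   get /demo
                   delete /demo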








                  18.6.15

                   Apache Kafka source code analysis – Broker Server


                   How to fix Eclipse's problem when importing the Kafka project


                  I saw this:

                  Description Resource Path Location Type
                  scalatest_2.10-1.9.1.jar of core build path is cross-compiled with an incompatible version of Scala (2.10.0). In case this report is mistaken, this check can be disabled in the compiler preference page. core Unknown Scala Version Problem

                  Fixing:

                  1. Right click on project core.
                  2. Properties
                  3. Scala compiler
                  4. Click on "Use project settings"
                   5. Select "Latest 2.10 bundle (dynamic)"

                  20.5.15

                   A discussion of Hadoop and Cassandra performance issues - ImportNew


                  18.5.15

                  Errors: CDA - 20150518


                  1. For the default threefold replication, Hadoop’s rack placement policy is to write the first copy of a block on a node in one rack, then the other two copies on two nodes in a different rack. Since the first copy is written to rack2, the other two will either be written to two nodes on rack1, or two nodes on rack3.
                  2. Apache HBase provides random, realtime read/write access to your data. HDFS does not allow random writes. HDFS is built for scalability, fault tolerance, and batch processing.
                  3. Each slave node in a cluster configured to run MapReduce v2 (MRv2) on YARN typically runs a DataNode daemon (for HDFS functions) and NodeManager daemon (for YARN functions). The NodeManager handles communication with the ResourceManager, oversees application container lifecycles, monitors CPU and memory resource use of the containers, tracks the node health, and handles log management. It also makes available a number of auxiliary services to YARN applications. Further Reading
                    • Configure YARN Daemons
                  4. When the first NameNode is started, it connects to ZooKeeper and registers itself as the Active NameNode. The next NameNode then sees that information and sets itself up in Standby mode (in fact, the ZooKeeper Failover Controller is the software responsible for the actual communication with ZooKeeper). Clients never connect to ZooKeeper to discover anything about the NameNodes. In an HDFS HA scenario, ZooKeeper is not used to keep track of filesystem changes. That is the job of the Quorum Journal Manager daemons.
                    • You decide to create a cluster which runs HDFS in High Availability mode with automatic failover, using Quorum-based Storage. Which service keeps track of which NameNode is active at any given moment?
                  5. Which three describe functions of the ResourceManager in YARN?
                    • Tracking heartbeats from the NodeManagers
                    •  Running a scheduler to determine how resources are allocated
                    •  Monitoring the status of the ApplicationMaster container and restarting on failure
                    • Explanation:
                      • The ResourceManager has two main components: Scheduler and ApplicationsManager.
                      • The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc.
                       • The ResourceManager tracks heartbeats from the NodeManagers to determine available resources, then schedules those resources based on the scheduler-specific configuration.
                      • The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.
                      • The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress. Depending on the type of application, this may include monitoring the map and reduce tasks progress, restarting tasks, and archiving job history and meta-data.
                      • The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
                      • http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/YARN.html
                  6. You are running a Hadoop cluster with a NameNode on host hadoopNN01, a Secondary NameNode on host hadoopNN02 and several DataNodes.
                    • How can you determine when the last checkpoint happened?
                      • Connect to the web UI of the Secondary NameNode (http://hadoopNN02:50090/) and look at the “Last Checkpoint” information.
                    • Explanation
                      • The Secondary NameNode’s Web UI contains information on when it last performed its checkpoint operation. This is not displayed via the NameNode’s Web UI, and is not available via the hdfs dfsadmin command. 
                   7. A specific node in your cluster appears to be running slower than other nodes with the same hardware configuration. You suspect a RAM failure in the system. Which commands may be used to view the memory seen in the system?
                    • free
                    • top
                    • dmidecode
                    • Explanation
                      • dmidecode shows bios information on a running system. The amount of installed RAM and size of the modules in each slot can be found in the output.
                      • Additionally memory and swap usage can be viewed with cat /proc/meminfo or vmstat. 
                  8. You are running a Hadoop cluster with a NameNode on host mynamenode. What are two ways you can determine available HDFS space in your cluster?
                      • Run hdfs dfsadmin -report and locate the DFS Remaining value.
                        • Connect to http://mynamenode:50070/ and locate the DFS Remaining value.
                        • Explanation:

                      1. When a client wishes to read a file from HDFS, it contacts the NameNode and requests the locations and names of the first few blocks in the file. It then directly contacts the DataNodes containing those blocks to read the data. It would be very wasteful to move blocks around the cluster based on a client’s read request, so that is never done. Similarly, if all data was passed via the NameNode, the NameNode would immediately become a serious bottleneck and would slow down the cluster operation dramatically.
                      2. As well as the block itself, a separate file is written containing checksums for the data in the file. These checksums are used when the data is being written, to ensure that no data corruption has occurred. No metadata regarding the name of the file of which the block is a part, or information about that file, is stored on the DataNode.
                        • Hadoop: The Definitive Guide, 3rd Edition, Chapter 4, under the section “Data Integrity in HDFS.”
                      3. The NameNode needs to know which DataNodes hold each HDFS block. How is that block location information managed?
                        • The NameNode stores the block locations in RAM. They are never stored on disk.
                        • Explanation
                          • The NameNode never stores the HDFS block locations on disk; it only stores the names of the blocks associated with each file. After the NameNode starts up, each DataNode heartbeats in and sends its block report, which lists all the blocks it holds. The NameNode keeps that information in RAM.
                          • Hadoop The Definitive Guide, Chapter 3, under the section "Namenodes and Datanodes"
                      4. What are the permissions of a file in HDFS with the following: rw-rw-r-x?
                        • No one can modify the contents of the file.
                        • Ex:
                          • The permissions show that the file can be read from and written to (appended to) by the owner and anyone in the owner's group, and read from by anyone else (it is 'world readable'). The execute permission on a file in HDFS is ignored. 
                           • Note that the file's existing contents cannot be modified by anyone, because HDFS is a write-once filesystem. Once a file has been written, its contents cannot be changed.
                            • See the Overview section of the docs for a discussion of capabilities encapsulated by each permission mode.  
                      5. Hadoop performs best when disks on slave nodes are configured as JBOD (Just a Bunch Of Disks). There is no need for RAID on individual nodes -- Hadoop replicates each block three times on different nodes for reliability. A single Linux LVM will not perform as well as having the disks configured as separate volumes and, importantly, the failure of one disk in an LVM volume would result in the loss of all data on that volume, whereas the failure of a single disk if all were configured as separate volumes would only result in the loss of data on that one disk (and, of course, each block on that disk is replicated on two other nodes in the cluster).
                      6. MRv1: $ hadoop fsck / 
                          • The number of DataNodes in the cluster
                            •  The number of under-replicated blocks in the cluster
                              • In its standard form, hadoop fsck / will return information about the cluster including the number of DataNodes and the number of under-replicated blocks. 
                              • To view a list of all the files in the cluster, the command would be hadoop fsck / -files
                              • To view a list of all the blocks, and the locations of the blocks, the command would be hadoop fsck / -files -blocks -locations
                              • Additionally, the -racks option would display the rack topology information for each block. 
                              • Further Reading
                              • Hadoop Operations: A Guide for Developers and Administrators, Chapter 8, under the heading “Checking Filesystem Integrity with fsck” Hadoop: The Definitive Guide, 3rd Edition, Chapter 10, under the heading “Tools”
                          1. On a cluster which is NOT running HDFS High Availability, which four pieces of information does the NameNode store on disk?
                            • Names of the files in HDFS
                            • The directory structure of the files in HDFS
                            • An edit log of changes that have been made since the last snapshot compaction by the Secondary NameNode
                            • File permissions of the files in HDFS
                              • The NameNode holds its metadata in RAM for fast access. However, it also needs to persist that information to disk. The initial metadata on disk is stored in a file called fsimage. Metadata in fsimage includes the names of all the files in HDFS, their locations (the directory structure), and file permissions. Whenever a change is made to the metadata (such as a new file being created, or a file being deleted), that information is written to a file on disk called edits. Periodically, the Secondary NameNode coalesces the edits and fsimage files and writes the result back as a new fsimage file, at which point the NameNode can delete its old edits file. 
                              • The NameNode has no knowledge of when it was last backed up. Heartbeat information from the DataNodes is held in RAM but is not persisted to disk.
                           2. Block size is a client-side parameter; the value used will be whatever value is present on the machine creating the file. Setting the value on the NameNode has no effect unless a process on the NameNode is acting as a client and writing a file to HDFS (see the example command after this list).
                          3. What is the basis of resource allocation using the Fair Scheduler in YARN?
                            • Resources are allocated in terms of minimum and maximum memory and cpu usage per queue.
                              • In a YARN cluster using the Fair Scheduler, resources are allocated based on the queue usage of either memory or a combination of memory and cpu. To be "fair" the queue using the least resources (and having demand for more resources) receives the next allocation of available resources. The allocations file is used to specify the minimum and maximum resources allocated to a queue.
                              • For more information see http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
                              • "By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, using the notion of Dominant Resource Fairness developed by Ghodsi et al. When there is a single app running, that app uses the entire cluster. When other apps are submitted, resources that free up are assigned to the new apps, so that each app eventually on gets roughly the same amount of resources."
                              • "The scheduler organizes apps further into "queues", and shares resources fairly between these queues. By default, all users share a single queue, named “default”. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. It is also possible to assign queues based on the user name included with the request through configuration. Within each queue, a scheduling policy is used to share resources between the running apps. The default is memory-based fair sharing, but FIFO and multi-resource with Dominant Resource Fairness can also be configured. Queues can be arranged in a hierarchy to divide resources and configured with weights to share the cluster in specific proportions."
                              • Also review the architecture of YARN at http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/YARN.html
                              • "The ResourceManager has two main components: Scheduler and ApplicationsManager."
                              • "The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc." 
                              • "The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler."

                          Example list in Learning Spark

                          Example 2-2. Scala line count
                          Example 2-3. Examining the sc variable
                          Example 2-4. Python filtering example
                          Example 2-5. Scala filtering example
                          Example 2-6. Running a Python script
                          Example 2-7. Initializing Spark in Python
                          Example 2-9. Initializing Spark in Java
                           Example 2-10. Word count Java application - don't worry about the details yet
                           Example 2-11. Word count Scala application - don't worry about the details yet
                          Example 2-13. Maven build file
                          Example 2-14. Scala build and run
                          Example 2-15. Maven build and run
                          Example 3-1. Creating an RDD of strings with textFile() in Python
                          Example 3-2. Calling the filter() transformation
                          Example 3-3. Calling the first() action
                          Example 3-4. Persisting an RDD in memory
                          Example 3-5. parallelize() method in Python
                          Example 3-6. parallelize() method in Scala
                          Example 3-7. parallelize() method in Java
                          Example 3-8. textFile() method in Python
                          Example 3-9. textFile() method in Scala
                          Example 3-10. textFile() method in Java
                          Example 3-11. filter() transformation in Python
                          Example 3-12. filter() transformation in Scala
                          Example 3-13. filter() transformation in Java
                          Example 3-16. Scala error count using actions
                          Example 3-17. Java error count using actions
                          Example 3-18. Passing functions in Python
                           Example 3-19. Passing a function with field references (don't do this!)
                          Example 3-20. Python function passing without field references
                          Example 3-22. Java function passing with anonymous inner class
                          Example 3-23. Java function passing with named class
                          Example 3-24. Java function class with parameters
                          Example 3-25. Java function passing with lambda expression in Java 8
                          Example 3-26. Python squaring the values in an RDD
                          Example 3-27. Scala squaring the values in an RDD
                          Example 3-28. Java squaring the values in an RDD
                          Example 3-29. flatMap() in Python, splitting lines into words
                          Example 3-30. flatMap() in Scala, splitting lines into multiple words
                          Example 3-33. reduce() in Scala
                          Example 3-34. reduce() in Java
                          Example 3-36. aggregate() in Scala
                          Example 3-37. aggregate() in Java
                          Example 3-38. Creating DoubleRDD in Java
                          Example 3-39. Double execution in Scala
                          Example 3-40. persist() in Scala
                          Example 4-1. Creating a pair RDD using the first word as the key in Python
                          Example 4-2. Creating a pair RDD using the first word as the key in Scala
                          Example 4-3. Creating a pair RDD using the first word as the key in Java
                          Example 4-4. Simple filter on second element in Python
                          Example 4-5. Simple filter on second element in Scala
                          Example 4-7. Per-key average with reduceByKey() and mapValues() in Python
                          Example 4-8. Per-key average with reduceByKey() and mapValues() in Scala
                          Example 4-9. Word count in Python
                          Example 4-10. Word count in Scala
                          Example 4-11. Word count in Java
                          Example 4-12. Per-key average using combineByKey() in Python
                          Example 4-13. Per-key average using combineByKey() in Scala
                          Example 4-14. Per-key average using combineByKey() in Java
                          Example 4-16. reduceByKey() with custom parallelism in Scala
                          Example 4-17. Scala shell inner join
                          Example 4-18. leftOuterJoin() and rightOuterJoin()
                          Example 4-19. Custom sort order in Python, sorting integers as if strings
                          Example 4-20. Custom sort order in Scala, sorting integers as if strings
                          Example 4-21. Custom sort order in Java, sorting integers as if strings
                          Example 4-22. Scala simple application
                          Example 4-23.
                          Example 4-25. Scala PageRank
                          Example 4-26. Scala custom partitioner
                          Example 4-27. Python custom partitioner
                          Example 5-1. Loading a text file in Python
                          Example 5-2. Loading a text file in Scala
                          Example 5-3. Loading a text file in Java
                          Example 5-4. Average value per file in Scala
                          Example 5-5. Saving as a text file in Python
                          Example 5-7. Loading JSON in Scala
                          Example 5-8. Loading JSON in Java
                          Example 5-9. Saving JSON in Python
                          Example 5-10. Saving JSON in Scala
                          Example 5-11. Saving JSON in Java
                          Example 5-12. Loading CSV with textFile() in Python
                          Example 5-13. Loading CSV with textFile() in Scala
                          Example 5-14. Loading CSV with textFile() in Java
                          Example 5-15. Loading CSV in full in Python
                          Example 5-17. Loading CSV in full in Java
                          Example 5-18. Writing CSV in Python
                          Example 5-19. Writing CSV in Scala
                          Example 5-21. Loading a SequenceFile in Scala
                          Example 5-22. Loading a SequenceFile in Java
                          Example 5-23. Saving a SequenceFile in Scala
                          Example 5-24. Loading KeyValueTextInputFormat() with old-style API in Scala
                          Example 5-25. Loading LZO-compressed JSON with Elephant Bird in Scala
                          Example 5-26. Saving a SequenceFile in Java
                          Example 5-27. Sample protocol buffer definition
                          Example 5-28. Elephant Bird protocol buffer writeout in Scala
                          Example 5-29. Loading a compressed text file from the local filesystem in Scala
                          Example 5-30. Creating a HiveContext and selecting data in Python
                          Example 5-31. Creating a HiveContext and selecting data in Scala
                          Example 5-32. Creating a HiveContext and selecting data in Java
                          Example 5-33. Sample tweets in JSON
                          Example 5-34. JSON loading with Spark SQL in Python
                          Example 5-35. JSON loading with Spark SQL in Scala
                          Example 5-36. JSON loading with Spark SQL in Java
                          Example 5-37. JdbcRDD in Scala
                          Example 5-38. sbt requirements for Cassandra connector
                          Example 5-39. Maven requirements for Cassandra connector
                          Example 5-40. Setting the Cassandra property in Scala
                          Example 5-42. Loading the entire table as an RDD with key/value data in Scala
                          Example 5-43. Loading the entire table as an RDD with key/value data in Java
                          Example 5-45. Scala example of reading from HBase
                          Example 5-46. Elasticsearch output in Scala
                          Example 5-47. Elasticsearch input in Scala
                          Example 6-1. Sample call log entry in JSON, with some fields removed
                          Example 6-2. Accumulator empty line count in Python
                          Example 6-3. Accumulator empty line count in Scala
                          Example 6-4. Accumulator empty line count in Java
                          Example 6-5. Accumulator error count in Python
                          Example 6-6. Country lookup in Python
                          Example 6-7. Country lookup with Broadcast values in Python
                          Example 6-8. Country lookup with Broadcast values in Scala
                          Example 6-10. Shared connection pool in Python
                          Example 6-11. Shared connection pool and JSON parser in Scala
                          Example 6-12. Shared connection pool and JSON parser in Java
                          Example 6-13. Average without mapPartitions() in Python
                          Example 6-14. Average with mapPartitions() in Python
                          Example 6-15. R distance program
                          Example 6-16. Driver program using pipe() to call finddistance.R in Python
                          Example 6-17. Driver program using pipe() to call finddistance.R in Scala
                          Example 6-18. Driver program using pipe() to call finddistance.R in Java
                          Example 6-20. Removing outliers in Scala
                          Example 6-21. Removing outliers in Java
                          Example 7-1. Submitting a Python application
                          Example 7-2. Submitting an application with extra arguments
                          Example 7-3. General format for spark-submit
                          Example 7-4. Using spark-submit with various options
                          Example 7-5. pom.xml file for a Spark application built with Maven
                          Example 7-6. Packaging a Spark application built with Maven
                          Example 7-8. Adding the assembly plug-in to an sbt project build
                          Example 8-1. Creating an application using a SparkConf in Python
                          Example 8-3. Creating an application using a SparkConf in Java
                          Example 8-5. Setting configuration values at runtime using a defaults file
                          Example 8-6. input.txt, the source file for our example
                          Example 8-7. Processing text data in the Scala Spark shell
                          Example 8-9. Collecting an RDD
                          Example 8-10. Computing an already cached RDD
                          Example 8-11. Coalescing a large RDD in the PySpark shell
                          Example 8-12. Registering a class allows Kryo to avoid writing full class names with
                          Example 9-1. Maven coordinates for Spark SQL with Hive support
                          Example 9-2. Scala SQL imports
                          Example 9-3. Scala SQL implicits
                          Example 9-4. Java SQL imports
                          Example 9-5. Python SQL imports
                          Example 9-6. Constructing a SQL context in Scala
                          Example 9-7. Constructing a SQL context in Java
                          Example 9-8. Constructing a SQL context in Python
                           Example 9-9. Loading and querying tweets in Scala
                           Example 9-10. Loading and querying tweets in Java
                           Example 9-11. Loading and querying tweets in Python
                          Example 9-12. Accessing the text column (also first column) in the topTweets
                          Example 9-13. Accessing the text column (also first column) in the topTweets
                          Example 9-14. Accessing the text column in the topTweets DataFrame in Python
                          Example 9-15. Hive load in Python
                          Example 9-16. Hive load in Scala
                          Example 9-17. Hive load in Java
                          Example 9-18. Parquet load in Python
                          Example 9-20. Parquet file save in Python
                          Example 9-21. Input records
                          Example 9-22. Loading JSON with Spark SQL in Python
                          Example 9-23. Loading JSON with Spark SQL in Scala
                          Example 9-25. Resulting schema from printSchema()
                          Example 9-26. Partial schema of tweets
                          Example 9-27. SQL query nested and array elements
                          Example 9-28. Creating a DataFrame using Row and named tuple in Python
                          Example 9-29. Creating a DataFrame from case class in Scala
                          Example 9-30. Creating a DataFrame from a JavaBean in Java
                          Example 9-32. Connecting to the JDBC server with Beeline
                          Example 9-33. Load table
                          Example 9-35. Spark SQL shell EXPLAIN
                          Example 9-36. Python string length UDF
                          Example 9-37. Scala string length UDF
                          Example 9-38. Java UDF imports
                          Example 9-40. Spark SQL multiple sums
                          Example 9-41. Beeline command for enabling codegen
                          Example 10-1. Maven coordinates for Spark Streaming
                          Example 10-2. Scala streaming imports
                          Example 10-3. Java streaming imports
                           Example 10-4. Streaming filter for printing lines containing "error" in Scala
                           Example 10-5. Streaming filter for printing lines containing "error" in Java
                           Example 10-6. Streaming filter for printing lines containing "error" in Scala
                           Example 10-7. Streaming filter for printing lines containing "error" in Java
                          Example 10-9. Log output from running Example 10-8
                          Example 10-10. map() and reduceByKey() on DStream in Scala
                          Example 10-11. map() and reduceByKey() on DStream in Java
                          Example 10-12. Joining two DStreams in Scala
                          Example 10-14. transform() on a DStream in Scala
                          Example 10-15. transform() on a DStream in Java
                          Example 10-16. Setting up checkpointing
                          Example 10-17. How to use window() to count data over a window in Scala
                          Example 10-18. How to use window() to count data over a window in Java
                          Example 10-19. Scala visit counts per IP address
                          Example 10-20. Java visit counts per IP address
                          Example 10-21. Windowed count operations in Scala
                          Example 10-22. Windowed count operations in Java
                          Example 10-23. Running count of response codes using updateStateByKey() in Scala
                          Example 10-24. Running count of response codes using updateStateByKey() in Java
                          Example 10-25. Saving DStream to text files in Scala
                          Example 10-26. Saving SequenceFiles from a DStream in Scala
                          Example 10-27. Saving SequenceFiles from a DStream in Java
                          Example 10-28. Saving data to external systems with foreachRDD() in Scala
                          Example 10-29. Streaming text files written to a directory in Scala
                          Example 10-30. Streaming text files written to a directory in Java
                          Example 10-31. Streaming SequenceFiles written to a directory in Scala
                           Example 10-32. Apache Kafka subscribing to Panda's topic in Scala
                           Example 10-33. Apache Kafka subscribing to Panda's topic in Java
                           Example 10-34. Apache Kafka directly reading Panda's topic in Scala
                           Example 10-35. Apache Kafka directly reading Panda's topic in Java
                          Example 10-36. Flume configuration for Avro sink
                          Example 10-37. FlumeUtils agent in Scala
                          Example 10-38. FlumeUtils agent in Java
                          Example 10-39. Maven coordinates for Flume sink
                          Example 10-40. Flume configuration for custom sink
                          Example 10-41. FlumeUtils custom sink in Scala
                          Example 10-43. SparkFlumeEvent in Scala
                          Example 10-45. Setting up a driver that can recover from failure in Scala
                          Example 10-46. Setting up a driver that can recover from failure in Java
                          Example 10-47. Launching a driver in supervise mode
                          Example 10-48. Enable the Concurrent Mark-Sweep GC
                          Example 11-1. Spam classifier in Python
                          Example 11-2. Spam classifier in Scala
                          Example 11-3. Spam classifier in Java
                          Example 11-4. Creating vectors in Python
                          Example 11-5. Creating vectors in Scala
                          Example 11-6. Creating vectors in Java
                          Example 11-7. Using HashingTF in Python
                          Example 11-8. Using TF-IDF in Python
                          Example 11-9. Scaling vectors in Python
                          Example 11-10. Linear regression in Python
                          Example 11-11. Linear regression in Scala
                          Example 11-12. Linear regression in Java
                          Example 11-13. PCA in Scala
                          Example 11-14. SVD in Scala
                          Example 11-15. Pipeline API version of spam classification in Scala


                           Various ways to run Scala code | ajduke's blog

                          TOC of Learning Spark

                          Preface | 5
                          Audience | 5
                          How This Book is Organized | 6
                          Supporting Books | 6
                          Code Examples | 7
                          Early Release Status and Feedback | 7

                          Chapter 1. Introduction to Data Analysis with Spark | 8
                          What is Apache Spark? | 8
                          A Unified Stack | 8
                          Who Uses Spark, and For What? | 11
                          A Brief History of Spark | 13
                          Spark Versions and Releases | 13
                          Spark and Hadoop | 14

                          Chapter 2. Downloading and Getting Started | 15
                          Downloading Spark | 15
                           Introduction to Spark's Python and Scala Shells | 16
                          Introduction to Core Spark Concepts | 20
                          Standalone Applications | 23
                          Conclusion | 25

                          Chapter 3. Programming with RDDs | 26
                          RDD Basics | 26
                          Creating RDDs | 28
                          RDD Operations | 28
                          Passing Functions to Spark | 32
                          Common Transformations and Actions | 36
                          Persistence (Caching) | 46
                          Conclusion | 48

                          Chapter 4. Working with Key-Value Pairs | 49
                          Motivation | 49
                          Creating Pair RDDs | 49
                          Transformations on Pair RDDs | 50
                          Actions Available on Pair RDDs | 60
                          Data Partitioning | 61
                          Conclusion | 70

                          Chapter 5. Loading and Saving Your Data | 71
                          Motivation | 71
                          Choosing a Format | 71
                          Formats | 72
                          File Systems | 88
                          Compression | 89
                          Databases | 91
                          Conclusion | 93

                          About the Authors | 95

                          17.5.15

                           Word frequency count of the functions and methods in Intro to Apache Spark

                          39    | sc.textFile
                          25    | messages.filter
                          13    | lines.filter
                          13    | errors.map
                          13    | messages.cache
                          11    | sc.parallelize
                          10    | l.split
                          9     | x.split
                          8     | distFile.flatMap
                          7     | f.flatMap
                          6     | ")).collect
                          6     | ')).collect
                          6     | distFile.map
                          4     | logData.filter
                          4     | ")).map
                          4     | sc.accumulator
                          4     | wc.saveAsTextFile
                          3     |
                          3     | teenagers.map
                          3     | org.apache.spark.sql.SQLContext
                          3     | ')).map
                          3     | 1)).reduceByKey
                          3     | p
                          3     | people.txt").map
                          2     | people.registerTempTable
                          2     | t
                          2     | }).count
                          2     | line.contains
                          2     | s.contains
                          2     | 1)).cache
                          2     | rdd.foreach
                          2     | reg.join
                          2     | Arrays.asList
                          2     | words.reduceByKey
                          2     | r
                          2     | 4)).foreach
                          2     | _).collect.foreach
                          2     | sc.broadcast
                          2     | w.reduceByKey
                          1     | ssc.awaitTermination
                          1     | parquetFile.registerTempTable
                          1     | Click
                          1     | .reduce
                          1     | sqlCtx.inferSchema
                          1     | SparkConf
                          1     | teenNames.collect
                          1     | 13).where
                          1     | %s".format
                          1     | KMeans.train
                          1     | 2).cache
                          1     | words.map
                          1     | peopleTable.registerTempTable
                          1     | lines.flatMap
                          1     | counts.collect
                          1     | people.saveAsParquetFile
                          1     | ssc.start
                          1     | ssc.socketTextStream
                          1     | counts.saveAsTextFile
                          1     | pairs.reduceByKey
                          1     | println
                          1     | org.apache.spark.sql.hive.HiveContext
                          1     | teenagers.collect
                          1     | wordCounts.print
                          1     | 19).select
                          1     | 10).collect
                          1     | distData.filter
                          1     | spark.parallelize
                          1     | value").collect
                          1     | model.predict
                          1     | people.where
                          1     | line.split
                          1     | }.reduce
                          1     | args
                          1     | c
                          1     | sqlContext.parquetFile
                          1     | java.text.SimpleDateFormat
                          1     | List
                          1     | spark.stop
                          1     | lines.map
                          1     | parts.map
                          1     | Register
                          1     | System.out.println
                          1     | sqlCtx.sql
                          1     | test_data.map
                          1     | g.triplets.filter

                          16.5.15

                           Scala for the Impatient

                           To learn a new language, these are the things to work through:

                           1. The general style of the language (procedural, object-oriented, functional; compiled or interpreted)
                           2. The overall structure of a (simple) program, and how to compile and run it (the typical hello world)
                             • There are basically two kinds
                               • The simplest program can be an executable script, with no class definition needed
                               • A more complex program defines an object XXX with a main method, just as in Java
                                 • A Scala program ultimately compiles to Java bytecode and runs on the JVM
                           3. Types, constants, and variable definitions; expressions
                             • val declares a constant
                             • var declares a variable
                             • Strongly typed, but the type can be omitted and inferred by the compiler
                               • See page 7, 1.2/1.3
                                • Byte
                                • Char
                                • Short
                                • Int
                                • Long
                                • Float
                                • Double
                                • String
                              • toString()
                              • to()
                              • RichXXX
                             • Expressions (see the REPL sketch after this list)
                               • p. 7, 1.4
                                • a + b vs. a.+(b)
                                • 1.to(10) vs 1 to 10
                               • no ++/--, but += 1 / -= 1
                                 • Why is there no ++/--?
                                   • Because Int is immutable, its value cannot change, so ++/-- cannot be implemented
                            • calling functions and methods
                              • p10 1.5
                            • import
                              • import math._
                                 • _ is the equivalent of * in Java
                                 • import math._ is equivalent to import scala.math._
                            • static method vs. singleton objects
                               • Scala has no static methods, but it has something called singleton objects instead
                            • Scala methods without parameters often don't use parentheses.
                            • .apply() method
                           4. Control flow
                            • An if expression has a value
                            • A block has a value - the value of its last expression
                            • The Scala for loop like an "enhanced" Java for loop
                            • Semicolons are (mostly) optional
                            • The void type is Unit
                            • Avoid using return in a function
                            • Beware of missing = in a function definition
                            • Exceptions work just like in Java or C++, but you use a "pattern matching" syntax for catch.
                            • Scala has no checked exceptions.
                          5. 函数/对象定义与使用
                          6. 数组,以及其他标准数据结构(list, set, map)
                          7. 哪里获得在线帮助?
                            1. p12 1.7 scaladoc
                              1. http://www.scala-lang.org/api
                          8. 类库与大系统make
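
                          A minimal Scala sketch of the points in items 3 and 4 above (val/var, type inference, 1 to 10, no ++/--, if and blocks as expressions). The object name Basics and the values are made up; the body can also be pasted straight into the Scala REPL:

                          object Basics {
                            def main(args: Array[String]): Unit = {
                              val answer = 42                 // val: a constant; reassignment is a compile error
                              var counter = 0                 // var: a variable
                              counter += 1                    // no ++/-- on the immutable Int; use += 1 / -= 1
                              val range = 1 to 10             // "1 to 10" is just sugar for 1.to(10)
                              val sign = if (counter > 0) "positive" else "non-positive"  // if is an expression
                              val sumOfSquares = {            // a block's value is its last expression
                                val squares = range.map(n => n * n)
                                squares.sum
                              }
                              println(s"$answer $counter $sign $sumOfSquares")
                            }
                          }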


                          1. The Basics (A1)
                          1.1. The Scala Interpreter
                          1.2. Declaring Variables
                          1.3. Commonly Used Types
                          1.4. Arithmetic and Operator Overloading
                          1.6. The apply Method
                          1.7. Scaladoc
                          1.8. Exercises

                          2. Control Structures and Functions (A1)
                          2.1. Conditional Expressions
                          2.2. Statement Termination
                          2.3. Block Expressions and Assignments
                          2.4. Loops
                          2.6. Functions
                          2.7. Default and Named Arguments (L1)
                          2.8. Variable Arguments (L1)
                          2.9. Procedures
                          2.10. Lazy Values (L1)
                          2.11. Exceptions
                          2.12. Exercises

                          3. Arrays (A1)
                          3.1. Fixed Size Arrays
                          3.2. Variable Size Arrays: Array Buffers
                          3.3. Traversing Arrays (and Array Buffers)
                          3.4. Transforming Arrays
                          3.6. Deciphering ScalaDoc
                          3.7. Multi-Dimensional Arrays
                          3.8. Interoperating with Java
                          3.9. Exercises

                          4. Maps and Tuples (A1)
                          4.1. Constructing a Map
                          4.2. Accessing Map Values
                          4.3. Updating Map Values
                          4.4. Iterating Over Maps
                          4.6. Interoperating with Java
                          4.7. Tuples
                          4.8. Zipping
                          4.9. Exercises

                          5. Classes (A1)
                          5.1. Simple Classes and Parameter-Less Methods
                          5.2. Properties with Getters and Setters
                          5.3. Properties with Only Getters
                          5.4. Object-Private Fields
                          5.6. Auxiliary Constructors
                          5.7. The Primary Constructor
                          5.8. Nested Classes (L1)
                          5.9. Exercises

                          6. Objects (A1)
                          6.1. Singletons
                          6.2. Companion Objects
                          6.3. Objects Extending a Class or Trait
                          6.4. The apply Method
                          6.6. Enumerations
                          6.7. Exercises

                          7. Packages and Imports (A1)
                          7.1. Packages
                          7.2. Scope Rules
                          7.3. Chained Package Clauses
                          7.4. Top-of-File Notation
                          7.6. Package Visibility
                          7.7. Imports
                          7.8. Imports Can Be Anywhere
                          7.9. Renaming and Hiding Members
                          7.10. Implicit Imports
                          7.11. Exercises

                          8. Inheritance (A1)
                          8.1. Extending a Class
                          8.2. Overriding Methods
                          8.3. Type Checks and Casts
                          8.4. Protected Fields and Methods
                          8.6. Overriding Fields
                          8.7. Anonymous Subclasses
                          8.8. Abstract Classes
                          8.9. Abstract Fields
                          8.10. Construction Order and Early Definitions (L3)
                          8.11. The Scala Inheritance Hierarchy
                          8.12. Object Equality (L1)
                          8.13. Exercises

                          9. Files and Regular Expressions (A1)
                          9.1. Reading Lines
                          9.2. Reading Characters
                          9.3. Reading Tokens and Numbers
                          9.4. Reading from URLs and Other Sources
                          9.6. Writing Text Files
                          9.7. Visiting Directories
                          9.8. Serialization
                          9.9. Regular Expressions
                          9.10. Regular Expression Groups
                          9.11. Exercises

                          辛湜 (Reynold Xin): The Spark Project's Philosophy and Its Future Outlook - watch online in HD - Tencent Video

                          15.5.15

                          Pretty Printing with Gson in Groovy

                          #!/usr/bin/env groovy
                          @Grab('com.google.code.gson:gson:2.3.1')
                          
                          import com.google.gson.*;
                          
                          def gson = new GsonBuilder().setPrettyPrinting().create();
                          def jp = new JsonParser();
                          
                          System.in.eachLine { line ->
                              def je = jp.parse(line);
                              println(gson.toJson(je));
                          }
                          
                          

                          14.5.15

                          ITAS Workshop Goals


                          • open a Spark Shell
                            • Nothing special here: download Spark, unpack it, and you can open the Spark shell. There are two shells, one for Python and one for Scala.
                          • develop Spark apps for typical use cases
                            • Let's see what the typical use cases are.
                              • When learning Hadoop, the most common example is word count, so a simple frequency-based sum is one such case; generalizing a bit, independently computable statistics are one class of application.
                              • Looking at database queries for comparison, conditional queries and aggregations should be another class.
                          • tour of the Spark API
                            • This is nice; it can serve as a study outline.
                          • explore data sets loaded from HDFS, etc.
                            • Note "loaded from HDFS" - I'm curious about the relationship between the two. I expect Spark can work directly with data stored on HDFS. How to make good use of the whole cluster's computing power, though, is a real question. If the distributed computation is built on YARN, it can be understood from the bottom up, which is interesting and not too hard: the resource manager handles overall resource management and placement, the local node managers handle the actual control, and the application master is the part I only half understand and need to study further.
                            • Spark's distinguishing feature is the RDD. If intermediate results are not cached, every query is recomputed step by step from the original data. Its advantage over MapReduce is that every MapReduce step involves a lot of I/O, whereas Spark seems to compute only in memory - does it give up on disk entirely? I'm a bit skeptical; this should become clear later in the study.
                          • review of Spark SQL, Spark Streaming, MLlib
                            • Spark SQL shouldn't be hard: memorize the common constructs and learn it by comparison with SQL-92.
                            • I keep seeing Spark Streaming but don't know what it is. Hadoop Streaming is basically uninteresting: it just streams HDFS data to languages such as Python that cannot access Hadoop/HDFS directly. Hopefully Spark Streaming is not that kind of thing.
                            • MLlib - do I need to learn this?
                          • follow-up courses and certification
                            • Sounds good.
                          • developer community resources, events, etc.
                            • Somewhat interesting; worth a look.
                          • return to workplace and demo use of Spark!
                            • Aha.

                          Spark certification resources

                          Apache Spark Developer Certification Program

                          Exam Preparation


                          To prepare for the Spark certification exam, we recommend that you:
                          • Are comfortable coding the advanced exercises in Spark Camp or a related training (example exercises can be found here). Spark Camp and Spark Advanced Trainings are held at Strata+Hadoop World events. Additional training opportunities are provided by Databricks.
                          • Watch Introduction to Apache Spark by Paco Nathan. In this video training, you’ll complete hands-on technical exercises, and get up to speed on how to use Spark for data exploration, analysis, and building big data applications in Python, Java, or Scala.
                          • Have mastered the material released so far in the O’Reilly book, Learning Spark.
                          • Have some hands-on experience developing Spark apps in production already.
                          • If you’re taking the online version of the exam, please read this information that addresses known issues you may experience when preparing to take the exam.
                          • If you are in China, ChinaHadoop delivers Spark classes in Chinese. Register for the online course.
                          The test will include questions in Scala, Python, Java, and SQL. However, deep proficiency in any of those languages is not required, since the questions focus on Spark and its model of computation.


                          Big data training

                          ChinaHadoop (小象学院) - China's most professional online education platform for Hadoop and Spark big data - authoritative courses: Hadoop training, Spark training, HBase training, Hive training, Mahout training, and more

                          Jason4Zhu: VCore Configuration In Hadoop

                          13.5.15

                          Apache YARN Scheduler


                          • The FIFO Scheduler
                            • Places applications in a queue and runs them in the order of submission.
                            • Requests for the first application in the queue are allocated first, then once its requests have been satisfied the next application in the queue is served, and so on.
                            • The good side:
                              • simple to understand
                              • needs no configuration
                            • The bad side:
                              • not suitable for shared clusters.
                                • Large applications will use all the resources in a cluster
                                  • So each application has to wait its turn.
                              • On a shared cluster it is better to use the Capacity Scheduler or the Fair Scheduler.
                                • Both of these allow long-running jobs to complete in a timely manner,
                                  • while still allowing users who are running concurrent smaller ad hoc queries to get results back in a reasonable time.
                          • The Capacity Scheduler (a small queue-submission sketch follows this list)
                            • A separate dedicated queue allows the small job to start as soon as it is submitted,
                              • although this is at the cost of
                                • overall cluster utilization
                                  • since the queue capacity is reserved for jobs in that queue.
                                    • This means that the large job finishes later than when using the FIFO Scheduler.
                          • The Fair Scheduler
                            • There is no need to reserve a set amount of capacity since it will dynamically balance resources between all running jobs.
                            • When the second (small) job starts it is allocated half of the cluster resources so that each job is using its fair share of resources.
                          • Delay Scheduling
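
                          The queues themselves are defined in the cluster's scheduler configuration (e.g. capacity-scheduler.xml or the Fair Scheduler allocation file), not in application code. A hedged Scala sketch of how a client job then targets one of them; the queue name "adhoc" and the job name are made up:

                          import org.apache.hadoop.conf.Configuration
                          import org.apache.hadoop.mapreduce.Job

                          object QueueSubmitSketch {
                            def main(args: Array[String]): Unit = {
                              val conf = new Configuration()
                              // "adhoc" is a hypothetical queue defined by the cluster's scheduler config.
                              conf.set("mapreduce.job.queuename", "adhoc")
                              val job = Job.getInstance(conf, "small ad hoc query")
                              // ... mapper, reducer, input and output paths would be configured here ...
                              // job.waitForCompletion(true)
                            }
                          }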

                          How was YARN designed to address the limitations of MRv1?

                          All from the book Hadoop: The Definitive Guide.
                          • Scalability
                            • MRv1:
                              • because the jobtracker has to manage both jobs and tasks, MRv1 hits scalability bottlenecks in the region of 4,000 nodes, and 40,000 tasks.
                            • YARN/MRv2 overcomes these limitations by virtue of its split resource manager/application master architecture, which means it is designed to scale up to 10,000 nodes, and 100,000 tasks.
                          • Availability
                            • With the jobtracker's responsibilities split between the resource manager and the application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications (on a per-application basis). Indeed, Hadoop 2 supports HA both for the resource manager and for the application master of MapReduce jobs, which is similar to my own product.
                          • Utilization
                            • In MRv1:
                              • each tasktracker is configured with a static allocation of fixed size "slots", which are divided into map slots and reduce slots at configuration time.
                              • A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
                            • In YARN/MRv2:
                              • a node manager manages a pool of resources, rather than a fixed number of designated slots. 
                              • MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster.
                              • If the resources to run the task are available, then the application will be eligible for them.
                              • Furthermore, resources in YARN are fine-grained, so an application can make a request for what it needs, rather than for an indivisible slot.
                          • Multitenancy

                          Apache YARN


                          • YARN:
                            • Yet Another Resource Negotiator
                          • YARN 
                            • is Hadoop's cluster resource management system.
                            • was introduced in Hadoop 2
                            • to improve the MapReduce implementation.
                            • but is general enough to support other distributed computing paradigms as well.
                          • YARN
                            • provides APIs for requesting and working with cluster resources.
                              • but these APIs are not typically used directly by user code.
                              • (Instead,) users write to higher-level APIs provided by distributed computing frameworks,
                                • which themselves are built on YARN and hide the resource management details from the user.

                          (There is a further layer of applications that build on the frameworks shown in the figure above - Pig, Hive, and Crunch, for example.)



                          Figures in Hadoop: The Definitive Guide

                          Figure 1-1. Structure of the book: there are various pathways through the book





                          Figure 2-1. MapReduce logical data flow


                          • MapReduce works by breaking the processing into two phases:
                            • the map phase and
                            • the reduce phase
                          • Each phase has key-value pairs as input and output,
                            • the types of which may be chosen by the programmer

                          Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks


                          Figure 2-3. MapReduce data flow with a single reduce task



                          Figure 2-4. MapReduce data flow with multiple reduce tasks


                          Figure 2-5. MapReduce data flow with no reduce tasks


                          Figure 3-1. Accessing HDFS over HTTP directly and via a bank of HDFS proxies



                          Figure 3-2. A client reading data from HDFS


                          Figure 3-3. Network distance in Hadoop

                          Figure 3-4. A client writing data to HDFS

                          Figure 3-5. A typical replica pipeline


                          Figure 4-1. YARN applications



                          Figure 4-2. How YARN runs an application



                          Figure 4-3. Cluster utilization over time when running a large job and a small job under the FIFO Scheduler (i), Capacity Scheduler (ii), and Fair Scheduler (iii)
                          Figure 4-4. Fair sharing between user queues

                          Figure 5-1. Writable class hierarchy
                          Figure 5-2. The internal structure of a sequence file with no compression and with record compression
                          Figure 5-3. The internal structure of a sequence file with block compression
                          Figure 5-4. Row-oriented versus column-oriented storage

                          Figure 6-1. Screenshot of the resource manager page
                          Figure 6-2. Screenshot of the job page
                          Figure 6-3. Screenshot of the tasks page
                          Figure 6-4. Transition diagram of an Oozie workflow

                          Figure 7-1. How Hadoop runs a MapReduce job
                          Figure 7-2. The relationship of the Streaming executable to the node manager and the task container
                          Figure 7-3. How status updates are propagated through the MapReduce system
                          Figure 7-4. Shuffle and sort in MapReduce
                          Figure 7-5. Efficiently merging 40 file segments with a merge factor of 10

                          Figure 8-1. Where separators are used in a Streaming MapReduce job
                          Figure 8-2. InputFormat class hierarchy
                          Figure 8-3. Logical records and HDFS blocks for TextInputFormat
                          Figure 8-4. OutputFormat class hierarchy

                          Figure 9-1. Temperature distribution for the weather dataset
                          Figure 9-2. Inner join of two datasets

                          Figure 10-1. Typical two-level network architecture for a Hadoop cluster
                          Figure 10-2. The three-step Kerberos ticket exchange protocol

                          Figure 11-1. The checkpointing process

                          Figure 13-1. The internal structure of a Parquet file

                          Figure 14-1. Flume agent with a spooling directory source and a logger sink connected by a file channel
                          Figure 14-2. Flume agent with a spooling directory source and fanning out to an HDFS sink and a logger sink
                          Figure 14-3. Using a second agent tier to aggregate Flume events from the first tier
                          Figure 14-4. Two Flume agents connected by an Avro sink-source pair
                          Figure 14-5. Using multiple sinks for load balancing or failover
                          Figure 14-6. Load balancing between two agents

                          Figure 15-1. Sqoop’s import process
                          Figure 15-2. Database tables are typically physically represented as an array of rows…
                          Figure 15-3. Large objects are usually held in a separate area of storage; the main row…
                          Figure 15-4. Exports are performed in parallel using MapReduce

                          Figure 17-1. Hive architecture
                          Figure 17-2. Metastore configurations
                          Figure 17-3. Data flow with partial results for a UDAF

                          Figure 18-1. Plan diagram for a Crunch pipeline for calculating a histogram of word counts

                          Figure 19-1. How Spark runs a job
                          Figure 19-2. The stages and RDDs in a Spark job for calculating a histogram of word counts
                          Figure 19-3. How Spark executors are started in YARN client mode
                          Figure 19-4. How Spark executors are started in YARN cluster mode

                          Figure 20-1. The HBase data model, illustrated for a table storing photos
                          Figure 20-2. HBase cluster members

                          Figure 21-1. ZooKeeper znodes
                          Figure 21-2. Reads are satisfied by followers, whereas writes are committed by the leader
                          Figure 21-3. ZooKeeper state transitions

                          Figure 22-1. Operational data flow
                          Figure 22-2. Composable datasets and functions
                          Figure 22-3. Timeline of Big Data Technology and Cost of Sequencing a Genome
                          Figure 22-4. DNA Double Helix Structure
                          Figure 22-5. Aligning Reads to a Reference Genome
                          Figure 22-6. Counting and sorting in MapReduce
                          Figure 22-7. Pipes linked by fields and tuples
                          Figure 22-8. Pipe types
                          Figure 22-9. A simple PipeAssembly
                          Figure 22-10. Operation types
                          Figure 22-11. An assembly of operations
                          Figure 22-12. A Flow
                          Figure 22-13. How a Flow translates to chained MapReduce jobs
                          Figure 22-14. The ShareThis log processing pipeline
                          Figure 22-15. The ShareThis log processing Flow

                          Interfaces

                          FSDataInputStream, Seekable, PositionedReadable


                          • The open() method of FileSystem returns an FSDataInputStream
                            • rather than a standard java.io class
                          • FSDataInputStream is a specialization of java.io.DataInputStream
                            • with support for random access
                            • so you can read from any part of the stream
                              • The Seekable and PositionedReadable interfaces provide this capability (see the sketch below).
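
                          A small Scala sketch of that random-access support, along the lines of the book's double-cat example; the input URI comes from the command line:

                          import java.net.URI
                          import org.apache.hadoop.conf.Configuration
                          import org.apache.hadoop.fs.{FileSystem, Path}
                          import org.apache.hadoop.io.IOUtils

                          // Prints a file twice: once straight through, once after seeking back to the start.
                          object SeekableCat {
                            def main(args: Array[String]): Unit = {
                              val uri = args(0)
                              val fs  = FileSystem.get(URI.create(uri), new Configuration())
                              val in  = fs.open(new Path(uri))   // FSDataInputStream, which implements Seekable
                              try {
                                IOUtils.copyBytes(in, System.out, 4096, false)
                                in.seek(0)                       // jump back to the beginning of the file
                                IOUtils.copyBytes(in, System.out, 4096, false)
                              } finally {
                                IOUtils.closeStream(in)
                              }
                            }
                          }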

                          FSDataOutputStream, Progressable

                          Glob patterns and PathFilter

                          • Hadoop supports the same set of glob characters as Unix bash
                          • When glob patterns are not powerful enough to describe a set of files you want to access, you can use PathFilter.
                          public FileStatus[] globStatus(Path pathPattern) throws IOException
                          public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws
                            IOException
                          
                            Here is the PathFilter interface (a usage sketch follows the listing):
                            package org.apache.hadoop.fs;
                          
                            public interface PathFilter {
                              boolean accept(Path path);
                            }
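
                          A hedged Scala sketch combining globStatus with a custom PathFilter; the glob pattern and the ".tmp" suffix convention are made up for illustration:

                          import org.apache.hadoop.conf.Configuration
                          import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

                          object GlobWithFilter {
                            def main(args: Array[String]): Unit = {
                              val fs = FileSystem.get(new Configuration())

                              // Exclude in-flight files ending in ".tmp" (a hypothetical convention).
                              val noTmp = new PathFilter {
                                override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
                              }

                              // The glob uses the same characters as Unix bash (*, ?, {a,b}, ...).
                              // globStatus may return null when nothing matches, so guard with Option.
                              val statuses: Array[FileStatus] =
                                Option(fs.globStatus(new Path("/data/2015/*/*"), noTmp))
                                  .getOrElse(Array.empty[FileStatus])
                              statuses.foreach(status => println(status.getPath))
                            }
                          }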
                          


                          Writable


                          package org.apache.hadoop.io;
                          
                          import java.io.DataOutput;
                          import java.io.DataInput;
                          import java.io.IOException;
                          
                          public interface Writable {
                            void write(DataOutput out) throws IOException;
                            void readFields(DataInput in) throws IOException;
                          }
                          

                          The Writable interface defines two methods:

                          • One for writing its state to a DataOutput binary stream and
                          • One for reading its state from a DataInput binary stream

                          import java.io.ByteArrayInputStream;
                          import java.io.ByteArrayOutputStream;
                          import java.io.DataInputStream;
                          import java.io.DataOutputStream;

                          import org.apache.hadoop.io.IntWritable;
                          import org.apache.hadoop.util.StringUtils;

                          public class IntWritableRoundTrip {
                              public static void main(String[] args) throws Exception {
                                  IntWritable iw = new IntWritable(1024);

                                  // Serialize: write the Writable's state to a DataOutput binary stream
                                  ByteArrayOutputStream baos = new ByteArrayOutputStream();
                                  DataOutputStream dos = new DataOutputStream(baos);
                                  iw.write(dos);
                                  dos.flush();
                                  byte[] data = baos.toByteArray();

                                  System.out.println(StringUtils.byteToHexString(data)); // 00000400

                                  // Deserialize: read the state back from a DataInput binary stream
                                  ByteArrayInputStream bais = new ByteArrayInputStream(data);
                                  DataInputStream dis = new DataInputStream(bais);
                                  IntWritable iw2 = new IntWritable();
                                  iw2.readFields(dis);
                                  System.out.println(iw2.get()); // 1024
                              }
                          }
                          

                          WritableComparable and comparators


                          • IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:
                          package org.apache.hadoop.io;
                          
                          public interface WritableComparable extends Writable, Comparable {
                          }
                          


                          • Comparison of types is crucial for MapReduce
                            • where there is a sorting phase during which keys are compared with one another.
                          • RawComparator is an optimization that Hadoop provides
                            • extension of Java's Comparator
                            • allows implementors to compare records read from a stream without deserializing them into objects
                                • Big-endian encoding can also help here, since the serialized bytes can then be compared in stream order, most significant byte first.
                          • WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.
                            • It provides two main functions.
                              • A default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method.
                              • acts as a factory for RawComparator instances
                          RawComparator comparator = WritableComparator.get(IntWritable.class);
                          
                          IntWritable w1 = new IntWritable(163);
                          IntWritable w2 = new IntWritable(67);
                          assertThat(comparator.compare(w1, w2), greaterThan(0));
                          

                          GenericOptionsParser, Tool interface and ToolRunner

                          • GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired.
                          • You don't usually use GenericOptionsParser directly, as it's more convenient to implement the Tool interface and run your application with the ToolRunner
                            • which uses GenericOptionsParser internally
                          • You can have your App class derive from Configured, which is an implementation of the Configurable interface.
                            • All implementations of Tool need to implement Configurable
                            • and subclassing Configured is often the easiest way to achieve this (see the sketch after this list).
                          • ToolRunner.run() method takes care of creating a Configuration object for the Tool before calling its run() method.
                          • ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line and to set them on the Configuration instance. 
                            • -conf <conf file>
                          • GenericOptionsParser also allows you to set individual properties.
                            • hadoop ConfigurationPrinter -D color=yellow | grep color
                            • The -D option is used to set the configuration property with key color to the value yellow.
                            • Options specified with -D take priority over properties from the configuration files.
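
                          A Scala sketch of the ConfigurationPrinter used in the -D example above; this is a hedged adaptation (the book's version is in Java), assuming it is packaged into a job jar and launched with the hadoop command:

                          import org.apache.hadoop.conf.Configured
                          import org.apache.hadoop.util.{Tool, ToolRunner}
                          import scala.collection.JavaConverters._

                          // Prints every property in the job Configuration, including anything set via
                          // -conf <file> or -D key=value, which ToolRunner/GenericOptionsParser pick up.
                          class ConfigurationPrinter extends Configured with Tool {
                            override def run(args: Array[String]): Int = {
                              getConf.iterator().asScala.foreach(e => println(s"${e.getKey}=${e.getValue}"))
                              0
                            }
                          }

                          object ConfigurationPrinter {
                            def main(args: Array[String]): Unit = {
                              // ToolRunner builds the Configuration and strips the generic options
                              // before handing the remaining args to run().
                              val exitCode = ToolRunner.run(new ConfigurationPrinter, args)
                              System.exit(exitCode)
                            }
                          }

                          Run as in the -D example above, the grep only matches because -D color=yellow was set on the Configuration before run() was called.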

                          InputSampler, Sampler



                          • The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job
                          • This interface is usually not called directly by clients. Instead, the writePartitionFile() static method on InputSampler is used, which creates a sequence file to store the keys that define the partitions (see the sketch below).
                          • The sequence file is used by TotalOrderPartitioner to create partitions for the sort job.
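
                          A hedged Scala sketch of how these pieces typically fit together; the object name, the Text/NullWritable key and value types, the argument index for the partition-file path, and the sampling parameters are all illustrative, and the rest of the job setup is elided:

                          import org.apache.hadoop.fs.Path
                          import org.apache.hadoop.io.{NullWritable, Text}
                          import org.apache.hadoop.mapreduce.Job
                          import org.apache.hadoop.mapreduce.lib.partition.{InputSampler, TotalOrderPartitioner}

                          object PartitionFileSketch {
                            def main(args: Array[String]): Unit = {
                              val job = Job.getInstance()
                              // ... input/output formats, key/value classes, paths, and the
                              // TotalOrderPartitioner as the partitioner class would be set here ...

                              // Tell TotalOrderPartitioner where the partition boundaries will live.
                              TotalOrderPartitioner.setPartitionFile(job.getConfiguration, new Path(args(0)))

                              // Sample roughly 10% of the input keys, keeping at most 10,000 samples
                              // drawn from at most 10 splits, and write the boundary keys to that file.
                              val sampler = new InputSampler.RandomSampler[Text, NullWritable](0.1, 10000, 10)
                              InputSampler.writePartitionFile(job, sampler)
                            }
                          }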