20.5.15

A discussion of Hadoop and Cassandra performance issues - ImportNew


18.5.15

Errors: CDA - 20150518


  1. For the default threefold replication, Hadoop’s rack placement policy is to write the first copy of a block on a node in one rack, then the other two copies on two nodes in a different rack. Since the first copy is written to rack2, the other two will either be written to two nodes on rack1, or two nodes on rack3.
  2. Apache HBase provides random, real-time read/write access to your data. HDFS does not allow random writes; it is built for scalability, fault tolerance, and batch processing.
  3. Each slave node in a cluster configured to run MapReduce v2 (MRv2) on YARN typically runs a DataNode daemon (for HDFS functions) and a NodeManager daemon (for YARN functions). The NodeManager handles communication with the ResourceManager, oversees application container lifecycles, monitors CPU and memory use of the containers, tracks node health, and handles log management. It also makes available a number of auxiliary services to YARN applications.
    • Further Reading: Configure YARN Daemons
  4. When the first NameNode is started, it connects to ZooKeeper and registers itself as the Active NameNode. The next NameNode then sees that information and sets itself up in Standby mode (in fact, the ZooKeeper Failover Controller is the software responsible for the actual communication with ZooKeeper). Clients never connect to ZooKeeper to discover anything about the NameNodes. In an HDFS HA scenario, ZooKeeper is not used to keep track of filesystem changes. That is the job of the Quorum Journal Manager daemons.
    • You decide to create a cluster which runs HDFS in High Availability mode with automatic failover, using Quorum-based Storage. Which service keeps track of which NameNode is active at any given moment?
  5. Which three describe functions of the ResourceManager in YARN?
    • Tracking heartbeats from the NodeManagers
    •  Running a scheduler to determine how resources are allocated
    •  Monitoring the status of the ApplicationMaster container and restarting on failure
    • Explanation:
      • The ResourceManager has two main components: Scheduler and ApplicationsManager.
      • The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc.
      • The ResourceManager tracks heartbeats from the NodeManagers to determine the available resources, then schedules those resources based on the scheduler-specific configuration.
      • The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.
      • The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress. Depending on the type of application, this may include monitoring the map and reduce tasks progress, restarting tasks, and archiving job history and meta-data.
      • The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
      • http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/YARN.html
  6. You are running a Hadoop cluster with a NameNode on host hadoopNN01, a Secondary NameNode on host hadoopNN02 and several DataNodes.
    • How can you determine when the last checkpoint happened?
      • Connect to the web UI of the Secondary NameNode (http://hadoopNN02:50090/) and look at the “Last Checkpoint” information.
    • Explanation
      • The Secondary NameNode’s Web UI contains information on when it last performed its checkpoint operation. This is not displayed via the NameNode’s Web UI, and is not available via the hdfs dfsadmin command. 
  7. A specific node in your cluster appears to be running slower than other nodes with the same hardware configuration. You suspect a RAM failure in the system. Which commands may be used to view the memory seen by the system?
    • free
    • top
    • dmidecode
    • Explanation
      • dmidecode shows BIOS information on a running system. The amount of installed RAM and the size of the modules in each slot can be found in the output.
      • Additionally, memory and swap usage can be viewed with cat /proc/meminfo or vmstat.
  8. You are running a Hadoop cluster with a NameNode on host mynamenode. What are two ways you can determine available HDFS space in your cluster?
      • Run hdfs dfsadmin -report and locate the DFS Remaining value.
      • Connect to http://mynamenode:50070/ and locate the DFS Remaining value.

      1. When a client wishes to read a file from HDFS, it contacts the NameNode and requests the locations and names of the first few blocks in the file. It then directly contacts the DataNodes containing those blocks to read the data. It would be very wasteful to move blocks around the cluster based on a client’s read request, so that is never done. Similarly, if all data was passed via the NameNode, the NameNode would immediately become a serious bottleneck and would slow down the cluster operation dramatically.
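        • As a rough illustration of this read path, here is a minimal client sketch (my own, not from the exam notes; the hdfs:// URI is a placeholder). The client only asks the NameNode for metadata via open(); the returned stream then pulls the bytes directly from the DataNodes.

            // Read a file from HDFS and copy it to stdout.
            import java.io.InputStream;
            import java.net.URI;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.IOUtils;

            public class ReadFromHdfs {
              public static void main(String[] args) throws Exception {
                String uri = args[0];  // e.g. hdfs://mynamenode/user/foo/file.txt
                FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
                try (InputStream in = fs.open(new Path(uri))) {   // NameNode supplies block locations
                  IOUtils.copyBytes(in, System.out, 4096, false); // bytes stream from the DataNodes
                }
              }
            }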
      2. As well as the block itself, a separate file is written containing checksums for the data in the file. These checksums are used when the data is being written, to ensure that no data corruption has occurred. No metadata regarding the name of the file of which the block is a part, or information about that file, is stored on the DataNode.
        • Hadoop: The Definitive Guide, 3rd Edition, Chapter 4, under the section “Data Integrity in HDFS.”
      3. The NameNode needs to know which DataNodes hold each HDFS block. How is that block location information managed?
        • The NameNode stores the block locations in RAM. They are never stored on disk.
        • Explanation
          • The NameNode never stores the HDFS block locations on disk; it only stores the names of the blocks associated with each file. After the NameNode starts up, each DataNode heartbeats in and sends its block report, which lists all the blocks it holds. The NameNode keeps that information in RAM.
          • Hadoop The Definitive Guide, Chapter 3, under the section "Namenodes and Datanodes"
      4. What are the permissions of a file in HDFS with the following: rw-rw-r-x?
        • No one can modify the contents of the file.
        • Ex:
          • The permissions show that the file can be read from and written to (appended to) by the owner and anyone in the owner's group, and read from by anyone else (it is 'world readable'). The execute permission on a file in HDFS is ignored. 
          • The file's existing contents cannot be modified by anyone, because HDFS is a write-once filesystem. Once a file has been written, its contents cannot be changed.
            • See the Overview section of the docs for a discussion of capabilities encapsulated by each permission mode.  
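          • A minimal sketch (assuming a reachable HDFS and an existing path) of how a client can read back the owner, group, and the rw-rw-r-x style permission string for a file:

              import org.apache.hadoop.conf.Configuration;
              import org.apache.hadoop.fs.FileStatus;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;
              import org.apache.hadoop.fs.permission.FsPermission;

              public class ShowPermission {
                public static void main(String[] args) throws Exception {
                  FileSystem fs = FileSystem.get(new Configuration());
                  FileStatus status = fs.getFileStatus(new Path(args[0])); // e.g. /user/foo/file.txt
                  FsPermission perm = status.getPermission();
                  // Prints something like: foo supergroup rw-rw-r-x
                  System.out.println(status.getOwner() + " " + status.getGroup() + " " + perm);
                }
              }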
      5. Hadoop performs best when disks on slave nodes are configured as JBOD (Just a Bunch Of Disks). There is no need for RAID on individual nodes -- Hadoop replicates each block three times on different nodes for reliability. A single Linux LVM will not perform as well as having the disks configured as separate volumes and, importantly, the failure of one disk in an LVM volume would result in the loss of all data on that volume, whereas the failure of a single disk if all were configured as separate volumes would only result in the loss of data on that one disk (and, of course, each block on that disk is replicated on two other nodes in the cluster).
      6. MRv1: $ hadoop fsck / 
          • The number of DataNodes in the cluster
          • The number of under-replicated blocks in the cluster
              • In its standard form, hadoop fsck / will return information about the cluster including the number of DataNodes and the number of under-replicated blocks. 
              • To view a list of all the files in the cluster, the command would be hadoop fsck / -files
              • To view a list of all the blocks, and the locations of the blocks, the command would be hadoop fsck / -files -blocks -locations
              • Additionally, the -racks option would display the rack topology information for each block. 
              • Further Reading
              • Hadoop Operations: A Guide for Developers and Administrators, Chapter 8, under the heading “Checking Filesystem Integrity with fsck”
              • Hadoop: The Definitive Guide, 3rd Edition, Chapter 10, under the heading “Tools”
          1. On a cluster which is NOT running HDFS High Availability, which four pieces of information does the NameNode store on disk?
            • Names of the files in HDFS
            • The directory structure of the files in HDFS
            • An edit log of changes that have been made since the last snapshot compaction by the Secondary NameNode
            • File permissions of the files in HDFS
              • The NameNode holds its metadata in RAM for fast access. However, it also needs to persist that information to disk. The initial metadata on disk is stored in a file called fsimage. Metadata in fsimage includes the names of all the files in HDFS, their locations (the directory structure), and file permissions. Whenever a change is made to the metadata (such as a new file being created, or a file being deleted), that information is written to a file on disk called edits. Periodically, the Secondary NameNode coalesces the edits and fsimage files and writes the result back as a new fsimage file, at which point the NameNode can delete its old edits file. 
              • The NameNode has no knowledge of when it was last backed up. Heartbeat information from the DataNodes is held in RAM but is not persisted to disk.
          2. Block size is a client-side parameter; the value used will be whatever value is present on the machine creating the file. Setting the value on the NameNode has no effect unless a process on the NameNode is acting as a client and writing a file to HDFS. 
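            • A minimal sketch of this point (path and size are placeholders): the block size that ends up in HDFS is whatever the writing client has configured, here 256 MB, regardless of what is set on the NameNode.

                import org.apache.hadoop.conf.Configuration;
                import org.apache.hadoop.fs.FSDataOutputStream;
                import org.apache.hadoop.fs.FileSystem;
                import org.apache.hadoop.fs.Path;

                public class WriteWithBlockSize {
                  public static void main(String[] args) throws Exception {
                    Configuration conf = new Configuration();
                    conf.setLong("dfs.blocksize", 256L * 1024 * 1024); // applies to files written by this client
                    FileSystem fs = FileSystem.get(conf);
                    try (FSDataOutputStream out = fs.create(new Path("/tmp/blocksize-demo.txt"))) {
                      out.writeUTF("written with a client-side block size of 256 MB");
                    }
                  }
                }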
          3. What is the basis of resource allocation using the Fair Scheduler in YARN?
            • Resources are allocated in terms of minimum and maximum memory and cpu usage per queue.
              • In a YARN cluster using the Fair Scheduler, resources are allocated based on the queue usage of either memory or a combination of memory and cpu. To be "fair" the queue using the least resources (and having demand for more resources) receives the next allocation of available resources. The allocations file is used to specify the minimum and maximum resources allocated to a queue.
              • For more information see http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
              • "By default, the Fair Scheduler bases scheduling fairness decisions only on memory. It can be configured to schedule with both memory and CPU, using the notion of Dominant Resource Fairness developed by Ghodsi et al. When there is a single app running, that app uses the entire cluster. When other apps are submitted, resources that free up are assigned to the new apps, so that each app eventually on gets roughly the same amount of resources."
              • "The scheduler organizes apps further into "queues", and shares resources fairly between these queues. By default, all users share a single queue, named “default”. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. It is also possible to assign queues based on the user name included with the request through configuration. Within each queue, a scheduling policy is used to share resources between the running apps. The default is memory-based fair sharing, but FIFO and multi-resource with Dominant Resource Fairness can also be configured. Queues can be arranged in a hierarchy to divide resources and configured with weights to share the cluster in specific proportions."
              • Also review the architecture of YARN at http://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-site/YARN.html
              • "The ResourceManager has two main components: Scheduler and ApplicationsManager."
              • "The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc." 
              • "The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler."

          Example list in Learning Spark

          Example 2-2. Scala line count
          Example 2-3. Examining the sc variable
          Example 2-4. Python filtering example
          Example 2-5. Scala filtering example
          Example 2-6. Running a Python script
          Example 2-7. Initializing Spark in Python
          Example 2-9. Initializing Spark in Java
          Example 2-10. Word count Java application - don't worry about the details yet
          Example 2-11. Word count Scala application - don't worry about the details yet
          Example 2-13. Maven build file
          Example 2-14. Scala build and run
          Example 2-15. Maven build and run
          Example 3-1. Creating an RDD of strings with textFile() in Python
          Example 3-2. Calling the filter() transformation
          Example 3-3. Calling the first() action
          Example 3-4. Persisting an RDD in memory
          Example 3-5. parallelize() method in Python
          Example 3-6. parallelize() method in Scala
          Example 3-7. parallelize() method in Java
          Example 3-8. textFile() method in Python
          Example 3-9. textFile() method in Scala
          Example 3-10. textFile() method in Java
          Example 3-11. filter() transformation in Python
          Example 3-12. filter() transformation in Scala
          Example 3-13. filter() transformation in Java
          Example 3-16. Scala error count using actions
          Example 3-17. Java error count using actions
          Example 3-18. Passing functions in Python
          Example 3-19. Passing a function with field references (don't do this!)
          Example 3-20. Python function passing without field references
          Example 3-22. Java function passing with anonymous inner class
          Example 3-23. Java function passing with named class
          Example 3-24. Java function class with parameters
          Example 3-25. Java function passing with lambda expression in Java 8
          Example 3-26. Python squaring the values in an RDD
          Example 3-27. Scala squaring the values in an RDD
          Example 3-28. Java squaring the values in an RDD
          Example 3-29. flatMap() in Python, splitting lines into words
          Example 3-30. flatMap() in Scala, splitting lines into multiple words
          Example 3-33. reduce() in Scala
          Example 3-34. reduce() in Java
          Example 3-36. aggregate() in Scala
          Example 3-37. aggregate() in Java
          Example 3-38. Creating DoubleRDD in Java
          Example 3-39. Double execution in Scala
          Example 3-40. persist() in Scala
          Example 4-1. Creating a pair RDD using the first word as the key in Python
          Example 4-2. Creating a pair RDD using the first word as the key in Scala
          Example 4-3. Creating a pair RDD using the first word as the key in Java
          Example 4-4. Simple filter on second element in Python
          Example 4-5. Simple filter on second element in Scala
          Example 4-7. Per-key average with reduceByKey() and mapValues() in Python
          Example 4-8. Per-key average with reduceByKey() and mapValues() in Scala
          Example 4-9. Word count in Python
          Example 4-10. Word count in Scala
          Example 4-11. Word count in Java
          Example 4-12. Per-key average using combineByKey() in Python
          Example 4-13. Per-key average using combineByKey() in Scala
          Example 4-14. Per-key average using combineByKey() in Java
          Example 4-16. reduceByKey() with custom parallelism in Scala
          Example 4-17. Scala shell inner join
          Example 4-18. leftOuterJoin() and rightOuterJoin()
          Example 4-19. Custom sort order in Python, sorting integers as if strings
          Example 4-20. Custom sort order in Scala, sorting integers as if strings
          Example 4-21. Custom sort order in Java, sorting integers as if strings
          Example 4-22. Scala simple application
          Example 4-23.
          Example 4-25. Scala PageRank
          Example 4-26. Scala custom partitioner
          Example 4-27. Python custom partitioner
          Example 5-1. Loading a text file in Python
          Example 5-2. Loading a text file in Scala
          Example 5-3. Loading a text file in Java
          Example 5-4. Average value per file in Scala
          Example 5-5. Saving as a text file in Python
          Example 5-7. Loading JSON in Scala
          Example 5-8. Loading JSON in Java
          Example 5-9. Saving JSON in Python
          Example 5-10. Saving JSON in Scala
          Example 5-11. Saving JSON in Java
          Example 5-12. Loading CSV with textFile() in Python
          Example 5-13. Loading CSV with textFile() in Scala
          Example 5-14. Loading CSV with textFile() in Java
          Example 5-15. Loading CSV in full in Python
          Example 5-17. Loading CSV in full in Java
          Example 5-18. Writing CSV in Python
          Example 5-19. Writing CSV in Scala
          Example 5-21. Loading a SequenceFile in Scala
          Example 5-22. Loading a SequenceFile in Java
          Example 5-23. Saving a SequenceFile in Scala
          Example 5-24. Loading KeyValueTextInputFormat() with old-style API in Scala
          Example 5-25. Loading LZO-compressed JSON with Elephant Bird in Scala
          Example 5-26. Saving a SequenceFile in Java
          Example 5-27. Sample protocol buffer definition
          Example 5-28. Elephant Bird protocol buffer writeout in Scala
          Example 5-29. Loading a compressed text file from the local filesystem in Scala
          Example 5-30. Creating a HiveContext and selecting data in Python
          Example 5-31. Creating a HiveContext and selecting data in Scala
          Example 5-32. Creating a HiveContext and selecting data in Java
          Example 5-33. Sample tweets in JSON
          Example 5-34. JSON loading with Spark SQL in Python
          Example 5-35. JSON loading with Spark SQL in Scala
          Example 5-36. JSON loading with Spark SQL in Java
          Example 5-37. JdbcRDD in Scala
          Example 5-38. sbt requirements for Cassandra connector
          Example 5-39. Maven requirements for Cassandra connector
          Example 5-40. Setting the Cassandra property in Scala
          Example 5-42. Loading the entire table as an RDD with key/value data in Scala
          Example 5-43. Loading the entire table as an RDD with key/value data in Java
          Example 5-45. Scala example of reading from HBase
          Example 5-46. Elasticsearch output in Scala
          Example 5-47. Elasticsearch input in Scala
          Example 6-1. Sample call log entry in JSON, with some fields removed
          Example 6-2. Accumulator empty line count in Python
          Example 6-3. Accumulator empty line count in Scala
          Example 6-4. Accumulator empty line count in Java
          Example 6-5. Accumulator error count in Python
          Example 6-6. Country lookup in Python
          Example 6-7. Country lookup with Broadcast values in Python
          Example 6-8. Country lookup with Broadcast values in Scala
          Example 6-10. Shared connection pool in Python
          Example 6-11. Shared connection pool and JSON parser in Scala
          Example 6-12. Shared connection pool and JSON parser in Java
          Example 6-13. Average without mapPartitions() in Python
          Example 6-14. Average with mapPartitions() in Python
          Example 6-15. R distance program
          Example 6-16. Driver program using pipe() to call finddistance.R in Python
          Example 6-17. Driver program using pipe() to call finddistance.R in Scala
          Example 6-18. Driver program using pipe() to call finddistance.R in Java
          Example 6-20. Removing outliers in Scala
          Example 6-21. Removing outliers in Java
          Example 7-1. Submitting a Python application
          Example 7-2. Submitting an application with extra arguments
          Example 7-3. General format for spark-submit
          Example 7-4. Using spark-submit with various options
          Example 7-5. pom.xml file for a Spark application built with Maven
          Example 7-6. Packaging a Spark application built with Maven
          Example 7-8. Adding the assembly plug-in to an sbt project build
          Example 8-1. Creating an application using a SparkConf in Python
          Example 8-3. Creating an application using a SparkConf in Java
          Example 8-5. Setting configuration values at runtime using a defaults file
          Example 8-6. input.txt, the source file for our example
          Example 8-7. Processing text data in the Scala Spark shell
          Example 8-9. Collecting an RDD
          Example 8-10. Computing an already cached RDD
          Example 8-11. Coalescing a large RDD in the PySpark shell
          Example 8-12. Registering a class allows Kryo to avoid writing full class names with
          Example 9-1. Maven coordinates for Spark SQL with Hive support
          Example 9-2. Scala SQL imports
          Example 9-3. Scala SQL implicits
          Example 9-4. Java SQL imports
          Example 9-5. Python SQL imports
          Example 9-6. Constructing a SQL context in Scala
          Example 9-7. Constructing a SQL context in Java
          Example 9-8. Constructing a SQL context in Python
          Example 9-9. Loading and querying tweets in Scala
          Example 9-10. Loading and querying tweets in Java
          Example 9-11. Loading and querying tweets in Python
          Example 9-12. Accessing the text column (also first column) in the topTweets
          Example 9-13. Accessing the text column (also first column) in the topTweets
          Example 9-14. Accessing the text column in the topTweets DataFrame in Python
          Example 9-15. Hive load in Python
          Example 9-16. Hive load in Scala
          Example 9-17. Hive load in Java
          Example 9-18. Parquet load in Python
          Example 9-20. Parquet file save in Python
          Example 9-21. Input records
          Example 9-22. Loading JSON with Spark SQL in Python
          Example 9-23. Loading JSON with Spark SQL in Scala
          Example 9-25. Resulting schema from printSchema()
          Example 9-26. Partial schema of tweets
          Example 9-27. SQL query nested and array elements
          Example 9-28. Creating a DataFrame using Row and named tuple in Python
          Example 9-29. Creating a DataFrame from case class in Scala
          Example 9-30. Creating a DataFrame from a JavaBean in Java
          Example 9-32. Connecting to the JDBC server with Beeline
          Example 9-33. Load table
          Example 9-35. Spark SQL shell EXPLAIN
          Example 9-36. Python string length UDF
          Example 9-37. Scala string length UDF
          Example 9-38. Java UDF imports
          Example 9-40. Spark SQL multiple sums
          Example 9-41. Beeline command for enabling codegen
          Example 10-1. Maven coordinates for Spark Streaming
          Example 10-2. Scala streaming imports
          Example 10-3. Java streaming imports
          Example 10-4. Streaming filter for printing lines containing "error" in Scala
          Example 10-5. Streaming filter for printing lines containing "error" in Java
          Example 10-6. Streaming filter for printing lines containing "error" in Scala
          Example 10-7. Streaming filter for printing lines containing "error" in Java
          Example 10-9. Log output from running Example 10-8
          Example 10-10. map() and reduceByKey() on DStream in Scala
          Example 10-11. map() and reduceByKey() on DStream in Java
          Example 10-12. Joining two DStreams in Scala
          Example 10-14. transform() on a DStream in Scala
          Example 10-15. transform() on a DStream in Java
          Example 10-16. Setting up checkpointing
          Example 10-17. How to use window() to count data over a window in Scala
          Example 10-18. How to use window() to count data over a window in Java
          Example 10-19. Scala visit counts per IP address
          Example 10-20. Java visit counts per IP address
          Example 10-21. Windowed count operations in Scala
          Example 10-22. Windowed count operations in Java
          Example 10-23. Running count of response codes using updateStateByKey() in Scala
          Example 10-24. Running count of response codes using updateStateByKey() in Java
          Example 10-25. Saving DStream to text files in Scala
          Example 10-26. Saving SequenceFiles from a DStream in Scala
          Example 10-27. Saving SequenceFiles from a DStream in Java
          Example 10-28. Saving data to external systems with foreachRDD() in Scala
          Example 10-29. Streaming text files written to a directory in Scala
          Example 10-30. Streaming text files written to a directory in Java
          Example 10-31. Streaming SequenceFiles written to a directory in Scala
          Example 10-32. Apache Kafka subscribing to Panda's topic in Scala
          Example 10-33. Apache Kafka subscribing to Panda's topic in Java
          Example 10-34. Apache Kafka directly reading Panda's topic in Scala
          Example 10-35. Apache Kafka directly reading Panda's topic in Java
          Example 10-36. Flume configuration for Avro sink
          Example 10-37. FlumeUtils agent in Scala
          Example 10-38. FlumeUtils agent in Java
          Example 10-39. Maven coordinates for Flume sink
          Example 10-40. Flume configuration for custom sink
          Example 10-41. FlumeUtils custom sink in Scala
          Example 10-43. SparkFlumeEvent in Scala
          Example 10-45. Setting up a driver that can recover from failure in Scala
          Example 10-46. Setting up a driver that can recover from failure in Java
          Example 10-47. Launching a driver in supervise mode
          Example 10-48. Enable the Concurrent Mark-Sweep GC
          Example 11-1. Spam classifier in Python
          Example 11-2. Spam classifier in Scala
          Example 11-3. Spam classifier in Java
          Example 11-4. Creating vectors in Python
          Example 11-5. Creating vectors in Scala
          Example 11-6. Creating vectors in Java
          Example 11-7. Using HashingTF in Python
          Example 11-8. Using TF-IDF in Python
          Example 11-9. Scaling vectors in Python
          Example 11-10. Linear regression in Python
          Example 11-11. Linear regression in Scala
          Example 11-12. Linear regression in Java
          Example 11-13. PCA in Scala
          Example 11-14. SVD in Scala
          Example 11-15. Pipeline API version of spam classification in Scala


          Various ways to run Scala code | ajduke's blog

          TOC of Learning Spark

          Preface | 5
          Audience | 5
          How This Book is Organized | 6
          Supporting Books | 6
          Code Examples | 7
          Early Release Status and Feedback | 7

          Chapter 1. Introduction to Data Analysis with Spark | 8
          What is Apache Spark? | 8
          A Unified Stack | 8
          Who Uses Spark, and For What? | 11
          A Brief History of Spark | 13
          Spark Versions and Releases | 13
          Spark and Hadoop | 14

          Chapter 2. Downloading and Getting Started | 15
          Downloading Spark | 15
          Introduction to Spark's Python and Scala Shells | 16
          Introduction to Core Spark Concepts | 20
          Standalone Applications | 23
          Conclusion | 25

          Chapter 3. Programming with RDDs | 26
          RDD Basics | 26
          Creating RDDs | 28
          RDD Operations | 28
          Passing Functions to Spark | 32
          Common Transformations and Actions | 36
          Persistence (Caching) | 46
          Conclusion | 48

          Chapter 4. Working with Key-Value Pairs | 49
          Motivation | 49
          Creating Pair RDDs | 49
          Transformations on Pair RDDs | 50
          Actions Available on Pair RDDs | 60
          Data Partitioning | 61
          Conclusion | 70

          Chapter 5. Loading and Saving Your Data | 71
          Motivation | 71
          Choosing a Format | 71
          Formats | 72
          File Systems | 88
          Compression | 89
          Databases | 91
          Conclusion | 93

          About the Authors | 95

          17.5.15

          Word-frequency count of the functions and methods used in Intro to Apache Spark

          39    | sc.textFile
          25    | messages.filter
          13    | lines.filter
          13    | errors.map
          13    | messages.cache
          11    | sc.parallelize
          10    | l.split
          9     | x.split
          8     | distFile.flatMap
          7     | f.flatMap
          6     | ")).collect
          6     | ')).collect
          6     | distFile.map
          4     | logData.filter
          4     | ")).map
          4     | sc.accumulator
          4     | wc.saveAsTextFile
          3     |
          3     | teenagers.map
          3     | org.apache.spark.sql.SQLContext
          3     | ')).map
          3     | 1)).reduceByKey
          3     | p
          3     | people.txt").map
          2     | people.registerTempTable
          2     | t
          2     | }).count
          2     | line.contains
          2     | s.contains
          2     | 1)).cache
          2     | rdd.foreach
          2     | reg.join
          2     | Arrays.asList
          2     | words.reduceByKey
          2     | r
          2     | 4)).foreach
          2     | _).collect.foreach
          2     | sc.broadcast
          2     | w.reduceByKey
          1     | ssc.awaitTermination
          1     | parquetFile.registerTempTable
          1     | Click
          1     | .reduce
          1     | sqlCtx.inferSchema
          1     | SparkConf
          1     | teenNames.collect
          1     | 13).where
          1     | %s".format
          1     | KMeans.train
          1     | 2).cache
          1     | words.map
          1     | peopleTable.registerTempTable
          1     | lines.flatMap
          1     | counts.collect
          1     | people.saveAsParquetFile
          1     | ssc.start
          1     | ssc.socketTextStream
          1     | counts.saveAsTextFile
          1     | pairs.reduceByKey
          1     | println
          1     | org.apache.spark.sql.hive.HiveContext
          1     | teenagers.collect
          1     | wordCounts.print
          1     | 19).select
          1     | 10).collect
          1     | distData.filter
          1     | spark.parallelize
          1     | value").collect
          1     | model.predict
          1     | people.where
          1     | line.split
          1     | }.reduce
          1     | args
          1     | c
          1     | sqlContext.parquetFile
          1     | java.text.SimpleDateFormat
          1     | List
          1     | spark.stop
          1     | lines.map
          1     | parts.map
          1     | Register
          1     | System.out.println
          1     | sqlCtx.sql
          1     | test_data.map
          1     | g.triplets.filter

          16.5.15

          Scala for the Impatient

          When learning a new language, you need to work through the following questions:

          1. The general style of the language (procedural, object-oriented, functional; compiled or interpreted)
          2. The overall structure of a (simple) program, and how to compile and run it (the typical hello world)
            • There are basically two kinds
              • The simplest program can be an executable script, with no class definition required
              • A more complex program defines an object XXX with a main method, just as in Java
                • A Scala program ultimately compiles to Java bytecode and runs on the JVM
          3. Types, constants, and variable definitions; expressions
            • val declares a constant (a value)
            • var declares a variable
            • A strong type system, but types can be omitted and inferred by the compiler
              • See page 7, 1.2/1.3
                • Byte
                • Char
                • Short
                • Int
                • Long
                • Float
                • Double
                • String
              • toString()
              • to()
              • RichXXX
            • Expressions
              • P7 1.4
                • a + b vs. a.+(b)
                • 1.to(10) vs 1 to 10
              • no ++/-- but +=1/-=1
                • Why is there no ++/--?
                  • Because Int is immutable, its value cannot change, so ++/-- cannot be implemented
            • calling functions and methods
              • p10 1.5
            • import
              • import math._
                • _ is the equivalent of * in Java
                • import math._ is equivalent to import scala.math._
            • static methods vs. singleton objects
              • Scala has no static methods, but it has singleton objects instead
            • Scala methods without parameters often don't use parentheses.
            • .apply() method
          4. Control flow
            • An if expression has a value
            • A block has a value - the value of its last expression
            • The Scala for loop like an "enhanced" Java for loop
            • Semicolons are (mostly) optional
            • The void type is Unit
            • Avoid using return in a function
            • Beware of missing = in a function definition
            • Exceptions work just like in Java or C++, but you use a "pattern matching" syntax for catch.
            • Scala has no checked exceptions.
          5. Defining and using functions/objects
          6. Arrays and other standard data structures (list, set, map)
          7. Where to get online help?
            1. p12 1.7 scaladoc
              1. http://www.scala-lang.org/api
          8. Libraries and building larger systems (make/build tools)


          1. The Basics (A1)
          1.1. The Scala Interpreter
          1.2. Declaring Variables
          1.3. Commonly Used Types
          1.4. Arithmetic and Operator Overloading
          1.6. The apply Method
          1.7. Scaladoc
          1.8. Exercises

          2. Control Structures and Functions (A1)
          2.1. Conditional Expressions
          2.2. Statement Termination
          2.3. Block Expressions and Assignments
          2.4. Loops
          2.6. Functions
          2.7. Default and Named Arguments (L1)
          2.8. Variable Arguments (L1)
          2.9. Procedures
          2.10. Lazy Values (L1)
          2.11. Exceptions
          2.12. Exercises

          3. Arrays (A1)
          3.1. Fixed Size Arrays
          3.2. Variable Size Arrays: Array Buffers
          3.3. Traversing Arrays (and Array Buffers)
          3.4. Transforming Arrays
          3.6. Deciphering ScalaDoc
          3.7. Multi-Dimensional Arrays
          3.8. Interoperating with Java
          3.9. Exercises

          4. Maps and Tuples (A1)
          4.1. Constructing a Map
          4.2. Accessing Map Values
          4.3. Updating Map Values
          4.4. Iterating Over Maps
          4.6. Interoperating with Java
          4.7. Tuples
          4.8. Zipping
          4.9. Exercises

          5. Classes (A1)
          5.1. Simple Classes and Parameter-Less Methods
          5.2. Properties with Getters and Setters
          5.3. Properties with Only Getters
          5.4. Object-Private Fields
          5.6. Auxiliary Constructors
          5.7. The Primary Constructor
          5.8. Nested Classes (L1)
          5.9. Exercises

          6. Objects (A1)
          6.1. Singletons
          6.2. Companion Objects
          6.3. Objects Extending a Class or Trait
          6.4. The apply Method
          6.6. Enumerations
          6.7. Exercises

          7. Packages and Imports (A1)
          7.1. Packages
          7.2. Scope Rules
          7.3. Chained Package Clauses
          7.4. Top-of-File Notation
          7.6. Package Visibility
          7.7. Imports
          7.8. Imports Can Be Anywhere
          7.9. Renaming and Hiding Members
          7.10. Implicit Imports
          7.11. Exercises

          8. Inheritance (A1)
          8.1. Extending a Class
          8.2. Overriding Methods
          8.3. Type Checks and Casts
          8.4. Protected Fields and Methods
          8.6. Overriding Fields
          8.7. Anonymous Subclasses
          8.8. Abstract Classes
          8.9. Abstract Fields
          8.10. Construction Order and Early Definitions (L3)
          8.11. The Scala Inheritance Hierarchy
          8.12. Object Equality (L1)
          8.13. Exercises

          9. Files and Regular Expressions (A1)
          9.1. Reading Lines
          9.2. Reading Characters
          9.3. Reading Tokens and Numbers
          9.4. Reading from URLs and Other Sources
          9.6. Writing Text Files
          9.7. Visiting Directories
          9.8. Serialization
          9.9. Regular Expressions
          9.10. Regular Expression Groups
          9.11. Exercises

          辛湜 (Reynold Xin): The philosophy of the Spark project and its future outlook - watch online in HD - Tencent Video

          15.5.15

          Pretty Printing with Gson in Groovy

          #!/usr/bin/env groovy
          @Grab('com.google.code.gson:gson:2.3.1')
          
          import com.google.gson.*;
          
          def gson = new GsonBuilder().setPrettyPrinting().create();
          def jp = new JsonParser();
          
          System.in.eachLine { line ->
              def je = jp.parse(line);
              println(gson.toJson(je));
          }
          
          

          14.5.15

          ITAS Workshop Goals


          • open a Spark Shell
            • Nothing special here: download Spark, unpack it, and the Spark shell is ready to use. There are two shells, one for Python and one for Scala.
          • develop Spark apps for typical use cases
            • Will see what the typical use cases are in a moment.
              • When learning Hadoop, the most common example is word count, so a simple frequency-based sum is one such case; generalizing, any independently computable statistic is one class of application
              • By analogy with database queries, conditional queries and aggregation should be another class
          • tour of the Spark API
            • This is useful; it can serve as a study outline.
          • explore data sets loaded from HDFS, etc.
            • Note "loaded from HDFS" - I am curious about the relationship between the two. I expect Spark can work directly with data stored on HDFS. How to exploit the compute capacity of the whole cluster is another question, though. If the distributed computation is built on YARN, it can be understood from the bottom up, which is interesting and not too difficult: the ResourceManager handles overall resource management and placement, and the local NodeManager handles the concrete control; the ApplicationMaster part I only half understand and need to study further.
            • Spark's defining feature is the RDD. If no cache is set during the computation, every query result is recomputed step by step from the original data. Its advantage over MapReduce is that each MapReduce step involves heavy I/O, whereas Spark appears to compute only in memory. Does it give up the disk entirely? I have my doubts; this should become clear later on. (A small caching sketch follows this list.)
          • review of Spark SQL, Spark Streaming, MLlib
            • Spark SQL should not be hard; memorize the common statements and learn it by comparison with SQL-92.
            • I keep seeing Spark Streaming but do not know what it is. Hadoop Streaming is basically uninteresting: it just pipes HDFS data, stream style, to languages such as Python that cannot access Hadoop/HDFS directly. Hopefully Spark Streaming is not that kind of thing.
            • MLlib - do I need to learn it?
          • follow-up courses and certification
            • Mm-hmm.
          • developer community resources, events, etc.
            • Somewhat interesting, worth a look.
          • return to workplace and demo use of Spark!
            • Aha!
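          A minimal caching sketch in Java (my own, not from the workshop material; the file path and app name are placeholders), illustrating the point above: the filtered RDD is cached, so the second action reuses the in-memory partitions instead of recomputing from the source.

            import org.apache.spark.SparkConf;
            import org.apache.spark.api.java.JavaRDD;
            import org.apache.spark.api.java.JavaSparkContext;

            public class CacheDemo {
              public static void main(String[] args) {
                SparkConf conf = new SparkConf().setAppName("CacheDemo").setMaster("local[*]");
                JavaSparkContext sc = new JavaSparkContext(conf);

                JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input.txt");      // placeholder path
                JavaRDD<String> errors = lines.filter(line -> line.contains("ERROR"));

                errors.cache();                            // keep the filtered RDD in memory
                long total = errors.count();               // first action: reads from the source
                long distinct = errors.distinct().count(); // second action: served from the cache
                System.out.println(total + " error lines, " + distinct + " distinct");
                sc.stop();
              }
            }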

          Spark certification materials

          Apache Spark Developer Certification Program

          Exam Preparation


          To prepare for the Spark certification exam, we recommend that you:
          • Are comfortable coding the advanced exercises in Spark Camp or a related training (example exercises can be found here). Spark Camp and Spark Advanced Trainings are held at Strata+Hadoop World events. Additional training opportunities are provided by Databricks.
          • Watch Introduction to Apache Spark by Paco Nathan. In this video training, you’ll complete hands-on technical exercises, and get up to speed on how to use Spark for data exploration, analysis, and building big data applications in Python, Java, or Scala.
          • Have mastered the material released so far in the O’Reilly book, Learning Spark.
          • Have some hands-on experience developing Spark apps in production already.
          • If you’re taking the online version of the exam, please read this information that addresses known issues you may experience when preparing to take the exam.
          • If you are in China, ChinaHadoop delivers Spark classes in Chinese. Register for the online course.
          The test will include questions in Scala, Python, Java, and SQL. However, deep proficiency in any of those languages is not required, since the questions focus on Spark and its model of computation.


          Big data training

          小象学院 (ChinaHadoop) - China's most professional online education platform for Hadoop and Spark big data. Authoritative courses: Hadoop training, Spark training, HBase training, Hive training, Mahout training, and more

          Jason4Zhu: VCore Configuration In Hadoop

          13.5.15

          Apache YARN Scheduler


          • The FIFO Scheduler
            • Places applications in a queue and runs them in the order of submission.
            • Requests for the first application in the queue are allocated first, then once its requests have been satisfied the next application in the queue is served, and so on.
            • The good part:
              • simple to understand
              • not needing any configuration
            • The bad side:
              • not suitable for shared clusters.
                • Large applications will use all the resources in a cluster
                  • So each application has to wait its turn.
              • On a shared cluster it is better to use the Capacity Scheduler or the Fair Scheduler.
                • Both of these allow long-running jobs to complete in a timely manner,
                  • while still allowing users who are running concurrent smaller ad hoc queries to get results back in a reasonable time.
          • The Capacity Scheduler
            • A separate dedicated queue allows the small job to start as soon as it is submitted,
              • although this is at the cost of
                • overall cluster utilization
                  • since the queue capacity is reserved for jobs in that queue.
                    • This means that the large job finishes later than when using the FIFO Scheduler.
          • The Fair Scheduler
            • There is no need to reserve a set amount of capacity since it will dynamically balance resources between all running jobs.
            • When the second (small) job starts it is allocated half of the cluster resources so that each job is using its fair share of resources.
          • Delay Scheduling

          How was YARN designed to address the limitations of MRv1?

          All from the Book: Hadoop: The Definitive Guide.
          • Scalability
            • MRv1:
              • because the jobtracker has to manage both jobs and tasks, MRv1 hits scalability bottlenecks in the region of 4,000 nodes, and 40,000 tasks.
            • YARN/MRv2 overcomes these limitations by virtue of its split resource manager/application master architecture, which means it is designed to scale up to 10,000 nodes, and 100,000 tasks.
          • Availability
            • With the jobtracker's responsibilities split between the resource manager and the application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications (on a per-application basis). Indeed, Hadoop 2 supports HA both for the resource manager and for the application master for MapReduce jobs, which is similar to my own product.
          • Utilization
            • In MRv1:
              • each tasktracker is configured with a static allocation of fixed size "slots", which are divided into map slots and reduce slots at configuration time.
              • A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
            • In YARN/MRv2:
              • a node manager manages a pool of resources, rather than a fixed number of designated slots. 
              • MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster.
              • If the resources to run the task are available, then the application will be eligible for them.
              • Furthermore, resources in YARN are fine-grained, so an application can make a request for what it needs, rather than for an indivisible slot.
          • Multitenancy

          Apache YARN


          • YARN:
            • Yet Another Resource Negotiator
          • YARN 
            • is Hadoop's cluster resource management system.
            • was introduced in Hadoop 2
            • to improve the MapReduce implementation.
            • but is general enough to support other distributed computing paradigms as well.
          • YARN
            • provides APIs for requesting and working with cluster resources.
              • but these APIs are not typically used directly by user code.
              • (Instead,) users write to higher-level APIs provided by distributed computing frameworks,
                • which themselves are built on YARN and hide the resource management details from the user.

          (There is a further layer of applications that build on the frameworks shown in the figure above: Pig, Hive, and Crunch, for example.)



          Figures in Hadoop: The Definitive Guide

          Figure 1-1. Structure of the book: there are various pathways through the book





          Figure 2-1. MapReduce logical data flow


          • MapReduce works by breaking the processing into two phases:
            • the map phase and
            • the reduce phase
          • Each phase has key-value pairs as input and output,
            • the types of which may be chosen by the programmer

          Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks


          Figure 2-3. MapReduce data flow with a single reduce task



          Figure 2-4. MapReduce data flow with multiple reduce tasks


          Figure 2-5. MapReduce data flow with no reduce tasks


          Figure 3-1. Accessing HDFS over HTTP directly and via a bank of HDFS proxies



          Figure 3-2. A client reading data from HDFS


          Figure 3-3. Network distance in Hadoop

          Figure 3-4. A client writing data to HDFS

          Figure 3-5. A typical replica pipeline


          Figure 4-1. YARN applications



          Figure 4-2. How YARN runs an application



          Figure 4-3. Cluster utilization over time when running a large job and a small job under the FIFO Scheduler (i), Capacity Scheduler (ii), and Fair Scheduler (iii)
          Figure 4-4. Fair sharing between user queues
          Figure 5-1. Writable class hierarchy
          Figure 5-2. The internal structure of a sequence file with no compression and with record compression
          Figure 5-3. The internal structure of a sequence file with block compression
          Figure 5-4. Row-oriented versus column-oriented storage
          Figure 6-1. Screenshot of the resource manager page
          Figure 6-2. Screenshot of the job page
          Figure 6-3. Screenshot of the tasks page
          Figure 6-4. Transition diagram of an Oozie workflow
          Figure 7-1. How Hadoop runs a MapReduce job
          Figure 7-2. The relationship of the Streaming executable to the node manager and the task container
          Figure 7-3. How status updates are propagated through the MapReduce system
          Figure 7-4. Shuffle and sort in MapReduce
          Figure 7-5. Efficiently merging 40 file segments with a merge factor of 10
          Figure 8-1. Where separators are used in a Streaming MapReduce job
          Figure 8-2. InputFormat class hierarchy
          Figure 8-3. Logical records and HDFS blocks for TextInputFormat
          Figure 8-4. OutputFormat class hierarchy
          Figure 9-1. Temperature distribution for the weather dataset
          Figure 9-2. Inner join of two datasets
          Figure 10-1. Typical two-level network architecture for a Hadoop cluster
          Figure 10-2. The three-step Kerberos ticket exchange protocol
          Figure 11-1. The checkpointing process
          Figure 13-1. The internal structure of a Parquet file
          Figure 14-1. Flume agent with a spooling directory source and a logger sink connected by a file channel
          Figure 14-2. Flume agent with a spooling directory source and fanning out to an HDFS sink and a logger sink
          Figure 14-3. Using a second agent tier to aggregate Flume events from the first tier
          Figure 14-4. Two Flume agents connected by an Avro sink-source pair
          Figure 14-5. Using multiple sinks for load balancing or failover
          Figure 14-6. Load balancing between two agents
          Figure 15-1. Sqoop’s import process
          Figure 15-2. Database tables are typically physically represented as an array of rows, with all the columns in a row stored adjacently
          Figure 15-3. Large objects are usually held in a separate area of storage; the main row storage contains indirect references to the large objects
          Figure 15-4. Exports are performed in parallel using MapReduce
          Figure 17-1. Hive architecture
          Figure 17-2. Metastore configurations
          Figure 17-3. Data flow with partial results for a UDAF
          Figure 18-1. Plan diagram for a Crunch pipeline for calculating a histogram of word counts
          Figure 19-1. How Spark runs a job
          Figure 19-2. The stages and RDDs in a Spark job for calculating a histogram of word counts
          Figure 19-3. How Spark executors are started in YARN client mode
          Figure 19-4. How Spark executors are started in YARN cluster mode
          Figure 20-1. The HBase data model, illustrated for a table storing photos
          Figure 20-2. HBase cluster members
          Figure 21-1. ZooKeeper znodes
          Figure 21-2. Reads are satisfied by followers, whereas writes are committed by the leader
          Figure 21-3. ZooKeeper state transitions
          Figure 22-1. Operational data flow
          Figure 22-2. Composable datasets and functions
          Figure 22-3. Timeline of Big Data Technology and Cost of Sequencing a Genome
          Figure 22-4. DNA Double Helix Structure
          Figure 22-5. Aligning Reads to a Reference Genome
          Figure 22-6. Counting and sorting in MapReduce
          Figure 22-7. Pipes linked by fields and tuples
          Figure 22-8. Pipe types
          Figure 22-9. A simple PipeAssembly
          Figure 22-10. Operation types
          Figure 22-11. An assembly of operations
          Figure 22-12. A Flow
          Figure 22-13. How a Flow translates to chained MapReduce jobs
          Figure 22-14. The ShareThis log processing pipeline
          Figure 22-15. The ShareThis log processing Flow

          Interfaces

          FSDataInputStream, Seekable, PositionedReadable


• The open() method of FileSystem returns an FSDataInputStream
  • rather than a standard java.io class
• FSDataInputStream is a specialization of java.io.DataInputStream
  • with support for random access
  • so you can read from any part of the stream
    • The Seekable and PositionedReadable interfaces provide this capability (see the sketch below).
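
A minimal sketch (the input path comes from the command line, and the file is assumed to exist and be at least 16 bytes long) of the random-access reads that Seekable and PositionedReadable enable on an FSDataInputStream:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IOUtils;

  public class SeekDemo {
      public static void main(String[] args) throws Exception {
          FileSystem fs = FileSystem.get(new Configuration());
          FSDataInputStream in = fs.open(new Path(args[0]));
          try {
              IOUtils.copyBytes(in, System.out, 4096, false); // sequential read from the start

              in.seek(0);                                     // Seekable: jump back to the beginning
              IOUtils.copyBytes(in, System.out, 4096, false); // and read the whole file again

              byte[] buffer = new byte[16];
              in.readFully(0, buffer);                        // PositionedReadable: read 16 bytes at offset 0
              System.out.println(new String(buffer, "UTF-8")); // without moving the current position
          } finally {
              IOUtils.closeStream(in);
          }
      }
  }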

          FSDataOutputStream, Progressable

          Glob patterns and PathFilter

          • Hadoop supports the same set of glob characters as Unix bash
          • When glob patterns are not powerful enough to describe a set of files you want to access, you can use PathFilter.
          public FileStatus[] globStatus(Path pathPattern) throws IOException
          public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws
            IOException
          
Here is the PathFilter interface:
            package org.apache.hadoop.fs;
          
            public interface PathFilter {
              boolean accept(Path path);
            }
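
A minimal sketch (class name and paths are illustrative) of a PathFilter used together with globStatus() to express an exclusion that a glob alone cannot:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;

  public class RegexExcludePathFilter implements PathFilter {
      private final String regex;

      public RegexExcludePathFilter(String regex) {
          this.regex = regex;
      }

      @Override
      public boolean accept(Path path) {
          return !path.toString().matches(regex); // keep only paths that do NOT match
      }

      public static void main(String[] args) throws Exception {
          FileSystem fs = FileSystem.get(new Configuration());
          // e.g. everything under /2007 except the 2007/12/31 directory
          FileStatus[] matches = fs.globStatus(new Path("/2007/*/*"),
                  new RegexExcludePathFilter("^.*/2007/12/31$"));
          for (FileStatus status : matches) {
              System.out.println(status.getPath());
          }
      }
  }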
          


          Writable


          package org.apache.hadoop.io;
          
          import java.io.DataOutput;
          import java.io.DataInput;
          import java.io.IOException;
          
          public interface Writable {
            void write(DataOutput out) throws IOException;
            void readFields(DataInput in) throws IOException;
          }
          

          The Writable interface defines two methods:

          • One for writing its state to a DataOutput binary stream and
          • One for reading its state from a DataInput binary stream

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.util.StringUtils;

    // Wrapper class added so the example compiles as-is; the class name is arbitrary.
    public class IntWritableSerializationDemo {
        public static void main(String[] args) throws Exception {
            IntWritable iw = new IntWritable(1024);

            // Serialize: write the Writable's state to a binary stream.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            iw.write(dos);
            dos.flush();
            byte[] data = baos.toByteArray();

            System.out.println(StringUtils.byteToHexString(data)); // 00000400 (1024 as a big-endian int)

            // Deserialize: read the state back into a fresh IntWritable.
            ByteArrayInputStream bais = new ByteArrayInputStream(data);
            DataInputStream dis = new DataInputStream(bais);
            IntWritable iw2 = new IntWritable();
            iw2.readFields(dis);
            System.out.println(iw2.get()); // 1024
        }
    }
          

          WritableComparable and comparators


• IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
          


          • Comparison of types is crucial for MapReduce
            • where there is a sorting phase during which keys are compared with one another.
• RawComparator is an optimization that Hadoop provides
  • an extension of Java's Comparator
  • it allows implementors to compare records read from a stream without deserializing them into objects
    • Storing values in big-endian order helps here: for non-negative integers, comparing the serialized bytes lexicographically gives the same result as comparing the values themselves.
          • WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.
            • It provides two main functions.
              • A default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method.
              • acts as a factory for RawComparator instances
          RawComparator comparator = WritableComparator.get(IntWritable.class);
          
          IntWritable w1 = new IntWritable(163);
          IntWritable w2 = new IntWritable(67);
          assertThat(comparator.compare(w1, w2), greaterThan(0));
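
The same comparator can also be applied directly to serialized bytes. A minimal sketch (the serialize() helper is added here for illustration and is not part of the notes):

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.RawComparator;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.io.WritableComparator;

  public class RawCompareDemo {
      // Serialize any Writable to a byte array (helper for this sketch only).
      static byte[] serialize(Writable w) throws IOException {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          DataOutputStream dataOut = new DataOutputStream(out);
          w.write(dataOut);
          dataOut.close();
          return out.toByteArray();
      }

      public static void main(String[] args) throws IOException {
          RawComparator comparator = WritableComparator.get(IntWritable.class);

          byte[] b1 = serialize(new IntWritable(163));
          byte[] b2 = serialize(new IntWritable(67));

          // Compare the raw bytes -- no IntWritable objects are deserialized here.
          System.out.println(comparator.compare(b1, 0, b1.length, b2, 0, b2.length)); // > 0
      }
  }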
          

          GenericOptionsParser, Tool interface and ToolRunner

• GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired.
• You don't usually use GenericOptionsParser directly, as it's more convenient to implement the Tool interface and run your application with the ToolRunner
  • which uses GenericOptionsParser internally
• You can have your application class extend Configured, which is an implementation of the Configurable interface.
  • All implementations of Tool must also implement Configurable (Tool extends it)
  • and subclassing Configured is often the easiest way to achieve this.
          • ToolRunner.run() method takes care of creating a Configuration object for the Tool before calling its run() method.
          • ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line and to set them on the Configuration instance. 
            • -conf <conf file>
          • GenericOptionsParser also allows you to set individual properties.
            • hadoop ConfigurationPrinter -D color=yellow | grep color
            • The -D option is used to set the configuration property with key color to the value yellow.
            • Options specified with -D take priority over properties from the configuration files.
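
A minimal sketch of a Tool along the lines of the ConfigurationPrinter command used above (details assumed, not a verbatim copy):

  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public class ConfigurationPrinter extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
          // getConf() returns the Configuration that ToolRunner populated from
          // -conf, -D and the other generic options.
          Configuration conf = getConf();
          for (Map.Entry<String, String> entry : conf) {
              System.out.printf("%s=%s%n", entry.getKey(), entry.getValue());
          }
          return 0;
      }

      public static void main(String[] args) throws Exception {
          int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
          System.exit(exitCode);
      }
  }

Run as in the note above, e.g. hadoop ConfigurationPrinter -D color=yellow | grep color.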

          InputSampler, Sampler



          • The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job
• This interface is not usually called directly by clients. Instead, the writePartitionFile() static method on InputSampler is used, which creates a sequence file to store the keys that define the partitions.
• The sequence file is used by TotalOrderPartitioner to create partitions for the sort job (see the sketch below).
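
A minimal sketch of wiring these pieces together (the paths, SequenceFile formats, and IntWritable/NullWritable key/value types are assumptions for illustration):

  import java.net.URI;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

  public class TotalSortDriver {
      public static void main(String[] args) throws Exception {
          Job job = Job.getInstance(new Configuration(), "total sort");
          job.setJarByClass(TotalSortDriver.class);

          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          job.setInputFormatClass(SequenceFileInputFormat.class);
          job.setOutputFormatClass(SequenceFileOutputFormat.class);
          job.setOutputKeyClass(IntWritable.class);
          job.setOutputValueClass(NullWritable.class);

          job.setPartitionerClass(TotalOrderPartitioner.class);

          // Sample 10% of the keys, up to 10,000 samples from at most 10 splits,
          // and write the partition boundaries to the partition file.
          InputSampler.Sampler<IntWritable, NullWritable> sampler =
              new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1, 10000, 10);
          InputSampler.writePartitionFile(job, sampler);

          // Ship the partition file to every task via the distributed cache.
          String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
          job.addCacheFile(new URI(partitionFile));

          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }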

          12.5.15

Four-step Strategy for Incremental Updates in Apache Hive

          Sqoop Commands

          $ sqoop tool-name [tool-arguments]
          $ sqoop help
          $ HADOOP_HOME=/path/to/some/hadoop sqoop import --arguments...
          $ export HADOOP_HOME=/some/path/to/hadoop
          $ sqoop import --arguments...
          $ sqoop help import
          $ sqoop import --connect jdbc:mysql://localhost/db --username foo --table TEST
          $ sqoop --options-file /users/homer/work/import.txt --table TEST
          $ sqoop import (generic-args) (import-args)
          $ sqoop-import (generic-args) (import-args)
          $ sqoop import --connect jdbc:mysql://database.example.com/employees
          $ sqoop import --connect jdbc:mysql://database.example.com/employees \
              --username aaron --password 12345
          $ sqoop import --driver com.microsoft.jdbc.sqlserver.SQLServerDriver \
              --connect <connect-string> ...
$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id = b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /user/foo/joinresults
$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id = b.id) WHERE $CONDITIONS' \
  -m 1 --target-dir /user/foo/joinresults
$ sqoop import --connect <connect-str> --table foo --warehouse-dir /shared \
    ...
$ sqoop import --connect <connect-str> --table foo --target-dir /dest \
    ...
          $ sqoop import --connect jdbc:mysql://server.foo.com/db --table bar \
              --direct -- --default-character-set=latin1
          $ sqoop import ... --map-column-java id=String,value=Integer
          $ sqoop import --fields-terminated-by , --escaped-by \\ --enclosed-by '\"' ...
          $ sqoop import --optionally-enclosed-by '\"' (the rest as above)...
          $ sqoop import --connect <connect-str> --table SomeTable --package-name com.foocorp
          $ sqoop import --table SomeTable --jar-file mydatatypes.jar \
              --class-name SomeTableType
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --username SomeUser -P
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --columns "employee_id,first_name,last_name,job_title"
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              -m 8
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --direct
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --class-name com.foocorp.Employee --as-sequencefile
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --fields-terminated-by '\t' --lines-terminated-by '\n' \
              --optionally-enclosed-by '\"'
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --hive-import
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --where "start_date > '2010-01-01'"
          $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \
              --split-by dept_id
          $ hadoop fs -ls EMPLOYEES
          $ hadoop fs -cat EMPLOYEES/part-m-00000 | head -n 10
          $ sqoop import --connect jdbc:mysql://db.foo.com/somedb --table sometable \
              --where "id > 100000" --target-dir /incremental_dataset --append
          $ sqoop import-all-tables (generic-args) (import-args)
          $ sqoop-import-all-tables (generic-args) (import-args)
          $ sqoop import-all-tables --connect jdbc:mysql://db.foo.com/corp
          $ hadoop fs -ls
          $ sqoop export (generic-args) (export-args)
          $ sqoop-export (generic-args) (export-args)
          $ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar  \
              --export-dir /results/bar_data
          $ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
          $ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
          $ sqoop job --create myjob -- import --connect jdbc:mysql://example.com/db \
              --table mytable
          $ sqoop job --list
          $ sqoop job --show myjob
          $ sqoop job --exec myjob
          $ sqoop job --exec myjob -- --username someuser -P
          $ sqoop metastore (generic-args) (metastore-args)
          $ sqoop-metastore (generic-args) (metastore-args)
          $ sqoop merge (generic-args) (merge-args)
          $ sqoop-merge (generic-args) (merge-args)
          $ sqoop merge --new-data newer --onto older --target-dir merged \
              --jar-file datatypes.jar --class-name Foo --merge-key id
          $ sqoop codegen (generic-args) (codegen-args)
          $ sqoop-codegen (generic-args) (codegen-args)
          $ sqoop codegen --connect jdbc:mysql://db.example.com/corp \
              --table employees
          $ sqoop create-hive-table (generic-args) (create-hive-table-args)
          $ sqoop-create-hive-table (generic-args) (create-hive-table-args)
          $ sqoop create-hive-table --connect jdbc:mysql://db.example.com/corp \
              --table employees --hive-table emps
          $ sqoop eval (generic-args) (eval-args)
          $ sqoop-eval (generic-args) (eval-args)
          $ sqoop eval --connect jdbc:mysql://db.example.com/corp \
              -e "INSERT INTO foo VALUES(42, 'bar')"
          $ sqoop list-databases (generic-args) (list-databases-args)
          $ sqoop-list-databases (generic-args) (list-databases-args)
          $ sqoop list-databases --connect jdbc:mysql://database.example.com/
          $ sqoop list-tables (generic-args) (list-tables-args)
          $ sqoop-list-tables (generic-args) (list-tables-args)
          $ sqoop list-tables --connect jdbc:mysql://database.example.com/corp
          $ sqoop help [tool-name]
          $ sqoop-help [tool-name]
          $ sqoop help
          $ bin/sqoop help import
          $ sqoop version
          $ sqoop-version
          $ sqoop version
          $ sqoop import --table foo \
              --connect jdbc:mysql://db.example.com/someDb?zeroDateTimeBehavior=round
          $ sqoop import -D oracle.sessionTimeZone=America/Los_Angeles \
              --connect jdbc:oracle:thin:@//db.example.com/foo --table bar

          Efficiency Comparison of All Possible Ways to Dump Database (mysql) into HDFS


• sqoop + MySQL direct connector (--direct)
  • It uses the mysqldump command under the hood, which makes it the most efficient option.
• sqoop + MySQL JDBC driver
• MapReduce + JDBC driver
  • It works, but is less efficient than sqoop with the JDBC driver, because Sqoop optimizes the job for you.
• mysqldump to the local file system, then copy to HDFS
  • Overhead of storing the data twice.

Implicit Defaults in Hadoop Jobs

1. If no mapper is set, IdentityMapper is used with the old API (org.apache.hadoop.mapred); with the new API (org.apache.hadoop.mapreduce) the default is the base Mapper class, which is also an identity mapper.
2. If the identity mapper is used and the job doesn't set the mapper output classes, the classes configured for the reducer output are reused for the mapper output. This can cause a problem, and it surfaces as an IOException from the map output collector (see MapTask below, and the driver sketch after this list) rather than a ClassCastException:

          MapTask.class (2.6.0)


              public synchronized void collect(K key, V value, final int partition
                                               ) throws IOException {
                reporter.progress();
                if (key.getClass() != keyClass) {
                  throw new IOException("Type mismatch in key from map: expected "
                                        + keyClass.getName() + ", received "
                                        + key.getClass().getName());
                }
                if (value.getClass() != valClass) {
                  throw new IOException("Type mismatch in value from map: expected "
                                        + valClass.getName() + ", received "
                                        + value.getClass().getName());
                }
                if (partition < 0 || partition >= partitions) {
                  throw new IOException("Illegal partition for " + key + " (" +
                      partition + ")");
                }
          
          
          


          3. Default file location for Hive DB and Tables:

          /user/hive/warehouse/<db>.db/<table>
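
Relating to point 2 above, a minimal driver sketch (TokenMapper, AvgReducer, and the paths are assumptions for illustration) showing why the map output classes must be declared explicitly when they differ from the final output classes; without the setMapOutputKeyClass()/setMapOutputValueClass() calls, the collector shown above throws the "Type mismatch" IOException:

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class OutputTypesDriver {

      // Hypothetical mapper: emits (word, 1) pairs -- map output types are Text/IntWritable.
      public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
          private static final IntWritable ONE = new IntWritable(1);
          @Override
          protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
              for (String token : value.toString().split("\\s+")) {
                  if (!token.isEmpty()) {
                      context.write(new Text(token), ONE);
                  }
              }
          }
      }

      // Hypothetical reducer: emits (word, average) -- final output types are
      // Text/DoubleWritable, which differ from the map output value type.
      public static class AvgReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
              int sum = 0, count = 0;
              for (IntWritable v : values) { sum += v.get(); count++; }
              context.write(key, new DoubleWritable((double) sum / count));
          }
      }

      public static void main(String[] args) throws Exception {
          Job job = Job.getInstance(new Configuration(), "output types demo");
          job.setJarByClass(OutputTypesDriver.class);
          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

          job.setMapperClass(TokenMapper.class);
          job.setReducerClass(AvgReducer.class);

          // Intermediate (map output) types -- must be declared explicitly because
          // they differ from the final output types.
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);

          // Final (reduce output) types.
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(DoubleWritable.class);

          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }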

          Need to remember


          • How do you configure a MapReduce job so that a single map task processes each input file regardless of how many blocks the input file occupies?
  • Write a custom FileInputFormat and override its isSplitable() method to always return false (see the sketch after this list).
    • The isSplitable() method in your InputFormat is passed each file; if it returns true, the file can be broken up and processed by multiple Mappers. If it returns false, the file is considered 'not splittable', i.e. the entire file must be processed by a single Mapper.
• To make sure JAR files other than the one containing the driver class get distributed to all nodes in the cluster, the hadoop command should be:
  • % hadoop jar job.jar MyDriver -libjars ex1.jar,ex2.jar
  • Just remember that <hadoop jar job.jar MyDriver> is the main part; -libjars takes a comma-separated list of JARs.
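
A minimal sketch of the isSplitable() approach (the class name is assumed; extending TextInputFormat means no custom RecordReader is needed):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  public class WholeFileTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
          return false; // never split: each input file goes to exactly one map task
      }
  }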

          • InputFormat
  • SequenceFileInputFormat
            • TextInputFormat
            • ObjectPositionInputFormat
            • FileInputFormat
          • ToolRunner
          • LocalJobRunner
          • job.setNumReduceTasks(8);job.setPartitionerClass(MyPartitioner.class);
          • Write a custom FileInputFormat and override the method isSplitable to always return false
          • MRUnit 
          • hadoop fs -setrep 4 f1
            • hadoop fs -Ddfs.replication=4 -cp f1 f1.tmp; hadoop fs -rm f1; hadoop fs -mv f1.tmp f1
          • How are keys and values presented and passed to the reduce() method during a standard shuffle and sort phase of MapReduce?
          • Does the MapReduce programming model provide a way for reduce tasks to communicate with each other?
          • Speculative Execution
          • WritableComparable
          • Workflow of Oozie
          • A single map task processes <>?
          • Hive table field delimiters
          • When testing a Reducer using MRUnit, you should only pass the Reducer a single key and list of values. In this case, we use the withInput() method twice, but only the second call will actually be used -- the first will be overridden by the second. If you want to test the Reducer with two inputs, you would have to write two tests.
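
A minimal MRUnit sketch of that rule (SumReducer is defined inline so the test is self-contained; class and test names are assumptions):

  import java.io.IOException;
  import java.util.Arrays;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
  import org.junit.Test;

  public class SumReducerTest {

      // Trivial reducer defined here so the test can run on its own.
      public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable v : values) {
                  sum += v.get();
              }
              context.write(key, new IntWritable(sum));
          }
      }

      @Test
      public void sumsValuesForASingleKey() throws IOException {
          // One key, with its full list of values -- exactly one withInput() call.
          ReduceDriver.newReduceDriver(new SumReducer())
              .withInput(new Text("cat"), Arrays.asList(new IntWritable(1), new IntWritable(2)))
              .withOutput(new Text("cat"), new IntWritable(3))
              .runTest();
      }
  }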

Common Sqoop Commands, Organized (Part 1) - 岑玉海 - 博客园


          11.5.15

          Salesforce SOQL/SOSL


          SOAP Request Message to Salesforce Login

When I left the first line of the SOAP login request message empty, I got a 500 from Salesforce, most likely because the XML declaration (<?xml ... ?>) must be the very first content of the document, so a leading blank line makes the request body malformed. The template below therefore keeps the declaration at the very start of the string.


def templ = '''<?xml version="1.0" encoding="utf-8" ?>
                            <env:Envelope
                              xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                              xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">
                              <env:Body>
                                <n1:login xmlns:n1="urn:partner.soap.sforce.com">
                                  <n1:username>{username}</n1:username>
                                  <n1:password>{password}{token}</n1:password>
                                </n1:login>
                              </env:Body>
                            </env:Envelope>''';
          
          def loginBody = templ.
              replace("{username}", p.getProperty("salesforce.username")).
              replace("{password}", p.getProperty("salesforce.password")).
              replace("{token}", p.getProperty("salesforce.token"));
          
          

          Cloudera forgot to create user hive and didn't set the password for hue?

Not sure why, but in the Quickstart VM for CDH 5.4.0, I got errors when logging into MySQL as hive@localhost and hue@localhost.

          hive-metastore.log

          2015-05-11 08:31:38,162 ERROR [BoneCP-pool-watch-thread]: bonecp.BoneCP (BoneCP.java:obtainInternalConnection(292)) - Failed to acquire connection to jdbc:mysql://127.0.0.1/metastore?createDatabaseIfNotExist=true. Sleeping for 7000 ms. Attempts left: 5
          java.sql.SQLException: Access denied for user 'hive'@'localhost' (using password: YES)
                  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
                  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
                  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
                  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:870)
                  at com.mysql.jdbc.MysqlIO.secureAuth411(MysqlIO.java:4332)
                  at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1258)
                  at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2234)
                  at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2265)
                  at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2064)
                  at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:790)
                  at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
                  at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown Source)
                  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
                  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
                  at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
                  at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:395)
                  at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:325)
                  at java.sql.DriverManager.getConnection(DriverManager.java:571)
                  at java.sql.DriverManager.getConnection(DriverManager.java:187)
                  at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:361)
                  at com.jolbox.bonecp.BoneCP.obtainInternalConnection(BoneCP.java:269)
                  at com.jolbox.bonecp.ConnectionHandle.<init>(ConnectionHandle.java:242)
                  at com.jolbox.bonecp.PoolWatchThread.fillConnections(PoolWatchThread.java:115)
                  at com.jolbox.bonecp.PoolWatchThread.run(PoolWatchThread.java:82)
                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                  at java.lang.Thread.run(Thread.java:745)

Creating the user hive@localhost in MySQL (with the password the metastore is configured to use) fixes it; after that everything works fine.

          Sqoop Usage

          [cloudera@quickstart ~]$ sqoop help
          Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
          Please set $ACCUMULO_HOME to the root of your Accumulo installation.
          15/05/11 07:15:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.4.0
          usage: sqoop COMMAND [ARGS]

          Available commands:
            codegen            Generate code to interact with database records
            create-hive-table  Import a table definition into Hive
            eval               Evaluate a SQL statement and display the results
            export             Export an HDFS directory to a database table
            help               List available commands
            import             Import a table from a database to HDFS
            import-all-tables  Import tables from a database to HDFS
            import-mainframe   Import datasets from a mainframe server to HDFS
            job                Work with saved jobs
            list-databases     List available databases on a server
            list-tables        List available tables in a database
            merge              Merge results of incremental imports
            metastore          Run a standalone Sqoop metastore
            version            Display version information

          See 'sqoop help COMMAND' for information on a specific command.


1. A simple example

% sqoop import --connect jdbc:mysql://localhost/hadoopguide --table widgets -m 1

import: the tool name; imports data from a SQL database
--connect <jdbc url>: specifies the connector (via the JDBC connection string)
--table <table name>: the database table to import
-m 1: use only one mapper

This creates the directory hdfs://<host>/user/<username>/<tablename> and produces one result file: part-m-00000

2. Specifying the class name

          % sqoop import --connect <jdbc url> --table <table name> --class-name <class name>

          3. --query

          4. Incremental Imports

5. An example from the Cloudera QuickStart VM:

          $ sqoop import-all-tables \
              -m 1 \
              --connect jdbc:mysql://quickstart:3306/retail_db \
              --username=retail_dba \
              --password=cloudera \
              --compression-codec=snappy \
              --as-avrodatafile \
              --warehouse-dir=/user/hive/warehouse

This imports the entire database.





          Sqoop.....

          1. Funny version numbers:

Latest stable release is 1.4.6 (download, documentation). Latest cut of Sqoop2 is 1.99.6 (download, documentation). Note that 1.99.6 is not compatible with 1.4.6 and is not feature complete; it is not intended for production deployment.

          2. Quick demo

          Sqoop 5 Minutes Demo — Apache Sqoop documentation

          3. Compatibility with Hadoop

The Sqoop server supports multiple Hadoop versions. However, as Hadoop major versions are not compatible with each other, Sqoop has multiple binary artifacts - one for each supported major version of Hadoop. You need to make sure that you're using the appropriate binary artifact for your specific Hadoop version.

          hadoop - Sqoop - Could not find or load main class org.apache.sqoop.Sqoop - Stack Overflow

Make sure you have sqoop-1.4.3.jar under your SQOOP_HOME directory.

Note: this may be because you downloaded the wrong Sqoop distribution.

(answer by Laxmikanth Samudrala on Stack Overflow)
          thanks for your answer; right distribution is sqoop-x.y.z.bin__hadoop.a.b.c; now it's working. –  talha06 Mar 21 '13 at 7:48

It seems Sqoop is quite difficult to install; just use the Cloudera QuickStart VM for now.