• Batch Processing with Unix Tools

    • many data analyses can be done in a few minutes using some combination of awk, sed, grep, sort, uniq, and xargs, and they perform surprisingly well

    • Chain of commands versus custom program

      • The custom-program alternative: a Ruby script that keeps an in-memory hash table mapping each URL to its count
    • Sorting versus in-memory aggregation

      • The Unix pipeline example does not have such a hash table, but instead relies on sorting a list of URLs in which multiple occurrences of the same URL are simply repeated
      • Which is better depends on data size
        • The Ruby-style approach has to fit the whole hash table (one entry per distinct URL) in memory
        • The GNU sort utility handles larger-than-memory datasets by spilling to disk, and automatically parallelizes sorting across multiple CPU cores
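
The two strategies can be sketched side by side. This is a minimal illustration, not the Ruby script or the pipeline from the notes: the function names and sample URLs are made up, and `sorted()` here works in memory, whereas a real `sort` would spill to disk once the data outgrows RAM.

```python
from collections import Counter
from itertools import groupby


def count_with_hash_table(urls):
    """In-memory aggregation: keeps one counter per distinct URL."""
    return dict(Counter(urls))


def count_with_sorting(urls):
    """Sort-based aggregation: after sorting, equal URLs are adjacent,
    so a single pass with one running count is enough (like `sort | uniq -c`)."""
    return {url: sum(1 for _ in group) for url, group in groupby(sorted(urls))}


# Hypothetical sample data
urls = ["/a", "/b", "/a", "/c", "/a", "/b"]
print(count_with_hash_table(urls))
print(count_with_sorting(urls))
```

Both functions return the same counts; they differ only in where the working set lives (a hash table of distinct keys versus a sorted sequence of all records).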
  • The Unix Philosophy

    • “We should have some ways of connecting programs like [a] garden hose—screw in another segment when it becomes necessary to massage data in another way. This is the way of I/O also.”
    • the idea of connecting programs with pipes became part of what is now known as the Unix philosophy

    • A uniform interface
      • If you want to be able to connect any program’s output to any program’s input, that means that all programs must use the same input/output interface.
      • In Unix, that interface is a file (more precisely, a file descriptor)
      • Today it’s an exception, not the norm, to have programs that work together as smoothly as Unix tools do.
    • Separation of logic and wiring
      • the program doesn’t know or care where the input is coming from and where the output is going to. (One could say this is a form of loose coupling, late binding [15], or inversion of control [16].)
      • there are limits to what you can do with stdin and stdout (multiple inputs, pipe output to network connection, etc)
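
A rough sketch of what separating logic from wiring can look like in code. The function `select_field` and the sample log lines are hypothetical; the point is only that the function reads from "some stream" and writes to "some stream" without knowing what is behind either one.

```python
import io


def select_field(source, sink, index):
    """Logic only: copy one whitespace-separated field per line from source to sink.
    The caller decides what source and sink actually are (the wiring)."""
    for line in source:
        fields = line.split()
        if index < len(fields):
            sink.write(fields[index] + "\n")


# Wiring chosen by the caller: in-memory streams here.
source = io.StringIO("GET /a 200\nGET /b 404\n")
sink = io.StringIO()
select_field(source, sink, 1)
print(sink.getvalue(), end="")
```

The same function could be wired as `select_field(sys.stdin, sys.stdout, 1)` to take part in a shell pipeline, with no change to its logic.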
    • Transparency and experimentation
      • input files to Unix commands are normally treated as immutable
      • can end the pipeline at any point, pipe the output into less, and look at it to see if it has the expected form
      • write the output of one pipeline stage to a file and use that file as input to the next stage (checkpoint)
    • the biggest limitation of Unix tools is that they run only on a single machine—and that’s where tools like Hadoop come in
  • MapReduce and Distributed Filesystems

    • A single MapReduce job is comparable to a single Unix process: it takes one or more inputs and produces one or more outputs.

      • does not have any side effects other than producing the output
    • MapReduce jobs read and write files on a distributed filesystem. In Hadoop’s implementation of MapReduce, that filesystem is called HDFS (Hadoop Distributed File System)

      • other distributed filesystems besides HDFS exist, such as GlusterFS and the Quantcast File System (QFS) [20]. Object storage services such as Amazon S3, Azure Blob Storage, and OpenStack Swift [21] are similar
    • HDFS is based on the shared-nothing principle: it requires no special hardware, only computers connected by a conventional datacenter network

    • HDFS conceptually creates one big filesystem that can use the space on the disks of all machines running the daemon

      • to tolerate machine and disk failures, file blocks are replicated on multiple machines
      • the techniques are similar to RAID, which provides redundancy across several disks attached to the same machine, except that here file access and replication happen over the network
    • the cost of data storage and access on HDFS, using commodity hardware and open source software, is much lower than that of the equivalent capacity on a dedicated storage appliance

    • MapReduce Job Execution

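      • A job has four steps: (1) read the input files and break them into records, (2) call the mapper on each record, (3) sort all key-value pairs by key, (4) call the reducer on the sorted pairs. Steps 2 (map) and 4 (reduce) are where you write your custom data processing code: the role of the mapper is to prepare the data by putting it into a form that is suitable for sorting, and the role of the reducer is to process the data that has been sorted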

        • The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.
        • The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records (such as the number of occurrences of the same URL).
        • In Hadoop MapReduce, the mapper and reducer are each a Java class that implements a particular interface. In MongoDB and CouchDB, mappers and reducers are JavaScript functions
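
The mapper and reducer contracts described above can be sketched on a single machine. This is a toy model, not Hadoop's Java interface: `run_job`, the log-line format (`"<method> <url>"`), and the sample data are assumptions made for illustration.

```python
from itertools import groupby
from operator import itemgetter


def mapper(record):
    """Called once per input record; emits zero or more (key, value) pairs.
    Keeps no state between records."""
    fields = record.split()
    if len(fields) >= 2:      # malformed lines emit nothing
        yield fields[1], 1    # assumed record format: "<method> <url>"


def reducer(key, values):
    """Called once per key with an iterator over all values for that key."""
    yield key, sum(values)


def run_job(records):
    """Single-process stand-in for the framework: map, sort by key, reduce."""
    pairs = [pair for record in records for pair in mapper(record)]
    pairs.sort(key=itemgetter(0))
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        output.extend(reducer(key, (value for _, value in group)))
    return output


log = ["GET /a", "GET /b", "broken", "GET /a"]  # hypothetical input
print(run_job(log))
```

Note that the reducer only ever sees an iterator, so it never needs the whole group materialized as a list.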
      • Distributed execution of MapReduce

        • mapper and reducer only operate on one record at a time; they don’t need to know where their input is coming from or their output is going to, so the framework can handle the complexities of moving data between machines

        • To ensure that all key-value pairs with the same key end up at the same reducer, the framework uses a hash of the key to determine which reduce task should receive a particular key-value pair
        • the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper’s local disk
        • Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper
          • The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle
        • The reduce task takes the files from the mappers and merges them together, preserving the sort order
        • The reducer is called with a key and an iterator that incrementally scans over all records with the same key (which may in some cases not all fit in memory). The reducer can use arbitrary logic to process these records, and can generate any number of output records
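
The shuffle steps above (partition by hash of the key, sort each partition, merge the sorted runs on the reduce side) can be sketched in memory. Everything here is a simplification under stated assumptions: lists stand in for the sorted files on local disk, `zlib.crc32` is an arbitrary choice of hash, and the function names are hypothetical.

```python
import heapq
import zlib
from itertools import groupby
from operator import itemgetter


def partition_for(key, num_reducers):
    """Pick the reduce task for a key. crc32 is used because it is stable
    across processes, unlike Python's built-in hash() for strings."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_task(pairs, num_reducers):
    """Partition one mapper's output by reducer and sort each partition."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[partition_for(key, num_reducers)].append((key, value))
    return [sorted(p, key=itemgetter(0)) for p in partitions]


def reduce_task(sorted_runs, reducer):
    """Merge the sorted runs fetched from every mapper, preserving sort order,
    then call the reducer once per key with an iterator over its values."""
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))
    for key, group in groupby(merged, key=itemgetter(0)):
        yield from reducer(key, (value for _, value in group))


def count(key, values):
    yield key, sum(values)


# Two hypothetical map tasks, two reduce tasks
map_outputs = [
    map_task([("/a", 1), ("/b", 1), ("/a", 1)], 2),
    map_task([("/b", 1), ("/c", 1)], 2),
]
results = {}
for r in range(2):
    runs = [output[r] for output in map_outputs]  # "fetch" partition r from each mapper
    results.update(reduce_task(runs, count))
print(results)
```

Because the partition is a pure function of the key, every pair for a given key lands at the same reduce task regardless of which mapper produced it.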
      • MapReduce workflows

        • it is very common for MapReduce jobs to be chained together into workflows, such that the output of one job becomes the input to the next job
        • like a sequence of commands where each command’s output is written to a temporary file, and the next command reads from the temporary file
        • A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job)
          • various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Luigi, Airflow, and Pinball
          • higher-level tools for Hadoop, such as Pig [30], Hive [31], Cascading [32], Crunch [33], and FlumeJava [34], also set up workflows of multiple MapReduce stages that are automatically wired together appropriately
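
One way to picture "output is only valid once the job has completed" in a chained workflow is to write each stage's output under a temporary name and publish it with a rename only on success. This is an illustrative local-filesystem sketch, not how Hadoop or any of the schedulers above is implemented; `run_stage`, the `.tmp` suffix, and the example stages are all hypothetical.

```python
import os
import tempfile


def run_stage(stage, input_path, output_path):
    """Run one stage; publish its output only if the whole stage succeeds."""
    tmp_path = output_path + ".tmp"
    try:
        with open(input_path) as src, open(tmp_path, "w") as dst:
            for line in stage(src):
                dst.write(line)
        os.replace(tmp_path, output_path)  # publish the completed output
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)            # discard partial output
        raise


def upper_case(lines):
    for line in lines:
        yield line.upper()


def failing(lines):
    for line in lines:
        yield line
        raise RuntimeError("simulated task failure")


with tempfile.TemporaryDirectory() as workdir:
    raw = os.path.join(workdir, "raw.txt")
    step1 = os.path.join(workdir, "step1.txt")
    step2 = os.path.join(workdir, "step2.txt")
    with open(raw, "w") as f:
        f.write("a\nb\n")
    run_stage(upper_case, raw, step1)      # stage 1 output feeds stage 2
    try:
        run_stage(failing, step1, step2)
    except RuntimeError:
        pass
    print(os.path.exists(step1), os.path.exists(step2))
```

A downstream stage that only ever looks for the final file name therefore never sees the partial output of a failed upstream stage.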
    • Reduce-Side Joins and Grouping

      • When we talk about joins in the context of batch processing, we mean resolving all occurrences of some association within a dataset. For example, we assume that a job is processing the data for all users simultaneously

      • Example: analysis of user activity events

      • think of this example as being part of a star schema: the log of events is the fact table, and the user database is one of the dimensions

    • In order to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine. Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change.

    • A better approach is to take a copy of the user database and put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and could use MapReduce to bring together all of the relevant records in the same place and process them efficiently

    • Sort-merge joins

      • perform the actual join logic in the reducers, and are hence known as reduce-side joins
      • has the advantage that you do not need to make any assumptions about the input data
      • downside is that all that sorting, copying to reducers, and merging of reducer inputs can be quite expensive
      • Since the reducer processes all of the records for a particular user ID in one go, it only needs to keep one user record in memory at any one time, and it never needs to make any requests over the network. This algorithm is known as a sort-merge join, since mapper output is sorted by key, and the reducers then merge together the sorted lists of records from both sides of the join.
      • Bringing related data together in the same place
        • the mappers and the sorting process make sure that all the necessary data to perform the join operation for a particular user ID is brought together in the same place: a single call to the reducer
        • the MapReduce programming model has separated the physical network communication aspects of the computation (getting the data to the right machine) from the application logic (processing the data once you have it)
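
A sort-merge join along these lines can be sketched in a single process. The record shapes are assumptions: users are `(user_id, birth_year)` and events are `(user_id, url)`. The sketch also assumes the sort is arranged so that each user's database record arrives ahead of that user's events (done here with a tag in the sort key), which is what lets the reducer hold just one user record at a time.

```python
from itertools import groupby
from operator import itemgetter

USER, EVENT = 0, 1  # the tag makes the user record sort ahead of that user's events


def map_users(users):
    for user_id, birth_year in users:
        yield (user_id, USER), birth_year


def map_events(events):
    for user_id, url in events:
        yield (user_id, EVENT), url


def join_reducer(records):
    """records: all tagged records for one user ID, user record first.
    Only one user record is held in memory at a time."""
    birth_year = None
    for (_user_id, tag), value in records:
        if tag == USER:
            birth_year = value
        else:
            yield value, birth_year  # (url, birth year of the viewer)


def reduce_side_join(users, events):
    """Single-process stand-in for map, sort, and reduce."""
    pairs = sorted(list(map_users(users)) + list(map_events(events)),
                   key=itemgetter(0))
    for _, group in groupby(pairs, key=lambda pair: pair[0][0]):
        yield from join_reducer(group)


# Hypothetical sample data; user 999 has events but no database record
users = [(104, 1989), (173, 1975)]
events = [(173, "/x"), (104, "/x"), (104, "/y"), (999, "/z")]
print(list(reduce_side_join(users, events)))
```

Events with no matching user come out with `None`, which makes this sketch behave like a left outer join on the event side; dropping those rows instead would give an inner join.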
      • GROUP BY
        • The simplest way of implementing such a grouping operation with MapReduce is to set up the mappers so that the key-value pairs they produce use the desired grouping key
        • Thus, grouping and joining look quite similar when implemented on top of MapReduce
      • Handling skew
        • The pattern of “bringing all records with the same key to the same place” breaks down if there is a very large amount of data related to a single key
        • Collecting all activity related to a celebrity (e.g., replies to something they posted) in a single reducer can lead to significant skew (also known as hot spots)
    • Map-Side Joins

      • if you can make certain assumptions about your input data, it is possible to make joins faster by using a so-called map-side join
      • Broadcast hash joins
        • simplest way of performing a map-side join applies in the case where a large dataset is joined with a small dataset
        • the small dataset needs to be small enough that it can be loaded entirely into memory in each of the mappers
        • when a mapper starts up, it can first read the user database from the distributed filesystem into an in-memory hash table. Once this is done, the mapper can scan over the user activity events and simply look up the user ID for each event in the hash table
        • the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table
        • an alternative is to store the small join input in a read-only index on the local disk
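
A broadcast hash join for one mapper can be sketched as follows, assuming the small input is a list of `(user_id, birth_year)` pairs and the mapper's partition of the large input is a list of `(user_id, url)` events. The function name and record shapes are hypothetical.

```python
def broadcast_hash_join(small_input, large_partition):
    """Join one partition of the large input against the whole small input."""
    # Load the entire small input into an in-memory hash table, once per mapper.
    users_by_id = dict(small_input)
    # Scan the large input and look each event's user ID up in the table.
    for user_id, url in large_partition:
        if user_id in users_by_id:  # inner join: unmatched events are dropped
            yield user_id, url, users_by_id[user_id]


# Hypothetical sample data
users = [(104, 1989), (173, 1975)]
partition = [(173, "/x"), (999, "/z"), (104, "/y")]
print(list(broadcast_hash_join(users, partition)))
```

No sorting or shuffling is involved: each mapper works independently on its partition, which is why the small input must fit in memory on every mapper.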
      • Partitioned hash joins
      • Map-side merge joins
      • MapReduce workflows with map-side joins
  • The Output of Batch Workflows

  • Comparing Hadoop to Distributed Databases

  • Beyond MapReduce

  • High-Level APIs and Languages