
Hadoop

MR

  • reducer writes to HDFS, whereas mapper writes to local (unless the MR job has no reducers)
    • reducer writes are costly due to HDFS redundant writes
  • combiner function runs on map output before the reducer
    • Hadoop may call the combiner zero, one or many times, so it must not affect correctness
  • Hadoop streaming uses Unix pipes to allow non-java programs to be used in MR
  • JobControl class allows ordering multiple jobs as a DAG, but evaluates dependencies on the client machine
    • Oozie is a more robust solution that runs as a service
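Hadoop streaming's contract (read lines, emit key/value pairs, reducer sees keys grouped after the shuffle) can be sketched as plain Python functions. This is a simulation of the pipeline, not a runnable streaming job; the function names are hypothetical.

```python
import itertools

def mapper(lines):
    # emit (word, 1) for every word; a real streaming mapper would read
    # stdin and print tab-separated "key\tvalue" lines to stdout
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reducer(pairs):
    # Hadoop sorts map output by key before the reducer sees it, so
    # equal keys arrive contiguously and groupby suffices
    for word, group in itertools.groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield (word, sum(count for _, count in group))

# simulate map -> shuffle/sort -> reduce on a tiny input
pairs = list(mapper(["the quick fox", "the fox"]))
counts = dict(reducer(pairs))
print(counts)  # {'fox': 2, 'quick': 1, 'the': 2}
```

A combiner would be the same `reducer` function run on each mapper's local output first, which is why it must be safe to apply any number of times.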

HDFS

  • consists of 1 namenode and multiple datanodes
  • namenode knows which blocks are on each datanode, but does not store their exact locations persistently
    • this is reconstructed from datanodes when the system starts
  • All data would be lost if the namenode were lost, so for HA:
    • the namenode writes metadata to two places, usually including NFS-mounted storage
    • a secondary namenode running on separate machine which periodically merges edit log with namespace image
      • data loss is still possible since a secondary namenode lags the primary namenode
    • a hot standby namenode (instead of a secondary) must use HA shared storage for storing metadata
  • namenode stores blocks metadata in namespace image and edit log files
  • Files cannot have multiple concurrent writers and are written in append-only mode.
  • Not good for low-latency, large number of small files
  • Applications can direct namenode which files to cache for better performance
    • namenode doesn't itself do any caching but delegates caching to datanodes
  • Hadoop 2.x introduced HDFS federation, which lets each namespace volume be maintained by a separate namenode
    • keeps namenode from getting too big as was the case in 1.x
    • failure of one namenode doesn't affect others
    • since blockpool storage is not partitioned, datanodes register with each namenode
  • Various URI schemes besides hdfs allow different filesystems to be accessed through a uniform interface. Eg
    • local, s3a: AWS S3, wasb: Azure, ftp, viewfs: client-side mount table for federated HDFS
  • Also available via HTTP REST API for non-java clients either directly or through proxy servers
  • Clients connect to the namenode first to get a list of blocks with their locations on datanodes, and then connect directly to the datanodes
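The read path above (one metadata round-trip to the namenode, then byte streams from datanodes) can be simulated with toy in-memory tables. Everything here is hypothetical stand-in structure, not the real HDFS client API.

```python
# toy namenode metadata: file path -> ordered (block_id, replica locations)
namenode = {
    "/logs/a.txt": [("blk_1", ["dn1", "dn2", "dn3"]),
                    ("blk_2", ["dn2", "dn4", "dn5"])],
}
# toy datanodes holding the actual block bytes
datanodes = {
    "dn1": {"blk_1": b"hello "},
    "dn2": {"blk_1": b"hello ", "blk_2": b"world"},
    "dn4": {"blk_2": b"world"},
}

def read_file(path):
    # step 1: ask the namenode for the block list and locations
    blocks = namenode[path]
    data = b""
    # step 2: stream each block directly from the first reachable replica
    for block_id, locations in blocks:
        for dn in locations:
            if dn in datanodes and block_id in datanodes[dn]:
                data += datanodes[dn][block_id]
                break
    return data

print(read_file("/logs/a.txt"))  # b'hello world'
```

Note that dn3 and dn5 are absent here, so the reader falls through to the next replica, which is exactly how a client skips a dead datanode.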

HA

  • Hadoop 1.x had long recovery time due to having to replay edit log and getting metadata from all datanodes
  • Hadoop 2.x introduced standby namenode to reduce startup time upon failure
  • datanodes must send block reports to both namenodes since they are in namenodes' memory and not persistent storage
  • standby namenode also acts as secondary namenode
  • metadata shared storage can be NFS or QJM (Quorum Journal Manager)
    • QJM uses multiple nodes for writes, and writes must be successful on majority of nodes
  • Uses zookeeper for electing active namenode
  • Fencing is the process of preventing both namenodes from believing they are the primary.
    • QJM enforces this by letting only one namenode perform writes; NFS requires stricter fencing
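The QJM majority-write rule is a one-liner: a write is durable only when a strict majority of journal nodes acknowledge it. A minimal sketch (the function and the `acks` set are illustrative, not a real API):

```python
def quorum_ack(journal_nodes, acks):
    # a QJM write succeeds only if a strict majority of journal nodes
    # acknowledge it; `acks` simulates which nodes responded in time
    return len(acks & set(journal_nodes)) > len(journal_nodes) // 2

jns = ["jn1", "jn2", "jn3"]
print(quorum_ack(jns, {"jn1", "jn2"}))  # True: 2 of 3 acked
print(quorum_ack(jns, {"jn3"}))         # False: 1 of 3 acked
```

This is why ensembles use an odd node count: 3 nodes tolerate 1 failure, 5 tolerate 2.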

V/S Cloud

  • When data is small and localized on HDFS (no shuffle), HDFS may be better than Cloud object store
    • ROT: Keep small data in cloud but copy it to HDFS before processing
  • MR jobs running on Hadoop 2.x versions earlier than 2.7 run faster on HDFS
  • S3 v/s HDFS
    • S3 is 10x cheaper when including human cost, or 5x cheaper for just h/w
    • HDFS can provide up to 6x performance
    • S3 offers roughly 2x the performance/price of HDFS
    • S3 provide more reliable storage than HDFS
    • HDFS provides transactional writes (through atomic renames). Since S3 does not, data integrity can be complex
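The ~2x performance/price figure follows from the other two bullets; a back-of-envelope check using the notes' own rough factors:

```python
s3_cost_advantage = 10   # S3 ~10x cheaper, including human cost
hdfs_perf_advantage = 6  # HDFS up to ~6x faster

# performance per dollar, relative to HDFS = 1.0:
# S3 does 1/6 the work per unit time, but each dollar buys 10x the capacity
s3_perf_per_dollar = (1 / hdfs_perf_advantage) * s3_cost_advantage
print(round(s3_perf_per_dollar, 2))  # 1.67, i.e. roughly 2x
```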

YARN

  • Consists of a resource manager (one per cluster) and node managers (one per node)
    • node managers run applications in containers
    • timeline server stores application history
  • Scheduling policies: FIFO, Capacity and Fair Scheduling
    • Fair scheduling optionally permits preemption
  • Application life-cycle
    1. A client asks the resource-manager to launch an application, usually as an Application Master process
    2. resource-manager finds a node-manager to run the application
    3. node-manager creates a container and runs application-master
    4. application-master can either perform processing itself and return the results back, or
    5. application-master can request the resource-manager to allocate more containers to run processes in parallel
  • Applications can specify locality constraints when requesting containers
    • constraint can be a specific node, a rack, or anywhere in the cluster
    • constraints can be loose or strict, which decides if application falls back or fails if locality cannot be fulfilled
  • Application can run using different models:
    • One per job, eg map-reduce
    • One per user session, eg spark
    • Shared by multiple users, eg Impala, Slider etc
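The loose-vs-strict locality fallback can be sketched as a tiny scheduler: try the requested node, then its rack, then anywhere, but only if the request allows relaxation. The cluster table and function are hypothetical, not YARN's scheduler API.

```python
# hypothetical cluster state: node -> (rack, free containers)
cluster = {"n1": ("rackA", 0), "n2": ("rackA", 2), "n3": ("rackB", 1)}

def allocate(node, relax):
    # try the requested node first
    rack, free = cluster[node]
    if free > 0:
        return node
    if not relax:
        return None  # strict constraint: fail rather than fall back
    # loose constraint: fall back to the same rack, then anywhere
    for other, (r, f) in cluster.items():
        if f > 0 and r == rack:
            return other
    for other, (r, f) in cluster.items():
        if f > 0:
            return other
    return None

print(allocate("n1", relax=True))   # 'n2' (same-rack fallback)
print(allocate("n1", relax=False))  # None (strict locality fails)
```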

Other

  • Avro can represent strings as its Utf8 class or Java String; Utf8 is more efficient because it stores raw bytes and decodes them lazily
  • Parquet can efficiently store/retrieve deeply nested fields in columnar format

Flume

  • high-volume ingestion of event-based data
  • consists of channels which connect a source to a sink
    • eg log files as source and HDFS/HBase as sink
  • Fan Out refers to having more than 1 sink to deliver same data
    • can also filter events by using a multiplexing selector
  • channel maintains separate transactions for source and sink
  • channel can be file (durable) or memory (fast)
  • every event will reach sink at least once
  • can use batches for efficiency
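The separate source/sink transactions are what yield at-least-once delivery: a failed sink transaction rolls back and the event stays in the channel for retry, possibly producing duplicates. A toy simulation (class and sink names are illustrative, not Flume's API):

```python
class Channel:
    # events stay in the channel until the sink-side transaction
    # commits; a failed delivery rolls back, so the event is re-sent
    def __init__(self):
        self.queue = []

    def put(self, event):              # source-side transaction
        self.queue.append(event)

    def deliver_all(self, sink):       # sink-side transactions
        for event in list(self.queue):
            try:
                sink(event)
            except IOError:
                continue               # rollback: event stays queued
            self.queue.remove(event)   # commit: event leaves the channel

received = []
attempts = {"e1": 0}

def flaky_sink(event):
    # fails the first delivery of e1, succeeds afterwards
    if event == "e1" and attempts["e1"] == 0:
        attempts["e1"] += 1
        raise IOError("sink unavailable")
    received.append(event)

ch = Channel()
ch.put("e1"); ch.put("e2")
ch.deliver_all(flaky_sink)   # e1 fails and is retained, e2 delivered
ch.deliver_all(flaky_sink)   # e1 retried and delivered
print(received)              # ['e2', 'e1']
```

A file channel would persist `queue` to disk so the retry survives an agent restart; a memory channel trades that durability for speed.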

Sqoop

  • Uses database specific Connector for bulk data load
  • Some vendors pre-configure it to use TDCH

Hive

  • Provides an SQL abstraction over MR
  • Managed and external tables
  • LOADing an external HDFS file into a managed table actually moves the data, so dropping the table later will delete the data
  • Creating partitions creates nested folders with each level named column-name=value
    • CLUSTERED BY (<col>) INTO 4 BUCKETS uses modulo on the hash value of col
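The bucketing rule is just `hash(col) % num_buckets`; a sketch using Python's built-in `hash` as a stand-in for Hive's own hash function (for Python ints, `hash(n) == n`, which keeps the example deterministic):

```python
def bucket_for(value, num_buckets=4):
    # CLUSTERED BY (<col>) INTO 4 BUCKETS assigns each row to
    # hash(col) % num_buckets; Python's hash() stands in for Hive's
    return hash(value) % num_buckets

user_ids = [3, 4, 7, 11]
buckets = {uid: bucket_for(uid) for uid in user_ids}
print(buckets)  # {3: 3, 4: 0, 7: 3, 11: 3}
```

Because equal keys always land in the same bucket file, bucketed tables enable efficient sampling and bucket-wise map-side joins.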

Zookeeper

  • highly available, high performance coordination service
    • HA works by replication within a cluster of servers called an ensemble
    • uses zab protocol which is similar to RAFT
  • data is stored in znodes along a hierarchical path -- like a Unix filesystem
    • data is very small (<1MB) and accessed atomically (always all of the data read/written)
  • znodes are either ephemeral (deleted when client session ends) or persistent
  • sequential znode has a sequence# automatically appended to its name
    • useful for imposing global ordering, eg handing out a lock on a resource
  • operations: create, delete, exists, {get,set}ACL, getChildren, {get,set}Data, sync
    • Can set a watch (triggered only once) on data operations allowing to get notified for changes
    • Update operations require a version (obtained from a prior exists call) that must match the current version
  • clients usually establish a long lived session with any server from the ensemble
    • must keep the session alive with heartbeats
    • ephemeral znodes are lost when session terminates
  • znode can be protected by ACLs
    • authentication is performed by ip, sasl (Kerberos), digest (user/password)
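The sequential-znode lock recipe: each contender creates an ephemeral sequential znode, and whoever holds the lowest sequence number owns the lock; deleting it (or losing the session) passes the lock on. A toy simulation with in-memory dicts, not the ZooKeeper client API:

```python
import itertools

counter = itertools.count(1)
znodes = {}  # path -> creating session (stand-in for the ensemble state)

def create_sequential(prefix, session):
    # ZooKeeper appends a monotonically increasing, zero-padded
    # sequence number to a sequential znode's name
    path = "%s%010d" % (prefix, next(counter))
    znodes[path] = session
    return path

def lock_holder():
    # the session that created the lowest-numbered znode holds the lock
    return znodes[min(znodes)]

a = create_sequential("/lock/req-", "client-A")
b = create_sequential("/lock/req-", "client-B")
print(lock_holder())   # 'client-A'
del znodes[a]          # A's ephemeral znode vanishes with its session
print(lock_holder())   # 'client-B'
```

In the real recipe each client watches only the znode immediately below its own, avoiding a "herd effect" where every waiter wakes on each release.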

Misc

  • HDFS block size is a maximum; a smaller block will not use the entire 128MB of space
  • HDFS blocks are checksumed and verified on each read.
    • Datanode block scanner is a background process that periodically verifies all blocks in a datanode
  • bzip2-compressed files are splittable, but files compressed with most other codecs are not
  • hadoop distcp command provides an efficient way (without flowing data to the client) to copy data from one HDFS to another
    • ROT: use 20 mappers per node for fast and balanced data copy
  • YARN doesn't provide a way for applications running in containers to communicate; application-specific methods (such as RPC) are used
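The per-chunk checksum scheme above can be sketched with `zlib.crc32` standing in for HDFS's CRC (HDFS defaults to a 512-byte checksum chunk; the helper names are illustrative):

```python
import zlib

CHUNK = 512  # HDFS checksums every io.bytes.per.checksum bytes (default 512)

def checksums(data):
    # compute one CRC per chunk, as a datanode does when a block is written
    return [zlib.crc32(data[i:i + CHUNK]) for i in range(0, len(data), CHUNK)]

def verify(data, stored):
    # a reader (or the background block scanner) recomputes and compares
    return checksums(data) == stored

block = b"x" * 1200
stored = checksums(block)
print(verify(block, stored))                      # True
corrupted = block[:600] + b"?" + block[601:]      # flip one byte
print(verify(corrupted, stored))                  # False: chunk 2 mismatches
```

On a real mismatch the client reports the corrupt replica to the namenode and re-reads from another replica.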

Storage Formats

  • NSM (N-ary Storage Model) layout is row-oriented storage
  • DSM Columnar storage
  • PAX (Partition Attribute Across) layout:
    • a batch is a group of rows; within a batch, storage is columnar.
  • Protocols like ODBC and JDBC are row-oriented, so transferring data to/from columnar storage systems involves a lot of computation to assemble data from columns into rows and back into columns
  • ADBC solves this by transferring data in a columnar layout, using Arrow
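The PAX idea (row groups stored column-wise within each batch) fits in a few lines; a minimal sketch, with a hypothetical helper name:

```python
def to_pax_batches(rows, batch_size):
    # PAX: slice rows into batches, then store each batch column-wise,
    # so values of one attribute sit contiguously within a batch
    batches = []
    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        batches.append([list(col) for col in zip(*batch)])
    return batches

rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
batches = to_pax_batches(rows, batch_size=2)
print(batches)  # [[[1, 2], ['a', 'b']], [[3, 4], ['c', 'd']]]
```

This is the same row-group-plus-column-chunk layout Parquet uses on disk and Arrow uses in memory, which is why ADBC can skip the column-to-row pivot entirely.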