OmniShuffle

In big data analysis scenarios, massive volumes of data come from multiple heterogeneous sources, and the cost of analysis is high. Most scenarios involve a large number of drive-based shuffles during data iteration. As a result, data analysis and index construction take a long time.

OmniShuffle is a performance acceleration component for the big data engine Spark. It runs in the big data clusters of the customer's data center. It uses features such as unified addressing of the memory pool, data exchange with memory semantics, and converged shuffle to reduce drive I/O overhead, speed up data analysis, and improve cluster resource utilization.

Figure 1 OmniShuffle acceleration principle

Key Technologies of OmniShuffle

  • In-memory shuffle: This is the core technology that OmniShuffle uses for acceleration. It replaces the open source shuffle process (serialization > compression > flushing to drives > TCP transmission > decompression > deserialization) with memory semantic operations. In-memory shuffle reduces the data flushing overhead and improves data read and write efficiency in the shuffle process.
  • Data aggregation: In the Spark shuffle process, a large number of data blocks are exchanged between nodes. Depending on the number of Spark executors, a typical application involves hundreds of thousands or even millions of cross-node data exchanges, which bring heavy protocol and network overheads. Each exchange requires a series of operations such as link establishment, file opening, data transfer, file closing, and link closing. If each data block to be exchanged is small, for example, only dozens or hundreds of KB, the protocol overhead becomes a major bottleneck. Tests show that the end-to-end exchange of a 100 KB data block takes dozens of milliseconds, whereas the actual data transfer takes as little as 1 millisecond, so the protocol overhead accounts for more than 90% of the transfer time. After shuffle data is generated in different map tasks, OmniShuffle aggregates the data of the same partition in the same shuffle phase on a single node into one or more contiguous memory blocks. After the aggregation is complete, the original memory blocks are released. Data aggregation increases the size of the I/Os transferred over the network, which avoids the problem of small I/Os failing to fully utilize the network bandwidth. In this way, OmniShuffle reduces the number of data transfers and transfers shuffle data in less time.
  • Shuffle convergence: OmniShuffle is built on in-memory data processing, but in-memory processing is not possible in all scenarios. When the memory space required for data processing is greater than the memory pool capacity, the data processing job must still complete correctly with minimal impact on performance. With shuffle convergence, if the memory pool capacity is insufficient to store all shuffle data, part of the shuffle data held in memory is temporarily stored in local file systems, and shuffle data is exchanged between nodes in batches.
  • Columnar shuffle: OmniShuffle and OmniOperator work together to further improve performance. OmniShuffle improves the efficiency of data shuffle between nodes, and OmniOperator improves the computing performance of operators on a node. However, OmniOperator accelerates operators based on an off-heap columnar storage structure, whereas Spark processes data in rows. Therefore, the Spark end-to-end process involves multiple row-column conversions, copies between on-heap and off-heap memory, and serialization/deserialization operations, which degrade the end-to-end performance of tasks. OmniShuffle implements native shuffle to eliminate memory copies between on-heap and off-heap memory. The writer and reader are adapted to the off-heap columnar data structure to eliminate row-column conversion and serialization/deserialization, further improving the combined performance of OmniShuffle and OmniOperator.
  • ESS and RSS modes: In ESS mode, OmniShuffle and Spark are deployed on the same node. In RSS mode, OmniShuffle is deployed in a separate cluster to decouple the storage cluster from the compute cluster and provide a general Shuffle service that features high performance, high availability, and scalability.
  • Boost Tuning: OCK Boost Tuning for Spark SQL automatically adjusts the parallelism degree of Spark SQL jobs in real time based on historical data. This eliminates the need to manually tune the parallelism degree and reduces spills in the shuffle-reduce process by 90%. As a result, OCK Boost Tuning speeds up big data cluster jobs and increases job throughput.
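
The data aggregation item above combines an arithmetic argument (fixed protocol overhead dominates small exchanges) with a merge step (blocks of the same partition are combined into one buffer). The following Python sketch illustrates both under stated assumptions. It is not OmniShuffle code: the names `exchange_time_ms`, `aggregate_by_partition`, and `total_time_ms` are hypothetical, and the cost constants are assumptions chosen to match the figures quoted above ("dozens of milliseconds" is taken as 20 ms, and real transfer as 1 ms per 100 KB).

```python
from collections import defaultdict

# Assumed cost model, loosely matching the figures quoted in the text.
FIXED_OVERHEAD_MS = 20.0       # assumption: per-exchange protocol cost
TRANSFER_MS_PER_100KB = 1.0    # assumption: about 1 ms of real transfer per 100 KB


def exchange_time_ms(block_kb):
    """Estimated end-to-end time of one cross-node exchange of block_kb kilobytes."""
    return FIXED_OVERHEAD_MS + (block_kb / 100.0) * TRANSFER_MS_PER_100KB


def aggregate_by_partition(map_outputs):
    """Merge blocks that share (shuffle_id, partition_id) into one contiguous buffer.

    map_outputs is an iterable of (shuffle_id, partition_id, data) tuples,
    where data is a bytes object produced by one map task.
    """
    buffers = defaultdict(bytearray)
    for shuffle_id, partition_id, data in map_outputs:
        buffers[(shuffle_id, partition_id)] += data
    return {key: bytes(buf) for key, buf in buffers.items()}


def total_time_ms(block_sizes_bytes):
    """Sum the estimated exchange time over a list of block sizes in bytes."""
    return sum(exchange_time_ms(size / 1024.0) for size in block_sizes_bytes)


if __name__ == "__main__":
    # 8 map tasks, each producing a 100 KB block for each of 4 partitions.
    outputs = [(0, p, bytes(100 * 1024)) for _ in range(8) for p in range(4)]
    merged = aggregate_by_partition(outputs)

    before = total_time_ms([len(d) for _, _, d in outputs])
    after = total_time_ms([len(d) for d in merged.values()])
    print(f"exchanges: {len(outputs)} -> {len(merged)}")
    print(f"estimated time: {before:.0f} ms -> {after:.0f} ms")
```

Under these assumed numbers, a single 100 KB exchange spends 20 of its 21 ms on protocol overhead (about 95%), and merging 32 exchanges of 100 KB into 4 exchanges of 800 KB lowers the estimated total from 672 ms to 112 ms. The real gain depends on the actual overhead and bandwidth of the cluster.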

Performance Improvement with OmniShuffle

Test environment: a four-node cluster (three compute nodes and one management node) in which each node has two Kunpeng 920 5220 processors and 384 GB of memory, a network of at least 10GE (10GE TCP, 25GE TCP/RDMA, or 100GE TCP/RDMA), and a typical configuration of 12 x 4 TB SATA drives.

In ESS mode:

  • In the TeraSort scenario: over 40% higher performance for 1 TB of data
  • In the PageRank (Spark Core) scenario: more than double the performance for 90 GB of data
  • In TPC-DS benchmark tests, for 8 TB of data, OmniShuffle improves the Spark performance by 30%, and the combination of OmniShuffle and OmniOperator improves the Spark performance by more than 60%.
    Figure 2 OmniShuffle TPC-DS test result
    Figure 3 OmniShuffle+OmniOperator TPC-DS test result

In RSS mode:

In TPC-DS benchmark tests, for 3 TB of data, the performance is 10% higher than that of Celeborn.

Figure 4 OmniShuffle TPC-DS test result