Data-aware scheduling in grid computing

Authors: Tevfik Kosar, Mehmet Balman
Efficient and reliable access to large-scale data sources and archiving destinations in a widely distributed computing environment brings new challenges. The insufficiency of the traditional systems and existing CPU-oriented batch schedulers in addressing these challenges has yielded a new emerging era: data-aware schedulers. In this article, we discuss the limitations of the traditional CPU-oriented batch schedulers in handling the challenging data management problem of large-scale distributed applications; give our vision for the new paradigm in data-intensive scheduling; and elaborate on our case study: the Stork data placement scheduler.

Essential matter:
Large experiments, such as high-energy physics simulations, genome mapping, and climate modeling, generate data volumes reaching hundreds of terabytes. Data transferred from satellites and remote sensors also generates huge volumes. Processing such data requires distributed resources, and distributed resources impose new challenges: managing the resources themselves, scheduling and allocating storage resources, and moving data efficiently.

A middleware layer is needed to schedule and manage both tasks and resources. The management of storage resources and data movement is the main bottleneck: overloading a remote storage resource with write transfers, or with many concurrent read transfers, can crash the server. Traditional distributed systems closely couple data handling and computation, treating data access as a side effect of computation. The inability of traditional systems and existing CPU-oriented schedulers to deal with complex data handling has created the need for data-aware schedulers. One example of such a scheduler is the Stork data placement scheduler.

Two data management tools have been used for efficient and reliable data placement (access, retrieval, and movement): (a) the Reliable File Transfer Service (RFT) and (b) the Lightweight Data Replicator (LDR).

Purpose of these tools:

  • Reliable transfer.
  • Handling possible failures, such as dropped connections, machine reboots, and temporary network outages, by retrying.
  • Both tools are built on top of GridFTP, which is already a secure and reliable transfer protocol for high-bandwidth WANs.

PetaShare, a data-intensive collaborative project, aims to let domain scientists focus on their primary research problems, assured that the underlying infrastructure will manage the low-level data handling issues. The key technologies being developed in this project include data-aware storage systems, data-aware schedulers, and cross-domain metadata schemes, which take over from the user the responsibility of managing data resources and scheduling data tasks, and perform these tasks.

Data-aware scheduler: Stork
Stork is especially designed to understand the semantics and characteristics of data placement tasks, which can include data transfer, storage allocation and de-allocation, data removal, metadata registration and un-registration, and replica location.

Stork uses the ClassAd job description language (Distributed resource management for high throughput computing, in: Proceedings of the Seventh IEEE International Symposium on High Performance Distributed Computing, HPDC7, Chicago, IL, July 1998) to represent data placement jobs. The ClassAd language provides a very flexible and extensible data model that can represent arbitrary services and constraints. This flexibility allows Stork to specify job-level policies as well as global ones. Global policies apply to all jobs scheduled by the same Stork server, and users can override them by specifying job-level policies in the job description ClassAds. Stork can also interact with higher-level planners and workflow managers, which allows users to schedule CPU resources and storage resources together. The authors introduced a new workflow language that captures the data placement jobs in the workflow as well. Their enhancements to the workflow manager (DAGMan) allow it to differentiate between computational jobs and data placement jobs. The workflow manager can then submit computational jobs to a computational job scheduler, such as Condor or Condor-G, and data placement jobs to Stork.
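To make the policy and routing ideas concrete, here is a minimal Python sketch under stated assumptions: job descriptions are plain dicts rather than real ClassAds, and the attribute names (`dap_type`, `policy`, `max_retries`, `retry_delay_s`) and helper functions are hypothetical stand-ins. It shows a job-level setting overriding a server-wide default, and a DAGMan-style split that sends data placement jobs to Stork and computational jobs to Condor.

```python
# Hypothetical sketch, not Stork's ClassAd schema: jobs are plain dicts and
# every attribute name below is an illustrative assumption.

GLOBAL_POLICY = {"max_retries": 3, "retry_delay_s": 60}  # applies to every job on this server

def effective_policy(job: dict) -> dict:
    """Start from the global policy, then let job-level entries override it."""
    merged = dict(GLOBAL_POLICY)
    merged.update(job.get("policy", {}))
    return merged

def route(job: dict) -> str:
    """Send data placement jobs to Stork and everything else to a CPU scheduler."""
    data_types = {"transfer", "allocate", "release", "remove", "register", "locate"}
    return "stork" if job.get("dap_type") in data_types else "condor"

stage_in = {"dap_type": "transfer", "policy": {"max_retries": 10}}
analysis = {"executable": "analyze"}

print(route(stage_in), effective_policy(stage_in))  # stork {'max_retries': 10, 'retry_delay_s': 60}
print(route(analysis), effective_policy(analysis))  # condor {'max_retries': 3, 'retry_delay_s': 60}
```

Copying the global policy before updating it keeps the server-wide defaults untouched for the next job.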

Stork also acts as an I/O control system (IOCS) between user applications and the underlying protocols and data storage servers. It provides complete modularity and extensibility: users can easily add support for their favorite storage system, data transport protocol, or middleware. This is a crucial feature in a system designed to work in a heterogeneous distributed environment. Users and applications cannot expect all storage systems to support the same interfaces, and applications cannot be expected to talk to every storage system, protocol, and middleware. A negotiating layer is needed between them, one that can interact with those systems easily and even translate between different protocols. Stork was developed to fill this role: its modular design lets users insert a plug-in to support any storage system, protocol, or middleware.
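One way to picture this plug-in design is a registry keyed by protocol pair. The sketch below is hypothetical: `PLUGINS`, the `register` decorator, and the handler functions are illustrative names, and the handlers only return a description instead of moving any bytes. Keying on the (source scheme, destination scheme) pair is what lets a single plug-in act as a translator between two protocols.

```python
from typing import Callable, Dict, Tuple
from urllib.parse import urlparse

# Hypothetical plug-in registry keyed by (source scheme, destination scheme).
Handler = Callable[[str, str], str]
PLUGINS: Dict[Tuple[str, str], Handler] = {}

def register(src_scheme: str, dst_scheme: str):
    """Decorator that installs a handler for one protocol pair."""
    def wrap(fn: Handler) -> Handler:
        PLUGINS[(src_scheme, dst_scheme)] = fn
        return fn
    return wrap

@register("file", "gsiftp")
def upload_gridftp(src: str, dst: str) -> str:
    # A real module would invoke a GridFTP client here; this only describes the action.
    return f"upload via GridFTP: {src} -> {dst}"

@register("gsiftp", "file")
def download_gridftp(src: str, dst: str) -> str:
    return f"download via GridFTP: {src} -> {dst}"

def transfer(src: str, dst: str) -> str:
    """Dispatch to the plug-in for this protocol pair, or fail with a clear error."""
    key = (urlparse(src).scheme, urlparse(dst).scheme)
    handler = PLUGINS.get(key)
    if handler is None:
        raise ValueError(f"no plug-in installed for {key[0]} -> {key[1]}")
    return handler(src, dst)

print(transfer("file:///data/in.dat", "gsiftp://storage.example.org/scratch/in.dat"))
```

Supporting a new storage system then means adding one decorated function, without touching `transfer`.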
Resource utilization: The data-aware batch scheduler (Stork) can control the number of concurrent requests coming to any storage system it has access to, ensuring that neither the storage system nor the network link to it becomes overloaded. In addition, it can allocate and deallocate storage space so that space is always available on the required system.
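A per-server cap on concurrent requests can be sketched with one semaphore per storage server. This is an illustration, not Stork's code: `ServerThrottle`, the limit of 2, the server names, and `fake_transfer` (which only sleeps) are assumptions. The `peak` counter records the highest number of simultaneous requests each server actually saw, so the cap can be checked.

```python
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

class ServerThrottle:
    """Caps the number of in-flight requests to each storage server (hypothetical helper)."""

    def __init__(self, limit: int):
        self._limit = limit
        self._sems = {}
        self._guard = threading.Lock()      # protects the dicts below
        self.active = defaultdict(int)      # requests currently running per server
        self.peak = defaultdict(int)        # highest concurrency observed per server

    def _sem(self, server: str) -> threading.BoundedSemaphore:
        with self._guard:
            if server not in self._sems:
                self._sems[server] = threading.BoundedSemaphore(self._limit)
            return self._sems[server]

    def run(self, server: str, fn, *args):
        with self._sem(server):             # blocks while the server is at its limit
            with self._guard:
                self.active[server] += 1
                self.peak[server] = max(self.peak[server], self.active[server])
            try:
                return fn(*args)
            finally:
                with self._guard:
                    self.active[server] -= 1

def fake_transfer(name: str) -> str:
    time.sleep(0.01)                        # stands in for a real network transfer
    return name

throttle = ServerThrottle(limit=2)
jobs = [(srv, f"file{i}") for srv in ("storage-a", "storage-b") for i in range(6)]
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda job: throttle.run(job[0], fake_transfer, job[1]), jobs))
print(dict(throttle.peak))                  # each value is at most 2
```

Eight worker threads are available, yet no server ever sees more than two requests at once; the extra threads simply wait on that server's semaphore.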

Fig. 1 shows the effect of increased parallelism and concurrency levels on the transfer rate and CPU utilization. The level of parallelism is the number of parallel streams used to transfer a single file; the level of concurrency is the number of files being transferred concurrently.

(a) Transfer rate vs. parallelism/concurrency, local area.
(b) Transfer rate vs. parallelism/concurrency, wide area.

As the levels of parallelism and concurrency increase, the transfer rate of wide-area transfers increases as expected (Fig. 1(b)). Local-area transfers behave differently: beyond a certain point, more parallelism and concurrency have a negative impact on the transfer rate (Fig. 1(a)). The transfer rate reaches a threshold, after which the overhead of using multiple streams and issuing multiple transfers starts to decrease it.

(c) Transfer rate vs. parallelism, local area.
(d) Transfer rate vs. parallelism, wide area.

These results show that combining concurrency and parallelism can yield higher performance than using parallelism only or concurrency only. The combination can also help achieve the optimum transfer rate while placing a lower load on the server.

(e) CPU utilization vs. parallelism/concurrency, GridFTP server.
(f) CPU utilization vs. parallelism/concurrency, GridFTP client.

Panels (e) and (f) show the effect of increased parallelism and concurrency levels on CPU utilization. As the number of parallel streams and the concurrency level increase, CPU utilization on the client side increases, as expected. On the server side, the same happens as the level of concurrency increases. Increasing parallelism, however, has the opposite effect on the server: server CPU utilization starts dropping and keeps dropping as long as the parallelism level increases.

Storage Space Management:

One of the important resources to take into consideration when making scheduling decisions is the available storage space at the destination. Some storage systems support space allocation. For those that do not, the data placement scheduler must make a best effort not to overcommit the storage space. It does this by keeping track of the size of the data transferred to, and removed from, each storage system that does not support space allocation.
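The bookkeeping described above can be sketched as a small counter per storage system. `SpaceTracker` and its methods are hypothetical names, and sizes are in MB for readability. Note the limitation built into the approach: the tracker only knows about data the scheduler itself moved in or removed.

```python
class SpaceTracker:
    """Best-effort space bookkeeping for a storage system without allocation support.

    Hypothetical helper: it only sees data this scheduler moved in or removed,
    so space consumed by other users is invisible to it.
    """

    def __init__(self, capacity_mb: int):
        self.capacity_mb = capacity_mb
        self.used_mb = 0

    @property
    def free_mb(self) -> int:
        return self.capacity_mb - self.used_mb

    def reserve(self, size_mb: int) -> bool:
        """Charge a transfer against the budget; refuse it if it would overcommit."""
        if size_mb > self.free_mb:
            return False
        self.used_mb += size_mb
        return True

    def release(self, size_mb: int) -> None:
        """Credit space back after a file is removed from the storage system."""
        self.used_mb = max(0, self.used_mb - size_mb)

tracker = SpaceTracker(capacity_mb=10_000)
print(tracker.reserve(6_000), tracker.reserve(5_000), tracker.free_mb)  # True False 4000
tracker.release(6_000)
print(tracker.reserve(5_000))  # True
```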

The figure below shows how the scheduler changes the order of previously scheduled jobs to meet the space requirements at the destination storage system. In this example, four different techniques are used to decide the order in which to execute the data transfer requests without overcommitting the available storage space at the destination: first fit, largest fit, smallest fit, and best fit.

Storage space management: Different techniques.

First fit: If the next transfer job in the queue is for data that will not fit in the available space, it is skipped for that scheduling cycle, and the next transfer job whose data size is less than or equal to the available space is executed instead. The initial scheduling order is preserved; only requests that cannot satisfy the storage space requirements are skipped, since they would fail anyway and would also prevent other jobs in the queue from being executed.
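A minimal sketch of one first-fit decision, assuming jobs are `(job id, size in MB)` tuples (an illustrative representation, not Stork's). Jobs that do not fit are skipped for this cycle but stay in the queue in their original position.

```python
from typing import List, Optional, Tuple

Job = Tuple[str, int]  # (job id, data size in MB); illustrative representation

def first_fit(queue: List[Job], free_mb: int) -> Optional[Job]:
    """Return the first job, in original queue order, whose data fits in free_mb."""
    for job in queue:
        if job[1] <= free_mb:
            return job
    return None  # nothing fits this cycle; wait until space is released

queue = [("j1", 900), ("j2", 400), ("j3", 700)]
print(first_fit(queue, 800))  # ('j2', 400): j1 is skipped, not dropped
```

The scan is linear and never reorders the queue, which is why first fit is the cheapest of the four techniques.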

Largest fit and smallest fit: The queue is reordered, and then the request for the largest (or smallest) file that fits in the available space is executed. Both techniques have higher complexity than first fit, yet they do not guarantee better utilization of the remote storage space.
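The two reordering variants can be sketched the same way, again assuming `(job id, size in MB)` tuples. Here the reordering is an explicit sort by size followed by a first-fit scan of the sorted copy; the sort is where the extra O(n log n) cost comes from.

```python
from typing import List, Optional, Tuple

Job = Tuple[str, int]  # (job id, data size in MB); illustrative representation

def _fit_after_reorder(queue: List[Job], free_mb: int, largest: bool) -> Optional[Job]:
    # Reorder a copy of the queue by data size, then take the first job that fits.
    for job in sorted(queue, key=lambda j: j[1], reverse=largest):
        if job[1] <= free_mb:
            return job
    return None

def largest_fit(queue: List[Job], free_mb: int) -> Optional[Job]:
    return _fit_after_reorder(queue, free_mb, largest=True)

def smallest_fit(queue: List[Job], free_mb: int) -> Optional[Job]:
    return _fit_after_reorder(queue, free_mb, largest=False)

queue = [("j1", 900), ("j2", 400), ("j3", 700)]
print(largest_fit(queue, 800))   # ('j3', 700)
print(smallest_fit(queue, 800))  # ('j2', 400)
```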

Best fit: As the name suggests, this technique finds the combination of jobs that best fits the available space. It makes the best use of the remote storage space, but at a cost in running time, since the queue can be very large.
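Finding "the combination of jobs that best fits" is a subset-sum (0/1 knapsack) problem. The sketch below solves it with a standard dynamic program over whole-MB sizes; the representation and the choice of dynamic programming are assumptions for illustration, not necessarily how Stork implements best fit. Its cost grows with both the queue length and the free space, which matches the running-time concern above.

```python
from typing import List, Optional, Tuple

Job = Tuple[str, int]  # (job id, size in whole MB); illustrative representation

def best_fit(queue: List[Job], free_mb: int) -> List[Job]:
    """Choose the subset of jobs whose total size is closest to free_mb without exceeding it.

    O(len(queue) * free_mb) time and O(free_mb) memory.
    """
    reach = [False] * (free_mb + 1)   # reach[s]: some subset sums to exactly s
    reach[0] = True
    came_from: List[Optional[Tuple[int, int]]] = [None] * (free_mb + 1)  # (job index, previous sum)
    for i, (_, size) in enumerate(queue):
        # Walk sums downward so each job is counted at most once.
        for s in range(free_mb, size - 1, -1):
            if not reach[s] and reach[s - size]:
                reach[s] = True
                came_from[s] = (i, s - size)
    total = max(s for s in range(free_mb + 1) if reach[s])
    chosen, s = [], total
    while s > 0:                      # follow the back-pointers to recover the subset
        i, s = came_from[s]
        chosen.append(queue[i])
    return chosen[::-1]

queue = [("j1", 900), ("j2", 400), ("j3", 700), ("j4", 300)]
print(best_fit(queue, 1000))  # [('j3', 700), ('j4', 300)]
```

Here best fit fills all 1,000 MB, whereas first fit on the same queue would start `j1` alone and leave 100 MB idle.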

Kosar et al. ran an experiment whose goal was to process 40 gigabytes of data, consisting of 60 files each between 500 MB and 1 GB. First, the files need to be transferred from the remote storage site to the staging site near the compute pool where the processing will be done. Each file is fed to a separate process, so there are 60 jobs in the queue, each requiring one file. The processing site has only 10 GB of storage capacity, which limits the number of files that can be transferred and processed at any time.

A traditional scheduler would start all 60 transfers concurrently, because it is unaware of the storage space limitation. The storage space at the processing site fills up, and in this scenario all of the jobs eventually fail.

Stork, on the other hand, completes all transfers successfully by managing the storage space at the processing site intelligently. The scheduler fills the available storage space, and as soon as a job finishes processing, its file is removed from storage, making room for the file of the next job waiting in the queue. The number of transfer jobs running concurrently therefore depends on how many files are already held in the storage space.

Stork also performs better than a traditional scheduler restricted to 10 jobs at a time: such a fixed limit is not as dynamic as Stork's scheduling and does not fully utilize the available storage space.
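The contrast in this experiment can be sketched with a toy simulation. Everything here is an assumption for illustration: the 60 file sizes are generated (between 500 and 1,000 MB, not the paper's data), and time advances in synchronous rounds in which every admitted file is processed and deleted before the next round, a simplification of Stork releasing space job by job. The point is only that a space-aware queue never exceeds the 10 GB capacity while a storage-unaware one requests far more.

```python
from collections import deque
from typing import List, Tuple

CAPACITY_MB = 10_000
# Hypothetical workload: 60 files between 500 MB and 1,000 MB (not the paper's data).
SIZES = [500 + (i * 37) % 501 for i in range(60)]

def naive_peak(sizes: List[int]) -> int:
    """A storage-unaware scheduler starts every transfer at once."""
    return sum(sizes)

def space_aware_run(sizes: List[int], capacity_mb: int) -> Tuple[int, int]:
    """Run the queue in synchronous rounds; returns (rounds, peak MB used).

    Each round admits waiting files in queue order whenever they fit (first fit);
    all files admitted in a round are processed and deleted before the next round.
    """
    waiting = deque(sizes)
    rounds = peak = 0
    while waiting:
        used, skipped = 0, deque()
        while waiting:
            size = waiting.popleft()
            if used + size <= capacity_mb:
                used += size            # transfer starts: its space is charged
            else:
                skipped.append(size)    # does not fit this round; keeps its place
        if used == 0:
            raise ValueError("a file is larger than the whole storage area")
        peak = max(peak, used)
        rounds += 1
        waiting = skipped               # processed files were deleted, space is free again
    return rounds, peak

print(naive_peak(SIZES) > CAPACITY_MB)  # True: the naive scheduler overcommits
rounds, peak = space_aware_run(SIZES, CAPACITY_MB)
print(peak <= CAPACITY_MB)              # True
```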

Data Workflow 

A distributed workflow system can be viewed as a large pipeline consisting of many tasks divided into substages, where the main bottleneck is remote data access and retrieval due to network latency and communication overhead. Just as pipelining techniques overlap different types of jobs and execute them concurrently while preserving the task sequence and dependencies, data movement jobs in distributed systems can be ordered and scheduled independently of compute tasks to exploit parallel execution while preserving data dependencies.

For this purpose, the authors expand scientific workflows to include the necessary data placement steps, such as stage-in and stage-out, as well as other important steps that support data movement, such as allocating and de-allocating storage space for the data and reserving and releasing network links.

Expansion of a scientific workflow to include data placement steps.
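The expansion can be sketched as a function that wraps one compute job with the data placement steps listed above. Step names, `ComputeJob`, and the linear dependency chain are illustrative assumptions; a real planner (the text mentions Pegasus and DAGMan) would also let independent steps run in parallel, and a chain is only the simplest valid ordering.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

Step = Tuple[str, str]  # (step kind, what it acts on); hypothetical step vocabulary

@dataclass
class ComputeJob:
    name: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)

def expand(job: ComputeJob) -> List[Step]:
    """Surround a compute job with data placement steps, in dependency order."""
    steps: List[Step] = [("allocate_space", job.name), ("reserve_link", job.name)]
    steps += [("stage_in", f) for f in job.inputs]
    steps += [("release_link", job.name), ("compute", job.name), ("reserve_link", job.name)]
    steps += [("stage_out", f) for f in job.outputs]
    steps += [("release_link", job.name), ("deallocate_space", job.name)]
    return steps

def chain_edges(steps: List[Step]) -> List[Tuple[Step, Step]]:
    """Simplest valid dependency structure: each step waits for the previous one."""
    return list(zip(steps, steps[1:]))

job = ComputeJob("simulate", inputs=["mesh.dat"], outputs=["result.nc"])
for kind, target in expand(job):
    print(kind, target)
```

The network link is held only around the transfers, not during the compute step, so other transfers can use it in between.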

Just as data forwarding between pipeline stages avoids wasted cycles when one instruction writes a register that the next instruction reads, the data-aware workflow planner communicates with the data placement scheduler and the computational job scheduler, via the workflow execution tool, to order jobs so that data stage-in and stage-out are managed with minimal system overhead. For example, one approach is to schedule and send jobs to compute nodes that are “close” to the requested data.
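"Close to the data" needs some distance measure. One possible sketch, assuming a hypothetical table of estimated bandwidths (MB/s) between replica sites and compute nodes, picks the node with the smallest best-case stage-in time. The cost model, node names, and numbers are all illustrative assumptions.

```python
from typing import Dict, List, Tuple

Bandwidth = Dict[Tuple[str, str], float]  # (replica site, compute node) -> assumed MB/s

def stage_in_time(node: str, replica_sites: List[str], input_mb: float, bw: Bandwidth) -> float:
    """Best-case seconds to fetch the input to `node` from any replica site.

    Pairs missing from the bandwidth table are treated as unreachable.
    """
    best_rate = max((bw.get((site, node), 0.0) for site in replica_sites), default=0.0)
    return input_mb / best_rate if best_rate > 0 else float("inf")

def closest_node(nodes: List[str], replica_sites: List[str], input_mb: float, bw: Bandwidth) -> str:
    return min(nodes, key=lambda n: stage_in_time(n, replica_sites, input_mb, bw))

bw = {("siteA", "n1"): 10.0, ("siteA", "n2"): 100.0, ("siteB", "n1"): 200.0}
print(closest_node(["n1", "n2"], ["siteA", "siteB"], 1000, bw))  # n1 (5 s from siteB vs 10 s)
```

With only `siteA` as a replica, `n2` would win; which node is "close" depends on where the replicas live, which is why the planner and the data placement scheduler need to cooperate.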

Kosar et al. have done a preliminary implementation of data-aware workflow management where data-awareness components were added to the workflow planning tool Pegasus and workflow execution tool Condor DAGMan. This implementation consists of two parallel universes: a data subsystem, and a compute subsystem. These two subsystems are complementary, the first specializing in data management and scheduling and the latter specializing in compute management and scheduling. The orchestration of these two parallel subsystems is performed by the upper layer workflow planning and execution components. This implementation was used in coastal hazard prediction (SCOOP) and reservoir uncertainty analysis (UCoMS) applications.


The data-aware scheduler “Stork” and its interaction with other components in an end-to-end system.

In the UCoMS application, thousands of small files (10–100 kB) need to be moved to the execution sites. To improve transfer performance for such files, the authors implemented a data fusion method in Stork: smaller files are merged into larger packages on the local system where the input files are located, the merged package is sent to the remote host, and once the package has been transferred successfully, the files are unpacked on the remote host. As the number of datasets grows, this method yields a significant increase in the transfer rate.
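The fusion step can be sketched with Python's standard `tarfile` module. Using tar as the package format, the function names, and the 100 files of 10 kB are assumptions for illustration; the actual transfer of the package to the remote host is left out, and both ends run in one temporary directory here.

```python
import tarfile
import tempfile
from pathlib import Path
from typing import List

def fuse(files: List[Path], package: Path) -> None:
    """Merge many small files into one tar package at the source site."""
    with tarfile.open(str(package), "w") as tar:
        for f in files:
            tar.add(str(f), arcname=f.name)

def unfuse(package: Path, dest: Path) -> None:
    """Unpack the package at the destination after a successful transfer."""
    with tarfile.open(str(package), "r") as tar:
        if hasattr(tarfile, "data_filter"):       # safer extraction where supported
            tar.extractall(str(dest), filter="data")
        else:
            tar.extractall(str(dest))

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp, "src")
    src.mkdir()
    small = []
    for i in range(100):
        p = src / f"part{i:03d}.dat"
        p.write_bytes(b"x" * 10_240)              # 10 kB each, like the small UCoMS inputs
        small.append(p)
    pkg = Path(tmp, "bundle.tar")
    fuse(small, pkg)                              # one package instead of 100 separate transfers
    # ... transfer pkg to the remote host here (not shown) ...
    dest = Path(tmp, "remote")
    dest.mkdir()
    unfuse(pkg, dest)
    restored = len(list(dest.iterdir()))
    same = (dest / "part042.dat").read_bytes() == (src / "part042.dat").read_bytes()
print(restored, same)
```

The gain comes from paying per-transfer setup costs (connection, authentication, protocol handshakes) once per package rather than once per file.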

Stork and its successors are steps taken towards a new paradigm in data intensive computing: data-aware batch schedulers. This trend will continue since the batch schedulers are bound to take the data dependencies and data movement into consideration when making scheduling decisions in order to handle the data-intensive tasks correctly and efficiently.

Categories: IEEE Papers