A Remote Data Aggregation Pipeline to Provide Machine Learning Datasets

This blog explores how we aggregate datasets remotely, keep track of already generated datasets, and download datasets from a remote server using a generalizable architecture pattern.



In our NRW.EFRE-funded research project KI Design, we spent a lot of time training and testing convolutional networks together with our colleagues from the Bochumer Institute of Technology (BO-I-T). Our goal: using Artificial Intelligence (AI) to make photo editing more comprehensive and easier at the same time. Details about the project, the motivation behind it, and some of our results can be found on our project homepage and in this blog post.

Figure 1: Example of an AI alpha-matted image of a wood duck.

Today, we want to share details about the custom-made remote data aggregation and data transfer pipeline that we developed to integrate data preprocessing and storage seamlessly into the training procedure. We think this topic is of interest to the machine learning community, because the generation, versioning, and handling of datasets for training machine learning algorithms is a challenge for many researchers and developers.

Introduction

A major challenge in nearly every project that applies machine learning and deep learning lies in data preparation and augmentation for the training process. As many approaches and algorithms nowadays are data-driven, having training data in the right amount and quality can make the difference between a project's success and failure. This also requires a well-organized data infrastructure to store data, possibly in different versions of datasets.

In our joint project KI Design, we face a setting in which our server cluster is split geographically: the data storage server is located at the BO-I-T laboratory and the computing server at the IMG.LY site. This, of course, makes the design of a training pipeline a bit more demanding with regard to efficient use of resources. After searching for an existing software solution, we decided to develop a custom-made approach, adapted to our requirements, which we formulated as follows:

  • An efficient workflow between data (pre)processing, storage, and provisioning on one side, and training initiation and execution on the other
  • A possibility to request data preparations and augmentations based on configurations generated on the training side
  • Versioning of datasets and configurations to ensure reproducibility
  • Availability of already computed datasets, so that identical data requests do not have to be prepared and processed multiple times
  • A notification process on the data server that signals the availability of a dataset and triggers download and training processes on the computation side
  • Integration into a TensorFlow training pipeline

Our Custom-made Approach - Overview


In this part, we briefly sketch our concept before diving into the technical implementation in the next part. To train new models on our work-hungry computing server at IMG.LY, we need training data. As mentioned before, the data is stored on the data server at BO-I-T. We work on different tasks and experiments in our project, so we need to run several different machine learning trainings, many of which require different datasets.

A dataset is created in a data preparation process. In our context, the data preparation can be based on one of several "raw datasets", such as COCO, DUTS, or some self-assembled datasets, and might include data pre-formatting (e.g. adjusting image size or section) and augmentation (e.g. image rotation, brightness adjustment, or combining foreground subjects with different backgrounds). Our idea was to use the data server for the whole process of data preparation and augmentation, so as not to waste valuable resources on the client for this.

Figure 2: Example of an image data preparation process. Left: raw image data; top right: cropped and flipped image; bottom right: cropped and rotated image with adjusted exposure.
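
Such a preparation step can be pictured as a few image operations chained together. Here is a minimal, purely illustrative Pillow sketch; the file names, crop box, rotation angle, and brightness factor are placeholders, not the parameters we actually use:

from PIL import Image, ImageEnhance

# Illustrative only: paths and parameters are placeholders.
image = Image.open("raw/wood_duck.jpg")
cropped = image.crop((100, 100, 900, 900))                   # (left, upper, right, lower)
rotated = cropped.rotate(15, expand=True)                    # rotate by 15 degrees
brightened = ImageEnhance.Brightness(rotated).enhance(1.2)   # adjust exposure
brightened.save("prepared/wood_duck_variant_01.jpg")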

The pre-training data aggregation phase can be described as follows (an illustrative message sketch follows the list):

  1. The client on the computing server should be able to request datasets from the data server. To do so, the client transmits a (parameter) description of the exact configuration of the required dataset (image type and resolution, as well as meta information) to the data server.
  2. While waiting for the requested dataset to be generated and provided, the computing server can spend its resources on other scheduled training jobs. (Computation time is money!)
  3. Meanwhile, the data server checks whether the requested dataset already exists (prepared for a previous request) or whether a new data generation process needs to be launched.
  4. If a dataset is available and ready to be downloaded, either directly after the request (because it was already created for a previous request with the exact same configuration) or after the time it took to create the new dataset on the server, the data server sends a notification message back to the client.
  5. On receiving this response, the computing server downloads the dataset and initiates the training process.
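
To make the exchange more tangible, the request and the notification can be thought of as two small messages. The field names below are illustrative assumptions only; the actual parameters are described in the implementation section:

# Illustrative message shapes only; the real payload is shown further below.
example_request = {
    "dataset_name": "ducks_selfassembled",      # which raw dataset to start from
    "handler_version": "1.2.0",                 # which data handler release to use
    "dataset_config": {"image_size": [1024, 1024], "batch_size": 100},
}

example_notification = {
    "topic_id": "ducks_selfassembled_1.2.0_<md5>",    # identifies the request
    "status": "ready",                                # dataset can now be downloaded
    "file": "ducks_selfassembled_1.2.0_<md5>.zip",
}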

To implement this concept, we set up an architecture organized into three services, cf. Figure 3:

  1. a dataset-client,
  2. a dataset-server, and
  3. a dataset-handler.

Figure 3: Schema of the data transfer process

For our implementation, we developed a dataset-client service that provides the functionality required on the client side. The dataset-server service is the counterpart on the server side. While the first two components are pretty straightforward to understand, the third part, the dataset-handler, requires a little explanation: the implementation and training of machine learning models is not the only part of our research project. With similar importance, we develop new strategies and approaches for data preparation. Thus, the server cannot be provided with all preparation functions a priori. Instead, it needs to be able to fetch the required code, e.g. new augmentation procedures and other algorithms, in its latest version just before the preparation starts. The dataset-handler is our concept for this: it is the adjustable tool the server uses to perform the data preparation.

Our Custom-made Approach - Technical Implementation

In this part, we do not aim to provide a full description of our implementation. Instead, our goal is to give some insights into which frameworks we used and how we implemented the interfaces between the client, the data server, and the data handler.

Framework and Languages

To set up our remote data pipeline we used a combination of different tools, frameworks and programming languages:

  • Apache2
  • Node.js
  • Python

For our data server, we used Apache2 as a web server to accept requests from outside and redirect them. Our REST API and WebSocket connection are set up in Node.js. Here we use the Express.js and WebSocket libraries to handle the REST requests and establish a WebSocket connection. Further server-side processes, such as checking the availability of datasets by a hash, setting up a virtual environment (to be sure that all libraries are available on the server, we set up a pipenv environment), and triggering the data handler, are written in Python. Here, we used libraries like hashlib, subprocess, and zipfile for the implementation, as well as some other basic libraries.

Besides the dataset-server service, the data-handler and the data-client are also written in Python. Here we used libraries such as requests, asyncio, and WebSocket to establish the client-side connection with the data server. Further libraries used by the data handler strongly depend on the task to perform and thus vary a lot. To name some examples: we frequently use libraries such as Pillow, imageio, or OpenCV for image manipulation.

Client Side

The dataset client runs on the computing server and triggers the whole data aggregation process. It covers the following functionalities (a minimal sketch of how they fit together follows the list):

  • requesting datasets,
  • establishing a WebSocket connection,
  • downloading aggregated datasets,
  • starting the model training.
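
A minimal sketch of how these four steps fit together on the client might look like this. The helpers download_dataset and start_training are hypothetical names used only for illustration, while request_dataset and wait_for_completion are described below; that request_dataset hands back a topic id is also an assumption of this sketch:

import asyncio

def run_client(dataset_name, handler_version, dataset_config):
    # 1. Request the dataset via the REST API (see request_dataset below).
    topic_id = request_dataset(dataset_name, handler_version, dataset_config)
    # 2. Block on the WebSocket until the data server signals completion.
    asyncio.get_event_loop().run_until_complete(wait_for_completion(topic_id))
    # 3. Download the finished archive and hand it over to the training job
    #    (download_dataset and start_training are hypothetical helpers).
    local_path = download_dataset(topic_id)
    start_training(local_path)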

To get preprocessed training data from the server, a REST-based POST request is sent from the client to the data server. This request includes a configuration defining the exact dataset attributes, as well as meta information that specifies the raw dataset to be used for the preprocessing and the version of the data handler. Here is a code snippet showing the required POST parameters and their data types:

request_dataset(dataset_name: str, handler_version: str, dataset_config: Dict)

The parameters dataset_name and handler_version are passed as strings, and the config settings dataset_config as a dictionary. Different handlers are assigned to different projects and are customized to their specific needs; therefore, the config settings vary from project to project. But to give an idea of some configurations, here is a (short) example:

short_example_config = {
    'input_attributes': ['image'],
    'batch_size': 100,
    'image_size': [1024, 1024],
    'variations_per_sample': 1,
    'crop_size': [800, 800],
    'random_crop_centered': True,
}

It shows some basic parameters, e.g. image type, image resolution, and batch size, but also options for further operations such as centered cropping. These settings serve as instructions for the data_handler to create the dataset and are fully customizable and adaptable to each use case.
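
For illustration, a request_dataset function along these lines could be built with the requests library. The server address, endpoint path, and response format below are assumptions, not our actual API:

import requests
from typing import Dict

DATA_SERVER_URL = "https://data-server.example.org"  # placeholder address

def request_dataset(dataset_name: str, handler_version: str, dataset_config: Dict) -> str:
    # Send the dataset configuration to the data server via a POST request.
    payload = {
        "dataset_name": dataset_name,
        "handler_version": handler_version,
        "dataset_config": dataset_config,
    }
    response = requests.post(DATA_SERVER_URL + "/datasets", json=payload)
    response.raise_for_status()
    # Assumed response format: the server returns a topic id that identifies
    # the request on the WebSocket channel.
    return response.json()["topic_id"]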

After receiving a successful response to the POST request, the dataset client establishes a WebSocket connection. This allows continuous communication between the client and the data server, which is important for a server-side notification once data preprocessing has finished (as an alternative, we could have implemented regularly scheduled client-side polling to check dataset states). Here we depict an example of our asyncio event loop command:

asyncio.get_event_loop().run_until_complete(wait_for_completion('topic_id'))

With this loop, the client waits until it receives the related response from the server. If a success token is returned, the client leaves the loop, downloads the dataset, and then starts the actual training process (as soon as the training scheduler assigns resources, but that is a different story).
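
A wait_for_completion coroutine could be sketched with the websockets library roughly as follows. The server URL, the subscription message, and the "ready" status token are assumptions for illustration:

import json
import websockets  # client-side WebSocket library

WS_SERVER_URL = "wss://data-server.example.org/ws"  # placeholder address

async def wait_for_completion(topic_id: str) -> str:
    async with websockets.connect(WS_SERVER_URL) as ws:
        # Subscribe to notifications for our dataset request.
        await ws.send(json.dumps({"subscribe": topic_id}))
        while True:
            message = json.loads(await ws.recv())
            # Leave the loop once the server reports the dataset as ready.
            if message.get("topic_id") == topic_id and message.get("status") == "ready":
                return message["file"]  # name of the .zip archive to download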

Server Side

On the server side, two services are running: the dataset_server and the dataset_handler. The dataset_server handles the communication with the client and receives configuration requests as well as download requests. Furthermore, it checks the availability of datasets and, if necessary, triggers the data_handler to run a dataset creation process. In summary, the dataset_server covers the following functionalities:

  • receiving requests over the REST service
  • setting up the WebSocket communication protocol
  • checking for existing datasets
  • installing the dataset_handler and initializing dataset creation
  • sending notifications to the client

Using a REST API as an entry point, the data server receives the POST request from the client and checks whether the required dataset has already been created. This check is done by string matching of a Universally Unique Identifier (UUID) for a configuration: using an md5 hash, each request is converted to a special UUID generated from the transferred dataset configuration. The information taken into account for the UUID/hash generation is the dataset name, the handler version, and the md5 hash of the config settings. We put the dataset name and handler version in front of the md5 hash, since this human-readable prefix can be useful, for example, when we run out of disk space. Here is our function for generating the UUID:

import hashlib

def hash_dataset_name(dataset_name: str, handler_version: str, dataset_config: str):
    # Hash the serialized configuration and prefix it with the human-readable
    # dataset name and handler version.
    h = hashlib.md5()
    h.update(dataset_config.encode("utf-8"))
    return "".join([dataset_name, "_", handler_version, "_", h.hexdigest(), ".zip"])

If the UUID matches an existing dataset, a notification is returned to the client, pointing to the related zip archive on the datastore. Otherwise, the data_handler is instructed to start data preprocessing. However, before the generation process can be started, the correct data_handler has to be downloaded and installed. Different data_handler versions are stored in a GitHub repository and can be downloaded from the data_server. This is done via a shell command invoked from Python, running a "pip install":

subprocess.call(["pipenv","run","pip", "install", "--no-cache-dir", "--upgrade", "--process-dependency-links", "git+https://github.com/imgly/dataset-handlers@{}#egg=dataset-handlers".format(handler_version)])

Note: We need pip in version 18 here because it is the last version supporting the "--process-dependency-links" option, which ensures that the dependencies inside of dataset-handlers are installed. The data_handler version is directly passed into the pip command, linking to the corresponding GitHub repository and handler release version.
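
Putting these pieces together, the server-side logic boils down to a "check the cache, otherwise install the handler and generate" routine. The following is only a rough sketch: the cache path is a placeholder, and install_handler and generate_dataset are hypothetical helpers, not our actual implementation:

import os

DATASET_CACHE = "/data/datasets"  # illustrative location of the cached archives

def provide_dataset(dataset_name: str, handler_version: str, config_string: str) -> str:
    # Look up the archive under its UUID-style file name (see hash_dataset_name above).
    archive_name = hash_dataset_name(dataset_name, handler_version, config_string)
    archive_path = os.path.join(DATASET_CACHE, archive_name)
    if not os.path.exists(archive_path):
        # install_handler and generate_dataset are hypothetical helpers: the first
        # wraps the pip command shown above, the second runs the data handler.
        install_handler(handler_version)
        generate_dataset(dataset_name, config_string, archive_path)
    return archive_name  # reported back to the client over the WebSocket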

Data Handler

After successful installation of the dataset_handler, we can simply import the handler in Python: import dataset_handlers

As described earlier, the core functionality of the data_handler is to create a dataset based on a given configuration and return zipped .tfrecord files. For those of you who are not familiar with the .tfrecord file format: it is a special TensorFlow format for handling data as binary records in a sequence. This has the advantage of using less disk space, being faster to copy, and being more efficient to read from disk. But let's get back to our data_handler: depending on the project and use case, the data handlers vary strongly and offer different methods, which makes a concrete example a bit difficult at this point. But we can present some abstract methods of our class DatasetHandler():

We start with the initialization method or constructor:

from typing import Any, Dict

class DatasetHandler():
    def __init__(self, config: Dict[str, Any], base_data_path: str):
        self.base_data_path = base_data_path
        # Expose every configuration entry as an attribute of the handler.
        for k, v in config.items():
            setattr(self, k, v)
The input variables from the client are passed into the constructor and set as class attributes. Additionally, the base path of the raw data is required and set as an attribute. These are the main settings needed for data generation. Furthermore, the following methods are important for the generation process:

  • a static method keeping all allowed config operations,
  • a method to create the correct raw data path, and
  • a method that returns a .tfrecord file list.

    def as_tfrecord(self) -> List[str]:
        pass

    @staticmethod
    def config_options() -> Dict[str, Any]:
        pass

    def tf_records_mapper(self, files: List[str]) -> tf.data.TFRecordDataset:
        pass

Finally, after successful dataset creation, the server creates a .zip archive from the .tfrecord files and returns the file name to the client over the WebSocket connection. The client can now download the dataset.
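
For illustration, this final "generate and zip" step could look roughly like the sketch below. Whether DatasetHandler is exposed at the package top level, and the exact constructor and method usage, are assumptions based on the interface shown above:

import zipfile

import dataset_handlers  # installed via the pip command shown earlier

def generate_and_zip(config: dict, base_data_path: str, archive_path: str) -> str:
    # Instantiate the handler with the client's configuration and the raw data path.
    handler = dataset_handlers.DatasetHandler(config, base_data_path)
    tfrecord_files = handler.as_tfrecord()  # list of generated .tfrecord files
    # Bundle the generated records into a single archive for the client.
    with zipfile.ZipFile(archive_path, "w") as archive:
        for path in tfrecord_files:
            archive.write(path)
    return archive_path  # file name reported back over the WebSocket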

Summary

Today we introduced our concept and gave some insights into the implementation of the remote data pipeline we developed to prepare and provide training data for complex machine learning training runs in our research project. Using this pipeline, we have solved the challenging task of working with a split infrastructure without suffering significant performance penalties. This remote data transfer was one of the first milestones reached in our project and has been the basis of a successful collaboration.

The advantage of our presented solution is the shared use of resources: the computing server focuses its resources entirely on training models, while the data server handles the labor-intensive creation of datasets. To give you an idea of why this is so important: as we work with large images, a full augmentation process can easily take up to 12 hours, which would waste valuable computing resources if the tasks were not split up. Furthermore, we usually start multiple training sessions that all require the same dataset at the same time. Without our data pipeline, each experiment would create its own version of the same dataset and block even more resources. With our solution, this is handled far more efficiently.

Our solution could be extended in future versions with features like a monitoring tool with visualization. Important information and statistics to display could be, for example, the status of currently running data preparation processes, a list of all cached datasets including their configurations, and statistics about usage and downloads. This could help to keep the storage structured and clean.

All in all, we are very pleased with how fast the communication and transfer between our servers take place, and with our self-developed approach.

This project was funded by the European Regional Development Fund (ERDF).
