Scale to out-of-memory CSV datasets from cloud object storage providers

Laay Trivedi
5 min read · Oct 28, 2020


We will use the Python ObjectStorageDataset (OSDS) library. OSDS supports CSV files stored in Google Cloud Storage (GCS), Amazon S3, the local file system, and similar object storage services.

OSDS, short for Object Storage DataSet, provides a convenient way of importing large, out-of-memory datasets from cloud storage services like GCS and Amazon S3 and of iterating over a dataset when training a machine learning model with PyTorch.

The OSDS library plugs into the standard PyTorch Dataset and DataLoader interfaces for datasets stored in CSV format, irrespective of whether they reside as public cloud objects or on a local file system. Each call to the DataLoader yields a PyTorch tensor containing a batch of numeric values from the CSV-based dataset.

This post is divided into two parts: in the first part I will explain the OSDS API, and in the second part I will walk through an example that demonstrates how to use the library. So, let us get started.

Understanding the OSDS API

To instantiate ObjectStorageDataset, you must specify a URL-style path (similar to a Unix glob string) pointing to the location of your CSV-formatted dataset. For example, the s3:// prefix loads datasets directly from Amazon S3, and the gcs:// prefix imports datasets from Google Cloud Storage. When using ObjectStorageDataset with out-of-memory datasets, you also need to pass the batch_size parameter, as shown in the example below.
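For example, the same partitioned dataset could be referenced with any of the following globs; only the protocol prefix changes. The bucket and path names below are hypothetical placeholders, and the exact spelling of a local path may vary with the OSDS version.

s3_glob = f"s3://my-bucket/data/part*.csv"      # Amazon S3
gcs_glob = f"gcs://my-bucket/data/part*.csv"    # Google Cloud Storage
local_glob = "./data/part*.csv"                 # local file system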

ObjectStorageDataset is not included when you install PyTorch, so you need to install it separately in your Python environment using pip and then import the class into your runtime:

pip install osds

from osds.utils import ObjectStorageDataset

To understand how we can import large datasets from cloud storage, we will use the “NYC-TLC taxi fare” dataset, a large public dataset (with more than 100M records per calendar year) hosted on Amazon S3. Please review the reference links to learn more about the dataset. Here, we will import a total of 12 objects (one CSV file per month of 2017) located in the nyc-tlc S3 bucket. We will then predict the taxi fare amount from features such as passenger_count, trip_distance, payment_type, and the drop-off and pick-up locations, using a linear regression model trained with mini-batch gradient descent. The data can be loaded directly from cloud storage using the following code snippet:

train_ds = ObjectStorageDataset(f"s3://nyc-tlc/trip data/yellow_tripdata_2017-*.csv",
    storage_options={'anon': True},
    batch_size=16384,
    eager_load_batches=False)
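To sanity-check the instantiation before training, you can wrap the dataset in a standard PyTorch DataLoader and pull a single batch. This is just a quick sketch; the number of columns in the batch depends on the 2017 yellow-taxi CSV schema.

from torch.utils.data import DataLoader

train_dl = DataLoader(train_ds, batch_size=None)   # batch_size=None because OSDS builds the batches itself
first_batch = next(iter(train_dl))                 # a PyTorch tensor with 16,384 rows
print(first_batch.shape, first_batch.dtype)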

By default, ObjectStorageDataset is designed to instantiate in the shortest amount of time possible so that the iterations of gradient descent can start quickly. In practice, this means ObjectStorageDataset caches, in memory and on disk, only the dataset objects (typically from object storage) needed to return the first batch of examples, as illustrated here:

[Figure 1]

In the example in Figure 1, ObjectStorageDataset is instantiated using a fictional src S3 bucket containing CSV-formatted objects with the complete URL-style path s3://src/data/part*.csv. The partitions of the dataset (i.e. the CSV-formatted objects with names matching part*.csv) reside under the data folder in the src bucket. Note that the size (number of rows) of the partitions is entirely independent of the batch_size used to instantiate ObjectStorageDataset, meaning that the batch_size can vary while the size of the partitions stays constant.

In general, the number and the size of the dataset partitions vary depending on the specifics of the machine learning project, although it is better to choose partition sizes in the range of 100–200 MB for efficient transfer if you are working with commonly deployed 100 Mbps network interfaces.
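As a rough back-of-the-envelope check (my own arithmetic rather than a figure from the OSDS documentation), a 100 Mbps link moves about 12.5 MB per second, so a 100–200 MB partition transfers in roughly 8–16 seconds:

# Illustrative transfer-time estimate for one partition over a 100 Mbps link
link_mb_per_s = 100 / 8                      # 100 Mbps ≈ 12.5 MB/s
for partition_mb in (100, 150, 200):
    print(f"{partition_mb} MB partition ≈ {partition_mb / link_mb_per_s:.0f} s to download")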

The Simple Linear Regression model

from torch.utils.data import DataLoader

train_ds = ObjectStorageDataset(f"s3://nyc-tlc/trip data/yellow_tripdata_2017-*.csv",
    storage_options={'anon': True},
    batch_size=16384,
    eager_load_batches=False)
train_dl = DataLoader(train_ds, batch_size=None)

Each one of the yellow_tripdata_2017-*.csv objects consists of around 9.7M examples, represented in the CSV format as one example per line. After ObjectStorageDataset is instantiated with a batch_size of 16,384 in the Python runtime of a compute node, the implementation triggers a network transfer of the CSV objects from the S3 bucket to the file system cache of the compute node as they are needed; each object corresponds to nearly 600 batches of data (number_of_rows / batch_size ≈ 9.7M / 16,384 ≈ 592).

Avoid setting eager_load_batches to True for datasets that do not fit in node or cluster memory, since doing so may lead to out-of-memory conditions. If storage_options is not specified or is None, it is assumed to be {‘anon’: True}, which uses unauthenticated access to object storage. To use storage platform-specific authentication, specify {‘anon’: False} instead. If dtype is not specified or is None, the widest possible data type is used for numeric data, which prevents loss of information but uses extra memory.
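For reference, here is the instantiation used above with these parameters spelled out explicitly. This is a sketch rather than the article’s original code: the dtype value shown is my assumption about how the widest numeric type is passed, so check the OSDS documentation for the exact accepted values.

train_ds = ObjectStorageDataset(f"s3://nyc-tlc/trip data/yellow_tripdata_2017-*.csv",
    storage_options={'anon': True},      # unauthenticated access; use {'anon': False} for authenticated access
    batch_size=16384,
    eager_load_batches=False,            # keep False for datasets larger than node or cluster memory
    dtype='float64')                     # assumption: the widest numeric type spelled as a string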

import torch as pt

LEARNING_RATE = 0.03

device = "cuda:0" if pt.cuda.is_available() else "cpu"

model = pt.nn.Linear(7, 1).to(device)       # 7 numeric input features, 1 output (the fare amount)
optimizer = pt.optim.SGD(model.parameters(), lr=LEARNING_RATE)

Here we are trying to predict the total taxi fare amount using features such as passenger_count, trip_distance, payment_type, and the drop-off and pick-up locations. A detailed description of the numeric features is shown in the table.

The model uses 7 numeric features, a learning rate of 0.03, and an ITERATION_COUNT of 300. In total, the dataset consists of nearly 116M rows, of which we train on roughly 4.9M records (BATCH_SIZE × ITERATION_COUNT).
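These figures follow from simple arithmetic over the numbers above:

BATCH_SIZE = 16384
ITERATION_COUNT = 300

rows_per_object = 9_700_000                  # ≈ 9.7M examples per monthly CSV object
print(rows_per_object // BATCH_SIZE)         # ≈ 592, i.e. nearly 600 batches per object
print(BATCH_SIZE * ITERATION_COUNT)          # 4,915,200 ≈ 4.9M records seen during training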

GRADIENT_NORM = 0.1
ITERATION_COUNT = 300

losses = []
for iter_idx, batch in zip(range(ITERATION_COUNT), train_dl):
    # the last column is the fare amount; columns 3 through 9 hold the 7 numeric features;
    # .float() matches the model's float32 weights in case OSDS loaded the CSV as float64
    y_, X_ = batch[:, -1].float().to(device), batch[:, 3:10].float().to(device)
    y_est = model(X_).squeeze()   # (N, 1) -> (N,) so the subtraction below does not broadcast
    mse = pt.mean((y_est - y_) ** 2)
    mse.backward()
    if GRADIENT_NORM:
        pt.nn.utils.clip_grad_norm_(model.parameters(), GRADIENT_NORM)
    optimizer.step()
    optimizer.zero_grad()
    losses.append(mse.item())
    if iter_idx % 100 == 0:
        print(f"Iteration: {iter_idx}, MSE: {mse.item()}, W: {model.weight.data.squeeze()}")

Conclusion

I have used a simple linear regression model to demonstrate how easily a user can load out-of-memory data from cloud object storage services like Amazon S3 and Google Cloud Storage using OSDS.

[Table 1]

All of the capabilities of OSDS described in this article apply to datasets that reside in serverless object storage services from the major cloud providers. Although the example in this article focuses on Amazon S3, you can easily repoint an instance of ObjectStorageDataset at a different cloud provider (or at the local file system) by modifying the protocol specified in the URL-style glob passed to the class (Table 1).
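For instance, if the same CSV objects were mirrored to a Google Cloud Storage bucket, only the protocol and bucket in the glob would change. The bucket name below is a hypothetical placeholder, and GCS credentials would replace the anonymous storage_options used for the public S3 bucket.

train_ds = ObjectStorageDataset(f"gcs://my-nyc-tlc-mirror/yellow_tripdata_2017-*.csv",
    batch_size=16384,
    eager_load_batches=False)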

Links:

  1. https://github.com/osipov/osds
  2. https://osds.readthedocs.io/en/latest/gcs.html
