Migrating Data from SQL to Single Parquet Format with Append Mode and Batch Processing in Python using Fastparquet
- Categories:
- tutorial
Data processing and analysis are crucial tasks in the field of data science and big data. Parquet is a popular columnar storage format for big data processing and analysis. It offers a number of advantages over traditional row-based storage formats, including improved performance, smaller storage footprint, and better data compression.
Fastparquet is a fast implementation of the Parquet file format that supports appending data to an existing file, making it a great choice for working with large datasets. By converting data from SQL to Parquet, we can greatly improve the performance of big data processing and analysis, making it easier and faster to work with large datasets.
In this blog post, we’ll go over the steps involved in converting data stored in SQL to the Parquet format in Python, using the Pandas and Fastparquet packages.
Setting up the Environment
Before we dive into the main topic, we’ll need to set up the environment by installing the required packages. To convert SQL data to Parquet format in Python, we’ll be using the Pandas package for data manipulation, the SQLAlchemy package (pinned to version 1.x for compatibility with the read_sql calls used here) to connect to a SQL database, and the Fastparquet package to write the data to a Parquet file. Since the examples connect to PostgreSQL, we’ll also install the psycopg2 driver.
To install these packages, you can use the following pip commands:
pip install pandas
pip install 'SQLAlchemy<2.0.0'
pip install fastparquet
pip install psycopg2
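To confirm that everything installed correctly, you can print the versions of the installed packages before going any further (a quick sanity check, nothing more):
import pandas
import sqlalchemy
import fastparquet
print(pandas.__version__, sqlalchemy.__version__, fastparquet.__version__)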
Reading Data from SQL
To read data from a SQL database, we’ll first need to create an SQLAlchemy engine to connect to the database. In the following code, we’ll connect to a local PostgreSQL database named “example_db”.
import pandas as pd
import sqlalchemy
engine = sqlalchemy.create_engine('postgresql+psycopg2://exampleuser:examplepassword@localhost/example_db')
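If you want to verify the connection before running any real queries, a minimal check looks like the following (it simply opens a connection and runs a trivial query using the credentials above):
# quick connectivity test; raises an error if the database is unreachable
with engine.connect() as conn:
    conn.execute(sqlalchemy.text("SELECT 1"))
print("connected")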
Next, we’ll use the Pandas read_sql function to query the data from the SQL database. In this example, we’ll query the “example_table” table for the “id” and “email” columns.
df = pd.read_sql("SELECT id, email FROM example_table", engine)
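It can be worth taking a quick look at the resulting DataFrame before going further, for example:
print(df.head())    # first few rows returned by the query
print(df.dtypes)    # column types pandas inferred from the SQL result
print(len(df))      # number of rows returned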
However, if the dataset is very large, it’s best to process it in batches to prevent memory issues. In the following code, we set the batch size to 100 and determine the number of batches required to process the entire dataset; for example, 250 rows with a batch size of 100 require 3 batches.
batch_size = 100
total_rows = pd.read_sql("SELECT count(*) FROM example_table", engine).iloc[0, 0]
num_batches = total_rows // batch_size + (total_rows % batch_size > 0)
We then loop through each batch and read the corresponding rows from the SQL database. Note the ORDER BY id clause: LIMIT/OFFSET pagination is only reliable when the rows come back in a stable order, otherwise batches can overlap or skip rows.
for i in range(num_batches):
    start = i * batch_size
    df = pd.read_sql("SELECT id, email FROM example_table ORDER BY id LIMIT {} OFFSET {}".format(batch_size, start), engine)
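As an aside, pandas can also handle the batching for us: read_sql accepts a chunksize argument and then returns an iterator of DataFrames rather than a single one. A minimal sketch of that approach, using the same table and columns as above (process_batch here is just a placeholder for whatever you do with each batch):
# let pandas stream the query result in chunks of batch_size rows
for chunk in pd.read_sql("SELECT id, email FROM example_table ORDER BY id", engine, chunksize=batch_size):
    process_batch(chunk)  # hypothetical placeholder for your per-batch processing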
Writing Data to Parquet
With the data read from the SQL database, we can now write it to a Parquet file using the Fastparquet package. In the following code, we use the write function from fastparquet to write the data to a Parquet file named “output.parquet”.
from fastparquet import write
write('output.parquet', df)
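The write function also accepts a compression argument if you want smaller files. For example, GZIP compression works without any extra dependencies (other codecs such as SNAPPY may require additional libraries, depending on your fastparquet version); the file name below is just for illustration:
# write the same DataFrame with gzip-compressed column chunks
write('output_compressed.parquet', df, compression='GZIP')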
If we’re processing the data in batches, we need to append the subsequent batches to the existing Parquet file. The Fastparquet package supports appending data to a Parquet file by passing the “append” argument as True.
for i in range(num_batches):
    start = i * batch_size
    df = pd.read_sql("SELECT id, email FROM example_table ORDER BY id LIMIT {} OFFSET {}".format(batch_size, start), engine)
    # the first batch creates the file; subsequent batches append to it
    if i == 0:
        write('output.parquet', df)
    else:
        write('output.parquet', df, append=True)
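The i == 0 check works fine within a single run. If you would rather base the decision on whether the file already exists on disk (for example, when batches are written by separate script invocations), a sketch of that variant is shown below; note that it will also append to a file left over from an earlier run, so you may want to delete output.parquet first:
import os

parquet_path = 'output.parquet'
# append only if a previous batch (or a previous run) already created the file
write(parquet_path, df, append=os.path.exists(parquet_path))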
Complete Python Script
You can get the script below on my GitHub.
Python: read from SQL to Parquet
import pandas as pd
import sqlalchemy # use version 1.x
from fastparquet import write
# connect to database
engine = sqlalchemy.create_engine('postgresql+psycopg2://exampleuser:examplepassword@localhost/example_db')
# batch processing
batch_size = 100
total_rows = pd.read_sql("SELECT count(*) FROM example_table", engine).iloc[0, 0]
num_batches = total_rows // batch_size + (total_rows % batch_size > 0)
for i in range(num_batches):
    start = i * batch_size
    df = pd.read_sql("SELECT id, email FROM example_table ORDER BY id LIMIT {} OFFSET {}".format(batch_size, start), engine)
    # the first batch creates the file; subsequent batches append to it
    if i == 0:
        write('output.parquet', df)
    else:
        write('output.parquet', df, append=True)
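Once the script has run, you can read the Parquet file back with pandas to confirm that all rows made it across (pandas will use fastparquet here when you pass engine='fastparquet'):
# read the Parquet file back and compare its row count with the source table
result = pd.read_parquet('output.parquet', engine='fastparquet')
print(len(result), "rows written, expected", total_rows)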
Conclusion
In this blog post, we went over the steps involved in converting data stored in SQL to the Parquet format in Python, using the Pandas and Fastparquet packages. By converting data from SQL to Parquet, we can greatly improve the performance of big data processing and analysis, making it easier and faster to work with large datasets.