Consuming paginated API using periodic Celery task in a Django Application
Publish Date: Feb 8
Introduction
API (Application Programming Interface) consumption is the norm these days, as many software products have decoupled their Backend and Frontend codebases. Backend Engineers are tasked with writing APIs that their Frontend counterparts consume, and in many cases Backend Engineers consume third-party API services themselves to accomplish their tasks.
Some services expose enormously large datasets, and returning everything in a single API call is impractical. Pagination comes to the rescue: many APIs return only a fraction of the data per request, and accessing the remaining fractions requires some extra work.
This article demonstrates how to set up Celery background tasks to consume paginated APIs periodically. We'll explore iterative and recursive approaches for APIs paginated using page parameters and those using next URLs. The fetched data will be stored in a Django model, overwriting previous data. Note that persisting historical data is outside the scope of this article but will be addressed in a future post on building a data warehouse.
Prerequisite
Basic familiarity with Django is assumed. Refer to the Django tutorial for an introduction.
With a Django project in place, register the new app in `settings.py`:
```python
...
# Application definition
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'core.apps.CryptoappConfig',  # Our new app
]
...
```
It is time to set up our application to utilize Celery. To do this, create a file aptly named celery.py in your project's directory and paste the following snippet:
```python
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_excel.settings')

app = Celery('django_excel')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
```
That was lifted directly from the Celery Django documentation. Ensure you modify the settings module path and the Celery app name (`django_excel` above) to reflect your project's name. The `namespace='CELERY'` argument lets you prefix all celery-related configurations in your `settings.py` file with `CELERY_`, such as `CELERY_BROKER_URL`.
Note: Capitalization of Celery-Related Configurations
Because you are literally providing constants, the celery-related configurations in your `settings.py` file are capitalized. For instance, one of the configurations is `beat_schedule`, which in Django becomes `CELERY_BEAT_SCHEDULE`.
Next, open your project's `__init__.py` and append the following:
```python
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```
To conclude celery-related configurations, let's set the following in settings.py:
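The broker and serializer settings discussed below can be sketched roughly like this (a minimal sketch; the exact snippet may differ, and `REDIS_URL` is assumed to be set in the environment):

```python
import os

# Broker and result backend both point at the Redis instance in REDIS_URL (assumed env var).
CELERY_BROKER_URL = os.environ.get('REDIS_URL')
CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL')

# Serialization settings (optional, but they prevent some runtime errors).
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```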
We are using Redis as our broker. You could opt for RabbitMQ instead, which Celery supports out of the box.
In the settings above, I am pointing `CELERY_BROKER_URL` at an environment variable named `REDIS_URL`, which on a local Linux setup normally looks like `redis://127.0.0.1:6379`. That means I could have set `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` directly as:
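```python
# Hard-coded local Redis URL (as mentioned above); adjust host/port to your environment.
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
```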
Note that `CELERY_RESULT_BACKEND` is optional, as are `CELERY_ACCEPT_CONTENT`, `CELERY_TASK_SERIALIZER`, and `CELERY_RESULT_SERIALIZER`. However, omitting the last three might result in runtime errors, mostly when dealing with databases or broadcasting emails asynchronously with Celery.
The stage is now set; let's set up our database. We will be consuming CoinGecko's API and saving some of its data in a Django model, sketched below.
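The model itself isn't reproduced here. A minimal sketch of what `core/models.py` could look like, assuming we keep only a handful of fields from CoinGecko's markets payload (the exact field list is an assumption):

```python
from django.db import models


class FullCoin(models.Model):
    """A coin row refreshed on every run; only a subset of the API payload is kept."""

    coin_id = models.CharField(max_length=100, unique=True)  # e.g. 'bitcoin' (assumed field name)
    symbol = models.CharField(max_length=20)
    name = models.CharField(max_length=100)
    current_price = models.FloatField()
    market_cap = models.BigIntegerField()

    def __str__(self) -> str:
        return self.name
```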
```bash :Terminal:
virtualenv➜ django_excel git:(main) python manage.py makemigrations # create migration file(s)
virtualenv➜ django_excel git:(main) python manage.py migrate # create the tables in the db
```
Optionally, you can create a superuser:
```bash :Terminal:
virtualenv➜ django_excel git:(main) python manage.py createsuperuser
```
Follow the prompts.
Step 3: Create and Register Periodic Tasks
Here is the juicy part:
```python
import logging
import time
from typing import Generator

import requests
from celery import shared_task
from django.conf import settings

from core.models import FullCoin

logger = logging.getLogger(__name__)


def build_api_url(page: int) -> str:
    """Build the API URL."""
    market_currency_order = 'markets?vs_currency=usd&order=market_cap_desc&'
    per_page = f'per_page=50&page={page}&sparkline=false'
    return f'{settings.BASE_API_URL}/coins/{market_currency_order}{per_page}'


def fetch_coins_iteratively() -> Generator[dict, None, None]:
    """Fetch coins data from API using generator."""
    page = 1
    while True:
        try:
            url = build_api_url(page)
            response = requests.get(url)
            coin_data = response.json()

            # Check for rate limit response
            if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
                logger.warning("Rate limit exceeded. Waiting 60 seconds...")
                time.sleep(60)
                continue

            # Check for empty response (end of pagination)
            if not coin_data:
                break

            yield from coin_data
            logger.info(f"Fetched page {page} with {len(coin_data)} coins")
            page += 1
            time.sleep(1)  # Be nice to the API
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed on page {page}: {e}")
            raise


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    max_retries=5,
)
def get_full_coin_data_iteratively_for_page(self) -> None:
    """Get full coin data iteratively for each page."""
    try:
        # Collect coins in batches before persisting them
        batch_size = 100
        coins_batch = []
        for coin in fetch_coins_iteratively():
            coins_batch.append(coin)
            if len(coins_batch) >= batch_size:
                logger.info(f"Processing batch of {len(coins_batch)} coins")
                store_data(coins_batch)
                coins_batch = []

        # Process remaining coins
        if coins_batch:
            logger.info(f"Processing final batch of {len(coins_batch)} coins")
            store_data(coins_batch)
    except Exception as e:
        logger.error(f"Failed to process coins: {e}")
        raise self.retry(exc=e)
```
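Each batch is handed to `store_data`, which isn't defined in the snippet above. A minimal sketch, assuming it lives in the same module (so `FullCoin` is already imported) and follows the "overwrite previous data" behaviour described in the introduction:

```python
def store_data(coins: list[dict]) -> None:
    """Persist a batch of coins, replacing any previously stored row for the same coin."""
    for coin in coins:
        # Field names assume the FullCoin sketch above and CoinGecko's markets payload keys.
        FullCoin.objects.update_or_create(
            coin_id=coin['id'],
            defaults={
                'symbol': coin['symbol'],
                'name': coin['name'],
                'current_price': coin['current_price'],
                'market_cap': coin['market_cap'],
            },
        )
```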
`build_api_url` assembles the CoinGecko API URL for the supplied page number. It relies on a `BASE_API_URL` value defined in `settings.py`.
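Assuming the public CoinGecko v3 API, that setting would look like:

```python
# settings.py: base URL of the public CoinGecko v3 API (assumed here)
BASE_API_URL = 'https://api.coingecko.com/api/v3'
```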
`fetch_coins_iteratively` is the core of the program: it starts at the first page and runs an "infinite" loop that breaks only when the API returns no data, which is the iterative strategy.
Its recursive alternative is:
```python
def fetch_coins_recursively(page: int = 1) -> Generator[dict, None, None]:
    """Fetch coins data from API recursively using generator."""
    try:
        url = build_api_url(page)
        response = requests.get(url)
        coin_data = response.json()

        # Check for rate limit response
        if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
            logger.warning("Rate limit exceeded. Waiting 60 seconds...")
            time.sleep(60)
            yield from fetch_coins_recursively(page)
            return

        # Base case: empty response (end of pagination)
        if not coin_data:
            return

        # Process current page
        yield from coin_data
        logger.info(f"Fetched page {page} with {len(coin_data)} coins")

        # Be nice to the API
        time.sleep(1)

        # Recursive case: fetch next page
        yield from fetch_coins_recursively(page + 1)
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed on page {page}: {e}")
        raise
```
Then there is `get_full_coin_data_iteratively_for_page`, which is decorated with `shared_task` (for task autodiscovery). We supplied some parameters:
- `bind=True` to access the task instance via `self`
- `autoretry_for=(Exception,)` to auto-retry on exceptions
- `retry_backoff=True` for exponential backoff, with `retry_backoff_max=600` capping the delay at 600 seconds
- `max_retries=5` to limit retries to 5
For this task to be periodic, we must add it to the CELERY_BEAT_SCHEDULE in settings.py:
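A minimal sketch of that schedule, assuming the task module is `core/tasks.py` (the schedule key name is arbitrary):

```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'get_full_coin_data': {
        # Dotted path assumes the task lives in core/tasks.py
        'task': 'core.tasks.get_full_coin_data_iteratively_for_page',
        'schedule': crontab(minute='*/3'),  # every 3 minutes
    },
}
```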
It schedules this task to run every 3 minutes ('*/3') using crontab.
These implementations were written with performance in mind. However, there is still room for improvement.
Step 4: Bonus
Some APIs are not paginated by page number but instead return a next URL (the default DRF pagination strategy). For these systems, the last page has an empty next field, and that is the breaking point:
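A minimal sketch of that approach, using a hypothetical `fetch_items_by_next_url` generator against a DRF-style payload with `results` and `next` keys:

```python
import time
from typing import Generator

import requests


def fetch_items_by_next_url(start_url: str) -> Generator[dict, None, None]:
    """Follow a DRF-style paginated API via its `next` URL until it is exhausted."""
    url = start_url
    while url:
        response = requests.get(url)
        payload = response.json()

        # Each page exposes its items under `results`
        yield from payload.get('results', [])

        # `next` is None (or missing) on the last page, which ends the loop
        url = payload.get('next')
        time.sleep(1)  # Be nice to the API
```

The batching task from Step 3 can consume this generator in place of `fetch_coins_iteratively`.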
Enjoyed this article? I'm a Software Engineer, Technical Writer, and Technical Support Engineer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.
If you found this article valuable, consider sharing it with your network to help spread the knowledge!