This is part 3 of a four-part series of articles in which I explain how I discovered and purchased my laptop by building a web application that scrapes a local PC parts forum and sends automated email alerts when posts featuring specific keywords appear:
- Part 1: Let Your Next Laptop Find YOU!
- Part 2: Django and Scrapy
- Part 3: Celery, Mailgun and Flower (this page)
- Part 4: Deploying and using CarbAlert
CarbAlert on GitHub: https://github.com/dinofizz/carbalert
Celery
Celery is a distributed task queue framework. In conjunction with a message broker (in my case Redis) it can be used to process asynchronous tasks as well as schedule periodic tasks. I am using both of these features:
- A periodic task is run every 5 minutes to initiate the Scrapy CarbSpider, which scrapes the first page of the Carbonite Laptop forum index and scans it for new threads featuring search phrases of interest.
- From within the CarbPipeline activity I push asynchronous email tasks for Celery to handle. This separates the sending of my email notifications from the parsing of the thread metadata.
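To make the split between parsing and emailing more concrete, here is a minimal sketch of what the pipeline side could look like. It is not the actual CarbPipeline code: the names find_phrase_hits and process_thread, the layout of the thread dictionary and the case-insensitive substring matching are all assumptions for illustration. The enqueue argument stands in for something like send_email_notification.delay, so the sketch runs without Celery installed.

```python
from typing import Callable, Dict, List


def find_phrase_hits(title: str, text: str, phrases: List[str]) -> List[str]:
    """Return the phrases that occur in the thread title or body.

    Assumption: matching is a case-insensitive substring search.
    """
    haystack = f"{title}\n{text}".lower()
    return [phrase for phrase in phrases if phrase.lower() in haystack]


def process_thread(
    thread: Dict[str, str],
    subscriptions: Dict[str, List[str]],
    enqueue: Callable[..., object],
) -> int:
    """Queue one notification per subscriber with at least one phrase hit.

    `thread` is a hypothetical dict with "title", "text", "url" and
    "datetime" keys. `subscriptions` maps an email address to its phrases.
    `enqueue` receives the arguments in the same order as the
    send_email_notification task signature (minus `self`).
    """
    queued = 0
    for email_address, phrases in subscriptions.items():
        hits = find_phrase_hits(thread["title"], thread["text"], phrases)
        if hits:
            # Parsing ends here; the slow network call happens in the worker.
            enqueue(
                email_address,
                hits,
                thread["title"],
                thread["text"],
                thread["url"],
                thread["datetime"],
            )
            queued += 1
    return queued


if __name__ == "__main__":
    example_thread = {
        "title": "Lenovo ThinkPad X1 Carbon",
        "text": "Barely used.",
        "url": "https://example.com/thread/1",
        "datetime": "2018-09-01 10:00",
    }
    example_subs = {"someone@example.com": ["thinkpad"]}
    # print() stands in for the Celery call in this demo.
    process_thread(example_thread, example_subs, print)
```

With a running worker, passing send_email_notification.delay as enqueue would hand each hit to Celery instead of sending the email inline.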
My Celery tasks are defined in a tasks.py file within my CarbAlert project. I must admit that I did battle quite a bit with my Celery configuration. Import statements and task recognition depend on the location from which the Celery process is invoked (I think). I also need to reference my Django models in a Scrapy pipeline which is kicked off from a Celery task, and I definitely got a bit confused trying to get everything to play nicely together. Honestly, this was one of those situations where I tried many different things, and once I had it working I kind of left it at that.
From what I can recall my specific problems were:
- Import statements for Django models in the CarbPipeline.py file (which is a Scrapy process initiated by a Celery task).
- Using the correct/similar/same decorators for the actual Celery tasks. I recall trying to understand why my tasks were not being recognised at Celery start-up.
I do want to return to Celery and try to resolve all the things I didn’t understand. Next project.
The code for my tasks can be found below. The scrape_carbonite task is run every 5 minutes and simply kicks off a CrawlerProcess with my CarbSpider. The send_email_notification task is initiated from within the CarbPipeline. I am using the Requests library to send an HTTP request to the Mailgun API, which in turn sends an email to the user associated with the search phrase hits for a forum post. More on Mailgun later.
app.conf.beat_schedule = {
    "scrape-every-300-seconds": {
        "task": "carbalert.carbalert_scrapy.carbalert_scrapy.tasks.scrape_carbonite",
        "schedule": 300.0,  # seconds
    }
}


@app.task
def scrape_carbonite():
    # Run the spider in-process; start() blocks until the crawl has finished.
    process = CrawlerProcess(settings=get_project_settings())
    process.crawl(CarbSpider)
    process.start()


@shared_task(base=MailgunAPITask, bind=True)
def send_email_notification(
    self, email_address, phrases, title, text, thread_url, thread_datetime
):
    logger.info(f"Received alert for {email_address} for thread title: {title}")

    subject = f"CARBALERT: {title}"

    # One matched phrase per line at the top of the email body.
    phrase_list = ""
    for phrase in phrases:
        phrase_list += f"{phrase}\n"

    text = f"{phrase_list}\n{thread_datetime}\n\n{title}\n\n{text}\n\n{thread_url}\n\nEND\n"

    # The Mailgun settings are class attributes on the MailgunAPITask base.
    mailgun_url = f"https://api.mailgun.net/v3/{self.mailgun_domain}/messages"
    mailgun_from = f"CarbAlert <{self.mailgun_email}>"

    try:
        logger.info(f"Sending mail to {email_address}")
        response = requests.post(
            mailgun_url,
            auth=("api", self.mailgun_api_key),
            data={
                "from": mailgun_from,
                "to": [email_address],
                "subject": subject,
                "text": text,
            },
        )

        # Compare by value: "is not 200" tests object identity, not equality.
        if response.status_code != 200:
            logger.error(
                f"Unexpected error code received on Mailgun response for email to {email_address}. "
                f"Code: {response.status_code}, Raw {response.raw}"
            )
        response.raise_for_status()
    except Exception as ex:
        logger.error(f"Error sending mail to {email_address}: {ex}")
        raise
Link to code: https://github.com/dinofizz/carbalert/blob/master/carbalert/carbalert_scrapy/carbalert_scrapy/tasks.py
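The part of the task that assembles the Mailgun request can be pulled out into plain functions, which makes it easy to check the email body without touching the network. The sketch below is only an illustration of that idea, not code from the CarbAlert repository: build_email_text and build_mailgun_request are hypothetical names, and the example.com values are placeholders. The URL format and form fields are the ones used in the task above.

```python
from typing import Dict, List, Tuple


def build_email_text(
    phrases: List[str],
    title: str,
    text: str,
    thread_url: str,
    thread_datetime: str,
) -> str:
    """Build the plain-text body: matched phrases first, then thread details."""
    phrase_list = "".join(f"{phrase}\n" for phrase in phrases)
    return (
        f"{phrase_list}\n{thread_datetime}\n\n{title}\n\n{text}\n\n"
        f"{thread_url}\n\nEND\n"
    )


def build_mailgun_request(
    domain: str,
    from_email: str,
    email_address: str,
    phrases: List[str],
    title: str,
    text: str,
    thread_url: str,
    thread_datetime: str,
) -> Tuple[str, Dict[str, object]]:
    """Return the (url, form data) pair that the task passes to requests.post."""
    url = f"https://api.mailgun.net/v3/{domain}/messages"
    data = {
        "from": f"CarbAlert <{from_email}>",
        "to": [email_address],
        "subject": f"CARBALERT: {title}",
        "text": build_email_text(phrases, title, text, thread_url, thread_datetime),
    }
    return url, data


if __name__ == "__main__":
    # Placeholder values only; nothing is sent.
    example_url, example_data = build_mailgun_request(
        "mg.example.com",
        "alerts@example.com",
        "someone@example.com",
        ["x1", "carbon"],
        "X1 Carbon for sale",
        "Barely used.",
        "https://example.com/thread/1",
        "2018-09-01 10:00",
    )
    print(example_url)
    print(example_data["text"])
```

Inside the task, the returned pair would be used as requests.post(url, auth=("api", api_key), data=data), exactly as in the listing above.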
My docker-compose.yml file specifies the command line arguments for the celery worker and celery beat processes, as shown in the “Celery apps in Docker containers” section further down.
Mailgun
I decided that I wanted to try out a transactional email API for my email notification feature for this CarbAlert project. Mailgun offers an easy-to-use API with a pay-as-you-go tier that allows for up to 10,000 emails to be sent every month for free. As this was a personal project where I had at most two active users during its lifetime, I was not worried about sending more than 10,000 emails a month. From the Mailgun dashboard you can see the number of emails sent, how many were delivered successfully and how many failed delivery.
Mailgun setup requires one to register a Mailgun account and create a Mailgun “domain” (this is not an actual internet domain, more like an application which is associated with one of your own real internet domain names). To allow Mailgun to send and receive email associated with your actual domain you will need to add a few TXT entries to your DNS configuration. For each registered Mailgun domain you will be given an API key which can be used to send and receive email via HTTP requests to the Mailgun API. This is all explained really well in the Mailgun documentation.
To use the Mailgun API from within my Celery task I pass in the domain, “from” email address and the API key via command line arguments to the celery worker process:
$ python -m celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks worker \
    --loglevel=info -f celery_worker.log --max-tasks-per-child 1 \
    --email "${MAILGUN_EMAIL}" --key ${MAILGUN_API_KEY} --domain ${MAILGUN_DOMAIN}
The command line argument keywords are registered within the tasks.py file. The values for the arguments are bound to properties in a MailgunAPITask class. This is the “base” task for my send_email_notification task specified above, and so the properties are directly accessible from within the task function.
See below the Celery configuration which binds the arguments to the properties:
from celery import Celery, Task, bootsteps
from celery.bin import Option  # Celery 4.x style option class
from celery.utils.log import get_task_logger


class MailgunAPITask(Task):
    abstract = True

    # Filled in by MailgunArgs when the worker starts.
    mailgun_api_key = None
    mailgun_email = None
    mailgun_domain = None


class MailgunArgs(bootsteps.Step):
    def __init__(
        self, worker, mailgun_domain, mailgun_email, mailgun_api_key, **options
    ):
        # Copy the parsed command line values onto the base task class.
        MailgunAPITask.mailgun_domain = mailgun_domain[0]
        MailgunAPITask.mailgun_email = mailgun_email[0]
        MailgunAPITask.mailgun_api_key = mailgun_api_key[0]


logger = get_task_logger(__name__)

app = Celery("tasks")

app.conf.broker_url = "redis://redis:6379/0"

# Register the extra worker options and the step that consumes them.
app.user_options["worker"].add(
    Option("--domain", dest="mailgun_domain", default=None, help="Mailgun domain")
)

app.user_options["worker"].add(
    Option(
        "--email",
        dest="mailgun_email",
        default=None,
        help='Mailgun "from" email address.',
    )
)

app.user_options["worker"].add(
    Option("--key", dest="mailgun_api_key", default=None, help="Mailgun API key")
)

app.steps["worker"].add(MailgunArgs)
Link to code: https://github.com/dinofizz/carbalert/blob/master/carbalert/carbalert_scrapy/carbalert_scrapy/tasks.py
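The reason the task function can read self.mailgun_domain is ordinary Python attribute lookup: values assigned to a base class are visible through instances of its subclasses, even when those instances were created earlier. The sketch below shows just that mechanism with stand-in classes and no Celery at all. SettingsBase, EmailJob and bind_settings are hypothetical names, and the example.com values are placeholders.

```python
class SettingsBase:
    """Stand-in for MailgunAPITask: settings live on the class itself."""

    mailgun_api_key = None
    mailgun_email = None
    mailgun_domain = None


class EmailJob(SettingsBase):
    """Stand-in for a task that uses SettingsBase as its base class."""

    def sender(self) -> str:
        # Falls through the instance to the class attribute on SettingsBase.
        return f"CarbAlert <{self.mailgun_email}>"

    def endpoint(self) -> str:
        return f"https://api.mailgun.net/v3/{self.mailgun_domain}/messages"


def bind_settings(domain: str, email: str, api_key: str) -> None:
    """Stand-in for the MailgunArgs step: copy parsed values onto the class."""
    SettingsBase.mailgun_domain = domain
    SettingsBase.mailgun_email = email
    SettingsBase.mailgun_api_key = api_key


# The job is created first, the way tasks are defined at import time.
job = EmailJob()

# Binding happens later, the way the step runs at worker start-up.
bind_settings("mg.example.com", "alerts@example.com", "placeholder-key")
```

One consequence of this design is that the settings are shared by every task using the same base class, which suits a worker that is started with a single set of Mailgun credentials.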
Celery apps in Docker containers
See below the relevant sections from my docker-compose.yml file, which describe the command line parameters and environment variables used to start the Celery processes. The “worker” and the “beat” Celery processes each run in an independent container:
...
  celery_worker:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks worker --loglevel=info -f celery_worker.log --max-tasks-per-child 1 --email "${MAILGUN_EMAIL}" --key ${MAILGUN_API_KEY} --domain ${MAILGUN_DOMAIN}
    volumes:
      - .:/code
    depends_on:
      - web
      - redis
    environment:
      - SCRAPY_SETTINGS_MODULE=carbalert.carbalert_scrapy.carbalert_scrapy.settings
  celery_beat:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks beat --loglevel=info -f celery_beat.log
    volumes:
      - .:/code
    depends_on:
      - celery_worker
...
The complete listing for the tasks.py module can be found at the “Link to code” URL given above.
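Docker Compose substitutes an empty string (with a warning) for any ${VARIABLE} that is not set, so a missing MAILGUN_API_KEY would only surface later as a failed email. A small pre-flight check can catch that before the containers start. This is a sketch of my own rather than part of CarbAlert: the missing_env_vars helper is hypothetical, and the variable names are the ones referenced in the worker command.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variables referenced by the celery_worker command in docker-compose.yml.
REQUIRED_VARS = ("MAILGUN_EMAIL", "MAILGUN_API_KEY", "MAILGUN_DOMAIN")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
    print("All Mailgun environment variables are set.")
```

Run on the host before docker-compose up, this exits with a non-zero status when any of the three variables is missing or empty.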
Flower
I’m using Flower as a front-end for monitoring the CarbAlert Celery tasks. This is useful as I can see time-series data for the Carbonite scraping and email-sending tasks, the data that is being passed to the tasks, and the status of each completed task. There is very little configuration, as Flower inspects your existing Celery configuration to determine which tasks to monitor and which broker is being used.
The Flower portal is available on my host at http://carbalert.dinofizzotti.com/flower. Flower provides a few different authentication mechanisms to control access to the Celery metrics. I am using GitHub OAuth, with the required OAuth parameters being passed into the process via environment variables.
My docker-compose.yml file contains the command line entry and arguments for running my Flower instance (in its own container):
...
  celery_flower:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks flower --loglevel=debug --auth_provider=flower.views.auth.GithubLoginHandler --auth=${FLOWER_OAUTH2_EMAIL} --oauth2_key=${FLOWER_OAUTH2_KEY} --oauth2_secret=${FLOWER_OAUTH2_SECRET} --oauth2_redirect_uri=${FLOWER_OAUTH2_REDIRECT_URI} --url_prefix=flower
    ports:
      - "5555:5555"
    depends_on:
      - celery_worker
...
See below some screenshots from Flower:
Next post in series: Part 4: Deploying and using CarbAlert