In the Set up a platform for data pipelines tutorial, you used Prefect Cloud to build a platform for running data pipelines. In this tutorial, you’ll learn what to do when those pipelines fail.

This tutorial starts where the previous tutorial leaves off, so complete that one first. You will need a paid Prefect Cloud account.

Find failures

You can use the Prefect Cloud dashboard to find failures.

  1. Sign in to Prefect Cloud.
  2. Use the workspace switcher to open the staging workspace that you created in the previous tutorial.
  3. Go to Home and look for red bars in the Flow Runs section; these indicate failed flow runs.
  4. Hover over a red bar to see more details about the flow run: name, deployment, duration, timestamp, and tags.

You can filter by a specific tag (e.g. team-a) if you’re only interested in a particular set of flows.
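
If you’d rather query failures programmatically, you can use the Prefect Python client. Below is a minimal sketch; the import paths assume a recent Prefect release, and the list_failed_runs helper is our own illustration, not part of the tutorial code.

import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterTags,
)
from prefect.client.schemas.objects import StateType


async def list_failed_runs(tag: str) -> None:
    """Print recent failed flow runs carrying the given tag (illustrative helper)."""
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                # Keep only runs that ended in a FAILED state
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED])
                ),
                # Keep only runs tagged with, e.g., team-a
                tags=FlowRunFilterTags(all_=[tag]),
            ),
            limit=20,
        )
    for run in runs:
        print(run.name, run.start_time)


if __name__ == "__main__":
    asyncio.run(list_failed_runs("team-a"))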

Debug a failure

A single flow might experience failures on several runs. When this happens, it can be helpful to inspect the first failure in the series.

  1. In the Flow Runs section on the Home page, expand the data-pipeline flow.
  2. You will see a list of failed data-pipeline flow runs in reverse chronological order.
  3. Use the pagination controls to navigate to the last failure in the list; this is the first failure that occurred.
  4. Click the name of the flow run to go to its detail page.
  5. From the flow run detail page, scroll down to the Logs section in the right panel.
  6. Look for an error message similar to the following:
File "/opt/prefect/demos/simulate_failures.py", line 12, in process_data
    raise Exception(f"Run failed")

It looks like there’s an error in the simulate_failures.py file. Now that you’ve found the failure, the next step is to fix the underlying code.
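
As an aside: paging through the UI works well for a handful of runs, but for a long failure streak you can also ask the API for the oldest failed run directly. This is a sketch under the same assumptions as the earlier snippet; sorting by start time ascending makes the first result the first failure.

import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort


async def first_failure(flow_name: str):
    """Return the oldest failed run of the named flow, or None (illustrative helper)."""
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.FAILED])
                )
            ),
            # Oldest run first, so a single result is enough
            sort=FlowRunSort.START_TIME_ASC,
            limit=1,
        )
    return runs[0] if runs else None


if __name__ == "__main__":
    run = asyncio.run(first_failure("data-pipeline"))
    if run is not None:
        print(run.name, run.start_time)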

Update the code

Open the simulate_failures.py file and look at line 12.

simulate_failures.py
import argparse
import asyncio
from typing import Optional

from prefect import flow, task
from prefect.client.orchestration import get_client


@task
def process_data(run: int, fail_at_run: Optional[int] = None) -> bool:
    """Simulate data processing with failures"""
    
    # Simulate persistent failures
    if fail_at_run and run > fail_at_run:
        raise Exception(f"Run failed")
    
    return True

# ...
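
If it isn’t obvious when that if statement triggers, this throwaway snippet (not part of simulate_failures.py) mirrors its condition for fail_at_run=3: runs 1 through 3 succeed, and every later run raises.

fail_at_run = 3

for run in range(1, 6):
    # Same condition as in process_data
    if fail_at_run and run > fail_at_run:
        print(f"run {run}: fails")
    else:
        print(f"run {run}: succeeds")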

The if statement is the problem. If you specify the --fail-at-run flag, the flow fails with an exception once it has run more than fail_at_run times. Remove the if statement to fix this failure; we added it just to give you something to fix. :)

simulate_failures.py
import argparse
import asyncio
from typing import Optional

from prefect import flow, task
from prefect.client.orchestration import get_client


@task
def process_data(run: int, fail_at_run: Optional[int] = None) -> bool:
    """Simulate data processing with failures"""

    return True

# ...

Now all flow runs succeed, regardless of the --fail-at-run flag. Deploy the fix to the staging workspace to confirm the new behavior.

prefect cloud workspace set --workspace "<account>/staging"
python simulate_failures.py --fail-at-run 3

After the script finishes, open the Home page in Prefect Cloud to verify that the flow runs are no longer failing.
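
You can also spot-check from the terminal. As a rough sketch (exact flags can vary across Prefect versions), list the most recent flow runs and confirm their states are Completed:

prefect flow-run ls --limit 5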

You can now switch workspaces and deploy the updated code to the production workspace as well.

prefect cloud workspace set --workspace "<account>/production"
python simulate_failures.py

Next steps

In this tutorial, you successfully used Prefect Cloud to fix a failing data pipeline.

To take this to the next level, learn how to set up an alert so that you get notified about failures automatically.

Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.