Manage data pipelines

Monitor and manage your data pipelines after you create them. To create a pipeline, see Create a pipeline.

View pipelines in the Viam app

Navigate to the DATA page in the Viam app, then select the Pipelines subtab.

The pipelines list shows each pipeline’s name, schedule, last execution time, and error count. Disabled pipelines display a Disabled badge next to the name. Pipelines with backfill in progress show the completion percentage.

Click a pipeline name to open its detail page, which has three tabs:

  • Runs: A table of all pipeline executions with the data window, status, runtime, and start time for each run. Failed runs show an error icon you can expand for details.
  • Details: The pipeline’s configuration, including its ID, schedule, data source type, creation date, backfill status, and the full MQL query.
  • Custom indexes: MongoDB indexes on the pipeline’s output collection. See Manage custom indexes for details.

Use the three-dot menu next to any pipeline to Copy ID, Query pipeline data, Enable or Disable the pipeline, or Delete it.

List pipelines

CLI:

viam datapipelines list --org-id=<org-id>

Example output (one line per pipeline):

hourly-temp-avg (ID: 64f3a1b2c4d5e6f7a8b9c0d1) [Enabled] [Data Source Type: Standard]
daily-summary (ID: 64f3a1b2c4d5e6f7a8b9c0d2) [Disabled] [Data Source Type: Hot Storage]

If the command prints nothing, the organization has no pipelines. This is not an error.
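
If you need the CLI listing in a script, the lines can be parsed with a regular expression. The following is a minimal sketch that assumes the output keeps exactly the shape shown in the example above. The pattern and the `parse_pipeline_list` helper are illustrative and not part of the Viam CLI or SDK.

```python
import re

# Pattern inferred from the example output above; the real CLI format may change.
LINE_RE = re.compile(
    r"^(?P<name>.+) \(ID: (?P<id>[^)]+)\) "
    r"\[(?P<state>Enabled|Disabled)\] "
    r"\[Data Source Type: (?P<source>[^\]]+)\]$"
)


def parse_pipeline_list(output: str) -> list[dict]:
    """Turn `viam datapipelines list` output into a list of dicts."""
    pipelines = []
    for line in output.splitlines():
        match = LINE_RE.match(line.strip())
        if match is None:
            continue  # skip blank or unrecognized lines
        pipelines.append({
            "name": match["name"],
            "id": match["id"],
            "enabled": match["state"] == "Enabled",
            "data_source_type": match["source"],
        })
    return pipelines


sample = """\
hourly-temp-avg (ID: 64f3a1b2c4d5e6f7a8b9c0d1) [Enabled] [Data Source Type: Standard]
daily-summary (ID: 64f3a1b2c4d5e6f7a8b9c0d2) [Disabled] [Data Source Type: Hot Storage]
"""
parsed = parse_pipeline_list(sample)
print(parsed)
```

Empty output yields an empty list, which matches the case where the organization has no pipelines.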

Python:

pipelines = await data_client.list_data_pipelines(organization_id=ORG_ID)
for p in pipelines:
    print(f"{p.id}: {p.name} (enabled={p.enabled}, schedule={p.schedule})")

Go:

pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
if err != nil {
    logger.Fatal(err)
}
for _, p := range pipelines {
    fmt.Printf("%s: %s (enabled=%v, schedule=%s)\n", p.ID, p.Name, p.Enabled, p.Schedule)
}

Get pipeline details

CLI:

viam datapipelines describe --id=<pipeline-id>

Example output:

ID: 64f3a1b2c4d5e6f7a8b9c0d1
Name: hourly-temp-avg
Enabled: true
Schedule: 0 * * * *
MQL query: [
  {
    "$match": {
      "component_name": "temperature-sensor"
    }
  },
  ...
]
DataSourceType: TABULAR_DATA_SOURCE_TYPE_STANDARD
Last run:
  Status: Success
  Started: 2026-03-15T15:02:13Z
  Data range: [2026-03-15T14:00:00Z, 2026-03-15T15:00:00Z]
  Ended: 2026-03-15T15:02:18Z

If the pipeline has never run, the Last run section reads "Has not run yet." instead.
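
The timestamps in the last-run section can be compared to see how long a run took and how soon after the data window closed it started. This sketch uses the values from the example output above. The `parse_utc` helper is illustrative, and it assumes the timestamps always end in `Z`.

```python
from datetime import datetime


def parse_utc(ts: str) -> datetime:
    """Parse a timestamp with a trailing Z, as shown in the example output."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


# Values copied from the example `describe` output.
started = parse_utc("2026-03-15T15:02:13Z")
ended = parse_utc("2026-03-15T15:02:18Z")
window_start = parse_utc("2026-03-15T14:00:00Z")
window_end = parse_utc("2026-03-15T15:00:00Z")

runtime = ended - started        # how long the run executed
window = window_end - window_start  # how much data the run covered
lag = started - window_end       # delay between window close and run start

print(f"Runtime: {runtime.total_seconds():.0f} s")
print(f"Data window: {window.total_seconds() / 3600:.0f} h")
print(f"Start lag after window close: {lag.total_seconds():.0f} s")
```

For the example values, the run covers a one-hour window and starts a little over two minutes after the window closes.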

Python:

pipeline = await data_client.get_data_pipeline(id="YOUR-PIPELINE-ID")
print(f"Name: {pipeline.name}")
print(f"Schedule: {pipeline.schedule}")
print(f"Enabled: {pipeline.enabled}")
print(f"Data source: {pipeline.data_source_type}")
print(f"Created: {pipeline.created_on}")

Go:

pipeline, err := dataClient.GetDataPipeline(ctx, "YOUR-PIPELINE-ID")
if err != nil {
    logger.Fatal(err)
}
fmt.Printf("Name: %s\nSchedule: %s\nEnabled: %v\n", pipeline.Name, pipeline.Schedule, pipeline.Enabled)

Monitor pipeline runs

Each pipeline run has a status and an associated time window showing which data it processed.

Python:

# Returns a page of runs (default page size: 10)
page = await data_client.list_data_pipeline_runs(id="YOUR-PIPELINE-ID")
for run in page.runs:
    print(f"Run {run.id}: {run.status}")
    print(f"  Data window: {run.data_start_time} to {run.data_end_time}")
    if run.error_message:
        print(f"  Error: {run.error_message}")

# Get the next page if there are more runs
if page.next_page_token:
    next_page = await page.next_page()

Go:

// Returns a page of runs (page size passed explicitly: 10)
page, err := dataClient.ListDataPipelineRuns(ctx, "YOUR-PIPELINE-ID", 10)
if err != nil {
    logger.Fatal(err)
}
for _, run := range page.Runs {
    fmt.Printf("Run %s: %d\n", run.ID, run.Status)
    fmt.Printf("  Data window: %s to %s\n", run.DataStartTime, run.DataEndTime)
    if run.ErrorMessage != "" {
        fmt.Printf("  Error: %s\n", run.ErrorMessage)
    }
}

// Get the next page
nextPage, err := page.NextPage(ctx)
if err != nil {
    logger.Fatal(err)
}
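
To walk through every run instead of a single page, you can keep following pages until the token is empty. The sketch below relies only on the `runs`, `next_page_token`, and `next_page()` members used in the Python example above. `collect_all_runs` and `FakePage` are hypothetical names, and `FakePage` is a local stand-in so the loop can be exercised without a connection.

```python
import asyncio


async def collect_all_runs(first_page):
    """Follow next_page() until next_page_token is empty; return every run."""
    runs = []
    page = first_page
    while True:
        runs.extend(page.runs)
        if not page.next_page_token:
            break
        page = await page.next_page()
    return runs


class FakePage:
    """Hypothetical stand-in for the SDK page object, for local testing only."""

    def __init__(self, runs, rest=None):
        self.runs = runs
        self._rest = rest or []
        self.next_page_token = "more" if self._rest else ""

    async def next_page(self):
        return FakePage(self._rest[0], self._rest[1:])


first = FakePage(["run-1", "run-2"], [["run-3"], ["run-4", "run-5"]])
all_runs = asyncio.run(collect_all_runs(first))
print(all_runs)
```

With the SDK, you would pass the page returned by `list_data_pipeline_runs` as `first_page`. For pipelines with a long history, consider stopping after a fixed number of pages.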

Run statuses:

SDK status | CLI label | Meaning
SCHEDULED  | Scheduled | The run is queued and waiting to execute (2-minute delay before execution starts).
STARTED    | Running   | The run is executing the MQL aggregation against the data source.
COMPLETED  | Success   | The run finished and results are in the pipeline sink.
FAILED     | Failed    | The run encountered an error. Check the error_message field.

SDK methods return the enum value shown in the SDK status column. The viam datapipelines describe CLI output uses the corresponding CLI label.
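
If a script mixes SDK results with CLI output, a small lookup keeps the two vocabularies aligned. This sketch copies the four status pairs listed above. It assumes you have already converted the SDK enum to its name as a string, and `cli_label` is an illustrative helper.

```python
# Status pairs copied from the run status list above; keys are SDK status names.
CLI_LABELS = {
    "SCHEDULED": "Scheduled",
    "STARTED": "Running",
    "COMPLETED": "Success",
    "FAILED": "Failed",
}


def cli_label(status_name: str) -> str:
    """Return the CLI label for an SDK status name, or the name itself if unknown."""
    return CLI_LABELS.get(status_name.upper(), status_name)


print(cli_label("STARTED"))
```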

If a run stays in STARTED for more than 10 minutes, it is automatically marked as failed and a new run is created for that time window.
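
When you monitor runs yourself, it can help to flag runs that are about to hit this limit. The following sketch applies the 10-minute threshold stated above. The `started_at` argument is an assumption: it stands for whatever start timestamp you have for the run, and `is_past_timeout` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

STUCK_AFTER = timedelta(minutes=10)  # threshold stated in the text above


def is_past_timeout(status_name, started_at, now=None):
    """True if a STARTED run has been running longer than the 10-minute limit."""
    now = now or datetime.now(timezone.utc)
    return status_name == "STARTED" and (now - started_at) > STUCK_AFTER


# Example with fixed times so the result is reproducible.
now = datetime(2026, 3, 15, 15, 15, tzinfo=timezone.utc)
started = datetime(2026, 3, 15, 15, 2, 13, tzinfo=timezone.utc)
print(is_past_timeout("STARTED", started, now=now))
```

A run flagged this way should soon appear as failed, with a replacement run for the same time window.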

Enable a pipeline

CLI:

viam datapipelines enable --id=<pipeline-id>

Go:

err = dataClient.EnableDataPipeline(ctx, "YOUR-PIPELINE-ID")

Disable a pipeline

CLI:

viam datapipelines disable --id=<pipeline-id>

Go:

err = dataClient.DisableDataPipeline(ctx, "YOUR-PIPELINE-ID")

Disabling a pipeline stops future scheduled runs but does not delete existing results. When you re-enable a pipeline, it resumes from the next scheduled time window. It does not backfill windows it missed while disabled.
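
To estimate which windows will be missing after a pause, you can enumerate the schedule boundaries that fell inside the disabled period. This is a rough sketch under several assumptions: it approximates the cron schedule with a fixed interval aligned to UTC, it treats a window as missed when its end time falls while the pipeline was disabled, and it ignores the short delay before a run starts. `missed_windows` is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def missed_windows(disabled_at, enabled_at, interval):
    """List (start, end) windows whose end time fell in [disabled_at, enabled_at)."""
    # Index of the first interval boundary at or after disabled_at (ceiling division).
    n = -((EPOCH - disabled_at) // interval)
    boundary = EPOCH + n * interval
    windows = []
    while boundary < enabled_at:
        windows.append((boundary - interval, boundary))
        boundary += interval
    return windows


gaps = missed_windows(
    disabled_at=datetime(2026, 3, 15, 14, 30, tzinfo=timezone.utc),
    enabled_at=datetime(2026, 3, 15, 17, 10, tzinfo=timezone.utc),
    interval=timedelta(hours=1),
)
for start, end in gaps:
    print(f"{start.isoformat()} to {end.isoformat()}")
```

For an hourly schedule paused from 14:30 to 17:10 UTC, the sketch reports the 14:00, 15:00, and 16:00 windows as gaps. Compare the estimate against the Runs tab before relying on it.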

Rename a pipeline

CLI:

viam datapipelines rename --id=<pipeline-id> --name=new-name

Python:

await data_client.rename_data_pipeline(id="YOUR-PIPELINE-ID", name="new-name")

Go:

err = dataClient.RenameDataPipeline(ctx, "YOUR-PIPELINE-ID", "new-name")

Delete a pipeline

CLI:

viam datapipelines delete --id=<pipeline-id>

Python:

await data_client.delete_data_pipeline(id="YOUR-PIPELINE-ID")

Go:

err = dataClient.DeleteDataPipeline(ctx, "YOUR-PIPELINE-ID")

Manage custom indexes

Custom indexes speed up queries against a pipeline’s output collection. You can create, list, and delete indexes from the Viam app, the CLI, or the SDK.

Create an index in the Viam app

  1. Open the pipeline detail page by clicking the pipeline name on the Pipelines subtab.

  2. Select the Custom indexes tab.

  3. Click Create custom index.

  4. Enter a MongoDB index specification in JSON format with a key field and optional options:

    {
      "key": { "location_id": 1, "avg_temp": -1 },
      "options": { "name": "location-avg-temp" }
    }
    
  5. Click Create index.
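
If you generate index specifications programmatically, a small helper can produce the JSON to paste into the dialog. This sketch only enforces the shape described in step 4: a non-empty `key` object and an optional `options` object. `build_index_spec` is a hypothetical helper, and it does not check which index types or options the service accepts.

```python
import json


def build_index_spec(key, name=None):
    """Return a JSON index specification with a `key` field and optional `options`."""
    if not isinstance(key, dict) or not key:
        raise ValueError("key must be a non-empty mapping of field name to index type")
    spec = {"key": key}
    if name is not None:
        spec["options"] = {"name": name}
    return json.dumps(spec, indent=2)


spec_json = build_index_spec({"location_id": 1, "avg_temp": -1}, name="location-avg-temp")
print(spec_json)
```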

The index build starts in the background. The Custom indexes tab shows a status badge on each index: Building while the index is being created, Ready when complete, or Failed if the build encountered an error. You can only delete an index once it reaches the Ready or Failed state.

To delete an index, click the delete icon on the index card.

Create an index with the CLI or SDK

See Manage data indexes for CLI commands, or use the SDK’s CreateIndex, ListIndexes, and DeleteIndex methods documented in the data client API reference.

Troubleshooting

Pipeline consistently fails

  1. Check the error message in the run details (list_data_pipeline_runs in the SDK, or viam datapipelines describe in the CLI).
  2. Run the same MQL query manually in the query editor using MQL mode against the same data source. This isolates whether the issue is in the query or the pipeline configuration.
  3. Common failure causes:
    • Invalid MQL stage or syntax error
    • Query timeout (5-minute limit) on large datasets. Add a $match filter to reduce data.
    • Output exceeds 10,000 documents. Add $limit or make the $group less granular.
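
Before scheduling a query, you can run a quick check for the two mitigations listed above. The sketch below is a heuristic: it represents the MQL pipeline as a list of Python dicts, warns when there is no leading `$match`, and warns when there is no `$limit` at or below the 10,000-document cap. `lint_pipeline` is a hypothetical helper, and the field names in the example query are illustrative.

```python
def lint_pipeline(stages, max_docs=10_000):
    """Return warnings for an MQL pipeline given as a list of stage dicts."""
    warnings = []
    if not stages or "$match" not in stages[0]:
        warnings.append("No leading $match stage: the query may scan more data than needed.")
    limits = [stage["$limit"] for stage in stages if "$limit" in stage]
    if not limits:
        warnings.append(f"No $limit stage: output could exceed {max_docs} documents.")
    elif min(limits) > max_docs:
        warnings.append(f"$limit is above {max_docs} documents.")
    return warnings


query = [
    {"$match": {"component_name": "temperature-sensor"}},
    # Hypothetical grouping; adjust field paths to your own data.
    {"$group": {"_id": "$location_id", "avg_temp": {"$avg": "$data.readings.temperature"}}},
    {"$limit": 1000},
]
print(lint_pipeline(query))
print(lint_pipeline([{"$group": {"_id": "$location_id"}}]))
```

A clean result does not guarantee the run succeeds. A coarse `$group` can keep output small without any `$limit`, so treat the second warning as a prompt to review rather than an error.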

Re-enabled pipeline has gaps in results

This is expected. When you disable a pipeline, scheduled runs do not execute. When you re-enable it, it resumes from the next scheduled window. Missed windows are not retroactively processed, even if backfill is enabled. Backfill only applies to late-arriving data within windows the pipeline was active for.

Hot data store query returns no data

  • Verify the hot data store is enabled on the component. See Hot data store.
  • Check that data falls within the configured stored_hours window. Older data is removed hourly.
  • Verify data has been captured and synced within the retention window.
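
To check the second point quickly, compare a reading's capture time against the retention window. This is a minimal sketch that assumes `stored_hours` is the value from your hot data store configuration and that all timestamps are in UTC. `in_hot_store_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def in_hot_store_window(captured_at, stored_hours, now=None):
    """True if captured_at falls within the last stored_hours hours."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(hours=stored_hours) <= captured_at <= now


# Example with fixed times so the result is reproducible.
now = datetime(2026, 3, 15, 15, 0, tzinfo=timezone.utc)
print(in_hot_store_window(datetime(2026, 3, 15, 3, 0, tzinfo=timezone.utc), 24, now=now))
print(in_hot_store_window(datetime(2026, 3, 13, 3, 0, tzinfo=timezone.utc), 24, now=now))
```

Because older data is removed hourly, readings slightly past the window may still be present for a short time, so the check errs on the conservative side.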