Query pipeline results

Once you have created a pipeline, its results land in a dedicated sink collection that is separate from your raw data. This page covers how to query that sink.

You query pipeline results the same way you query raw readings, except you specify the pipeline-sink data source type and the pipeline ID of the pipeline whose results you want. The pipeline ID is returned on creation. If you did not save it, retrieve it with:

viam datapipelines list --org-id=<org-id>

To see a specific pipeline and its last run:

viam datapipelines describe --id=<pipeline-id>

Both commands are also documented in Manage pipelines.

Query in the Viam app

You can query pipeline results directly in the app’s query editor:

  1. Navigate to the DATA page and select the Query subtab, or click Query pipeline data in the three-dot menu next to a pipeline on the Pipelines subtab.
  2. In the query header, open the data source dropdown and select Pipeline sink.
  3. A second dropdown labeled Pipeline appears. Select the pipeline whose results you want to query.
  4. Write your MQL query and click Run query.

The query runs against the pipeline’s output collection. All MQL stages you add operate on the pipeline’s result documents, not the raw readings.

Query with the SDK

Python:

from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Assumes data_client is an authenticated DataClient, ORG_ID holds your
# organization ID, and this code runs inside an async function.
PIPELINE_ID = "YOUR-PIPELINE-ID"

# Returns a list of result documents
results = await data_client.tabular_data_by_mql(
    organization_id=ORG_ID,
    query=[
        {"$limit": 10},
    ],
    tabular_data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
    pipeline_id=PIPELINE_ID,
)

for entry in results:
    print(entry)

Go:

results, err := dataClient.TabularDataByMQL(ctx, orgID,
    []map[string]interface{}{
        {"$limit": 10},
    },
    &app.TabularDataByMQLOptions{
        TabularDataSourceType: app.TabularDataSourceTypePipelineSink,
        PipelineID:            "YOUR-PIPELINE-ID",
    },
)
if err != nil {
    logger.Fatal(err)
}

for _, entry := range results {
    fmt.Printf("%v\n", entry)
}

The query runs against the pipeline’s output documents, not the raw readings. The fields available depend on what your pipeline’s $project stage produces.
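
Because later stages see only the pipeline's output fields, any follow-up aggregation is written against those names. The sketch below assumes, purely for illustration, a pipeline whose $project stage emits location, avg_temp, and count. The helper name busiest_locations_query is hypothetical, so substitute your own field names.

```python
def busiest_locations_query(min_count=1, top_n=5):
    """Build MQL stages that summarize pipeline result documents per location.

    The field names (location, avg_temp, count) are assumptions; they must
    match whatever your pipeline's $project stage actually outputs.
    """
    return [
        # Keep only result documents that aggregated at least min_count readings.
        {"$match": {"count": {"$gte": min_count}}},
        # Combine all matching result documents for each location.
        {
            "$group": {
                "_id": "$location",
                "mean_temp": {"$avg": "$avg_temp"},
                "readings": {"$sum": "$count"},
            }
        },
        # Locations with the most readings first.
        {"$sort": {"readings": -1}},
        {"$limit": top_n},
    ]


query = busiest_locations_query(min_count=100, top_n=3)
print(query)
```

Pass the returned list as the query argument in the tabular_data_by_mql call above, keeping the pipeline-sink data source type and pipeline ID unchanged.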

What results look like

Each result document contains the fields your pipeline’s $project stage outputs, plus metadata added automatically:

{
  "_viam_pipeline_run": {
    "id": "run-id",
    "interval": {
      "start": "2025-03-15T14:00:00.000Z",
      "end": "2025-03-15T15:00:00.000Z"
    },
    "organization_id": "org-id"
  },
  "location": "warehouse-a",
  "avg_temp": 23.5,
  "count": 3600
}

To filter results by time window, use $match on the _viam_pipeline_run.interval.start and _viam_pipeline_run.interval.end fields.
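
A time-window filter can be sketched as follows. This assumes the interval fields are stored as date values, so they compare against Python datetime objects. If your result documents hold ISO 8601 strings instead, pass strings in the same format. interval_window_query is a hypothetical helper, not part of the SDK.

```python
from datetime import datetime, timezone


def interval_window_query(window_start, window_end, limit=100):
    """Build MQL stages selecting runs whose interval lies inside a window."""
    return [
        {
            "$match": {
                # The run interval must start at or after the window start ...
                "_viam_pipeline_run.interval.start": {"$gte": window_start},
                # ... and end at or before the window end.
                "_viam_pipeline_run.interval.end": {"$lte": window_end},
            }
        },
        # Oldest interval first.
        {"$sort": {"_viam_pipeline_run.interval.start": 1}},
        {"$limit": limit},
    ]


query = interval_window_query(
    datetime(2025, 3, 15, 14, 0, tzinfo=timezone.utc),
    datetime(2025, 3, 15, 16, 0, tzinfo=timezone.utc),
)
print(query)
```

The resulting list can be supplied as the query argument of tabular_data_by_mql together with the pipeline-sink data source type and your pipeline ID.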

Troubleshooting

Query returns empty results
  • Pipeline hasn’t run yet. Check pipeline run status with viam datapipelines describe --id=<pipeline-id> or see Monitor pipeline runs. Wait for at least one run to complete.
  • Wrong pipeline ID. The ID is returned on creation and visible in viam datapipelines list.
  • Wrong data source type. You must specify TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK, not TABULAR_DATA_SOURCE_TYPE_STANDARD or TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.