Use annotations to control data provenance

Introduction

In node-graph, a critical feature is tracking task inputs and outputs to ensure data provenance and reproducibility. To achieve this, task functions must be annotated with input and output specifications. This tells the engine how to handle, serialize, and store data as individual data nodes.

This process addresses two key aspects of provenance:

  • Data creation: How should data be created and stored? For example, if a task returns a nested dictionary, should it be stored as a single entity or unpacked into separate nodes?

  • Data lineage: Where does the data come from? How does it flow between tasks in the workflow?

This guide will walk you through the various ways to annotate your tasks.

import typing as t

from node_graph import dynamic, namespace, task

Data creation

Static namespaces for outputs

Let’s define two tasks that perform the same calculation but have different output annotations.

The first task, add_multiply1, has no output specification. Consequently, the returned dictionary will be stored as a single Dict node. The second task, add_multiply2, uses an output namespace to specify that each key-value pair in the dictionary should be stored as a separate node.

@task()
def add_multiply1(x, y):
    """Return a dictionary, which will be stored as a single Dict node."""
    return {"sum": x + y, "product": x * y}


@task()
def add_multiply2(
    x: int,
    y: int,
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """Return a dictionary, but its elements are stored as separate Int nodes."""
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiply(x: int, y: int):
    """A graph to run both versions of the add_multiply task."""
    add_multiply1(x=x, y=y)
    add_multiply2(x=x, y=y)


wg = AddMultiply.build(x=1, y=2)
wg.run()
{}

Note

In add_multiply2, we also annotated the input types (x: int, y: int). This adds a layer of validation to ensure only integers are passed to the task. This feature is experimental; its API and behavior may change in future releases.

Let’s visualize the data provenance of our executed workflow:

wg.engine.recorder
[Provenance graph: AddMultiply calls add_multiply1 and add_multiply2. add_multiply1 creates a single dict node (result); add_multiply2 creates two int nodes (sum and product). The int inputs x and y feed the graph and both tasks.]


As the provenance graph shows, add_multiply1 has a single output node (result), while add_multiply2 has two separate output nodes (sum and product), as defined in its namespace.
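The difference between the two storage modes can be illustrated with a small, library-free sketch. The helper to_nodes and its fields argument are hypothetical and are not part of node-graph; they only mimic the idea that an output without a specification is kept as one result entry, while a static namespace yields one entry per declared key.

```python
def to_nodes(result, fields=None):
    """Hypothetical helper: map a task's return value to named data entries."""
    if fields is None:
        # No output specification: keep the whole value as one entry.
        return {"result": result}
    # Static namespace: one entry per declared key.
    return {name: result[name] for name in fields}


value = {"sum": 3, "product": 2}
single = to_nodes(value)
unpacked = to_nodes(value, fields=("sum", "product"))
print(single)
print(unpacked)
```

In this sketch, an undeclared key in the returned dictionary would simply be ignored; how node-graph itself treats such keys is not covered here.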

Static namespaces for inputs

Similarly, we can annotate inputs to unpack a dictionary into distinct data nodes. We use Python’s standard typing.Annotated to attach the node-graph namespace metadata to the input type.

@task()
def add_multiply3(
    data: t.Annotated[
        dict,
        namespace(x=int, y=int),
    ],
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """Take a dictionary as input but treat 'x' and 'y' as separate nodes."""
    return {"sum": data["x"] + data["y"], "product": data["x"] * data["y"]}


@task.graph()
def AddMultiplyInputs(x: int, y: int):
    add_multiply3(data={"x": x, "y": y})


wg = AddMultiplyInputs.build(x=1, y=2)
wg


Note how the x input is passed to data.x (and similarly for y). This is due to the namespace specifications.

wg.run()
{}

Finally, we can inspect the provenance graph for this workflow:

wg.engine.recorder
[Provenance graph: AddMultiplyInputs calls add_multiply3, which creates two int nodes (sum and product). The graph inputs x and y are linked to the task inputs data.x and data.y.]


We can see that even though we passed the inputs as a single dictionary, they were serialized as two separate Int nodes, x and y, before being passed to the task.
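For readers unfamiliar with typing.Annotated, the following standard-library sketch shows how metadata attached to a type hint can be read back at runtime. FieldSpec is a hypothetical stand-in for the namespace(...) metadata; how node-graph actually inspects annotations is an assumption and may differ.

```python
import typing as t
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical stand-in for namespace(...) metadata."""

    fields: tuple


def add_multiply(data: t.Annotated[dict, FieldSpec(("x", "y"))]) -> dict:
    return {"sum": data["x"] + data["y"]}


# include_extras=True keeps the Annotated wrapper so the metadata survives.
hints = t.get_type_hints(add_multiply, include_extras=True)
base_type, *metadata = t.get_args(hints["data"])
print(base_type, metadata)

# Without include_extras, only the plain type remains.
plain_hints = t.get_type_hints(add_multiply)
print(plain_hints["data"])
```

Because the metadata rides along with an ordinary type hint, static type checkers still see data as a dict, while a framework can use the extra object to decide how to split the value.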

Dynamic namespaces

Sometimes, the number and names of outputs are not known until the task runs. Dynamic namespaces are designed for this scenario.

This is particularly useful for tasks that generate a variable number of outputs based on their inputs.

@task()
def generate_square_numbers(
    n: int,
) -> t.Annotated[dict, dynamic(t.Any),]:
    """Generate a dict of square numbers. The number of outputs depends on 'n'."""
    return {f"square_{i}": i**2 for i in range(n)}


@task.graph()
def SquareNumbersGenerator(n: int):
    generate_square_numbers(n=n)


wg = SquareNumbersGenerator.build(n=5)
wg.run()
{}

Let’s examine the provenance of this dynamic workflow:

wg.engine.recorder
[Provenance graph: SquareNumbersGenerator calls generate_square_numbers, which creates five int nodes (square_0 to square_4). The int input n feeds the graph and the task.]


The graph shows that the generate_square_numbers task has multiple output nodes, one for each entry in the dynamically generated dictionary. The dynamic(typing.Any) specification treats each value in the returned dictionary as a separate output node of any type.

Note

If no item type is specified (i.e., just dynamic()), the namespace becomes fully dynamic, enabling arbitrary nested structures (e.g., dictionaries). Nested dictionaries are then unpacked recursively, and each non-dictionary value is treated as a leaf node.
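The recursive unpacking of a fully dynamic namespace can be pictured with a short, library-free sketch. The flatten function is hypothetical; the dotted naming mirrors labels such as data_0.square that appear in provenance output, but whether node-graph computes them this way internally is an assumption.

```python
def flatten(value, prefix=""):
    """Recursively unpack nested dicts into dotted leaf names."""
    if not isinstance(value, dict):
        # Anything that is not a dict is a leaf.
        return {prefix: value}
    leaves = {}
    for key, item in value.items():
        path = f"{prefix}.{key}" if prefix else key
        leaves.update(flatten(item, path))
    return leaves


leaves = flatten({"square_0": 0, "group": {"a": 1, "b": {"c": 2}}})
print(leaves)
```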

Nested namespaces

Namespaces can be nested to represent complex, structured data. Let’s define a task that returns a nested dictionary.

@task()
def generate_nested_dict(
    x: int,
    y: int,
) -> t.Annotated[dict, namespace(sum=int, nested=namespace(diff=int, product=int)),]:
    """Returns a nested dictionary with a corresponding nested namespace."""
    return {"sum": x + y, "nested": {"diff": x - y, "product": x * y}}


@task.graph()
def NestedDictGenerator(x: int, y: int):
    generate_nested_dict(x=x, y=y)


wg = NestedDictGenerator.build(x=1, y=2)
wg.run()
{}

Visualize the full graph provenance:

wg.engine.recorder
[Provenance graph: NestedDictGenerator calls generate_nested_dict, which creates three int nodes (sum, nested.diff, and nested.product). The int inputs x and y feed the graph and the task.]


The provenance graph confirms that the output is structured as defined in the namespace: a top-level sum node and a nested namespace whose diff and product nodes appear as nested.diff and nested.product.
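To make the role of a nested specification concrete, here is a minimal sketch that walks a plain-dict spec, checks the returned value against it, and produces dotted leaf names. The unpack function and the dict-based spec format are hypothetical illustrations, not node-graph's implementation.

```python
def unpack(value, spec, prefix=""):
    """Unpack `value` into dotted leaves, following a nested spec of types."""
    leaves = {}
    for name, expected in spec.items():
        path = f"{prefix}.{name}" if prefix else name
        if name not in value:
            raise KeyError(f"missing output: {path}")
        item = value[name]
        if isinstance(expected, dict):
            # A nested spec describes a nested namespace.
            leaves.update(unpack(item, expected, path))
        elif not isinstance(item, expected):
            raise TypeError(f"{path}: expected {expected.__name__}")
        else:
            leaves[path] = item
    return leaves


spec = {"sum": int, "nested": {"diff": int, "product": int}}
leaves = unpack({"sum": 3, "nested": {"diff": -1, "product": 2}}, spec)
print(leaves)
```

The values correspond to x=1 and y=2 from the example above. A result that lacks a declared key, or carries a value of the wrong type, raises an error in this sketch instead of being stored silently.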

We can also combine dynamic and nested namespaces.

@task()
def generate_dynamic_nested_dict(
    n: int,
) -> t.Annotated[dict, dynamic(namespace(square=int, cube=int)),]:
    """Generate a nested dict of square and cube numbers from 0 to n - 1."""
    return {f"data_{i}": {"square": i**2, "cube": i**3} for i in range(n)}


@task.graph()
def DynamicNestedDictGenerator(n: int):
    generate_dynamic_nested_dict(n=n)


wg = DynamicNestedDictGenerator.build(n=3)
wg.run()
{}

Visualize the provenance:

wg.engine.recorder
[Provenance graph: DynamicNestedDictGenerator calls generate_dynamic_nested_dict, which creates six int nodes (data_0.square, data_0.cube, data_1.square, data_1.cube, data_2.square, and data_2.cube). The int input n feeds the graph and the task.]


The output shows a dictionary with dynamic keys (data_0, data_1, etc.), where each value is itself a dictionary with a fixed square and cube structure, as specified by dynamic(namespace(...)).

Using TypedDict

TypedDict works like a static namespace with one socket per key. Optional keys (total=False) become optional sockets.

from typing import TypedDict


class XYIn(TypedDict):
    x: int
    y: int


class AddMultiplyOut(TypedDict):
    sum: int
    product: int


@task()
def add_multiply_typed_dict(**kwargs) -> AddMultiplyOut:
    return {"sum": kwargs["x"] + kwargs["y"], "product": kwargs["x"] * kwargs["y"]}


@task.graph()
def AddMultiplyTypedDict():
    add_multiply_typed_dict(x=2, y=5)


wg = AddMultiplyTypedDict.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyTypedDict calls add_multiply_typed_dict, which creates two int nodes (sum and product). The int inputs x and y feed the task.]


Note

You can also expand **kwargs with Unpack[YourTypedDict] to make the inputs explicit in the function signature (same socket expansion). Optional keys can be expressed with total=False or NotRequired. NotRequired requires Python 3.11+ (or typing_extensions).
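The distinction between required and optional keys is recorded on the TypedDict class itself, as the standard-library sketch below shows. The class names are made up for illustration, and whether node-graph reads these attributes to decide which sockets are optional is an assumption.

```python
from typing import TypedDict


class RequiredKeys(TypedDict):
    x: int
    y: int


class WithOptional(RequiredKeys, total=False):
    # Keys declared in a total=False class are optional.
    scale: int


print(sorted(WithOptional.__required_keys__))
print(sorted(WithOptional.__optional_keys__))
```

Splitting required and optional keys across two classes in this way works without NotRequired, so it is also available on Python versions older than 3.11.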

Using Pydantic models

You can use Pydantic models in annotations as a more structured, reusable way to define namespaces. By default, a BaseModel expands to a static namespace with one socket per field.

If you want a dynamic namespace, set model_config = {"extra": "allow"} and (optionally) "item_type" for the type of each dynamic value. If you want to treat a model as a single leaf (blob), set model_config = {"leaf": True} or use Leaf[YourModel] in the annotation.

from pydantic import BaseModel
from node_graph.socket_spec import Leaf


class OutputsModel(BaseModel):
    sum: int
    product: int


@task()
def add_multiply_pydantic_in_out(x, y) -> OutputsModel:
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiplyPydantic():
    # x and y are passed as plain keyword arguments; only the outputs use OutputsModel
    add_multiply_pydantic_in_out(x=3, y=4)


wg = AddMultiplyPydantic.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyPydantic calls add_multiply_pydantic_in_out, which creates two int nodes (sum and product). The int inputs x and y feed the task.]


Dynamic Pydantic models

Mark a model as dynamic with extra='allow'. Add item_type to specify the per-key value type. Fixed fields still appear as normal sockets alongside your dynamic keys.

class DynamicOut(BaseModel):
    model_config = {"extra": "allow", "item_type": int}

    header: int = 42  # fixed (non-dynamic) field


@task()
def make_dynamic_with_model(n: int) -> DynamicOut:
    # fixed field + dynamic keys with int values
    return {"header": 100, **{f"k{i}": i * i for i in range(n)}}


@task.graph()
def GraphDynamicOut(n: int):
    make_dynamic_with_model(n=n)


wg = GraphDynamicOut.build(n=4)
wg.run()
wg.engine.recorder
[Provenance graph: GraphDynamicOut calls make_dynamic_with_model, which creates five int nodes: the fixed field header and the dynamic keys k0 to k3. The int input n feeds the graph and the task.]


Leaf Pydantic models (single blob)

Sometimes you want to validate with a Pydantic model but store it as a single node instead of expanding fields. For leaf models, the value is treated as a blob; the socket stays annotated and the object is stored as-is. There are two ways:

  1. Mark the model: model_config = {"leaf": True}

  2. Per-use override: annotate with Leaf[YourModel]

class BlobModel(BaseModel):
    model_config = {"leaf": True}  # always a leaf blob

    a: int
    b: int


@task()
def consume_blob(m: BlobModel) -> dict:
    # 'm' is validated by Pydantic but stored/treated as one leaf node
    return {"sum": m.a + m.b}


# Per-use override without modifying the model:
class AnotherModel(BaseModel):
    a: int
    b: int


@task()
def consume_blob_per_use(m: Leaf[AnotherModel]) -> dict:
    return {"sum": m.a + m.b}


@task.graph()
def BlobExamples():
    consume_blob(m=BlobModel(a=1, b=2))
    consume_blob_per_use(m=AnotherModel(a=3, b=4))


wg = BlobExamples.build()
wg.run()
wg.engine.recorder
[Provenance graph: BlobExamples calls consume_blob and consume_blob_per_use, each of which creates a single dict node (result). The input m of consume_blob is one BlobModel node, and the input m of consume_blob_per_use is one AnotherModel node.]


Nested Pydantic models

Pydantic models can be nested to represent complex data structures. You can mix static, dynamic, and leaf models as needed.

class InnerModel(BaseModel):
    value: int


class OuterModel(BaseModel):
    name: str
    inner: InnerModel

Using dataclasses

Dataclasses work just like Pydantic models for annotations:

  • Plain dataclass -> expanded static namespace (one socket per field)

  • model_config={'extra': 'allow', 'item_type': T} -> dynamic namespace

  • model_config={'leaf': True} or Leaf[YourDataclass] -> single leaf (blob)

from dataclasses import dataclass


@dataclass
class DCOutputs:
    sum: int
    product: int


@task()
def add_multiply_dc_in_out(x, y) -> DCOutputs:
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiplyDataclass():
    # x and y are passed as plain keyword arguments; only the outputs use DCOutputs
    add_multiply_dc_in_out(x=2, y=5)


wg = AddMultiplyDataclass.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyDataclass calls add_multiply_dc_in_out, which creates two int nodes (sum and product). The int inputs x and y feed the task.]
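The idea of one socket per dataclass field, and of expanding an instance to a plain dict, can be illustrated with the standard library alone. This is only a sketch of the concept; the variable names are illustrative, and node-graph's own expansion logic is not shown here.

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class DCOutputs:
    sum: int
    product: int


# One entry per declared field, in declaration order.
sockets = {f.name: f.type for f in fields(DCOutputs)}
print(sockets)

# An instance can be expanded to a plain dict with matching keys.
expanded = asdict(DCOutputs(sum=7, product=10))
print(expanded)
```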


Important

Structured models (Pydantic or dataclasses) are supported as runtime values. You may pass instances to tasks/graphs and return them from tasks:

  • Instances are expanded to plain dicts when assigned to namespace sockets, so Graph can wire provenance edges precisely (e.g., data.x -> task.data.x).

  • Graph inputs can still be collected from task outputs as a dict of AiiDA ORM nodes, preserving AiiDA links between nodes.

  • Validation still happens via the Graph spec (derived from your annotations).

Data linkage

Data linkage tracks the flow of data between tasks. At the workflow level, a task.graph can define its own inputs and outputs, providing a clean interface to a complex chain of tasks. node-graph validates data against these graph-level specifications and automatically links graph inputs to the appropriate task inputs.

In this final example, we will build a graph that reuses the input and output specifications from the tasks it contains. This is a powerful feature for building complex, modular, and self-consistent workflows.

@task()
def add_multiply(
    data: t.Annotated[
        dict,
        namespace(x=int, y=int),
    ],
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """A reusable task with well-defined I/O specifications."""
    return {"sum": data["x"] + data["y"], "product": data["x"] * data["y"]}


@task.graph()
def AddMultiplyFinal(
    n: int,
    data: t.Annotated[
        dict,
        namespace(
            add_multiply1=add_multiply.inputs,
            add_multiply2=add_multiply.inputs,
        ),
    ],
) -> t.Annotated[
    dict,
    namespace(
        square=generate_square_numbers.outputs,
        add_multiply1=add_multiply.outputs,
        add_multiply2=add_multiply.outputs,
    ),
]:
    """A complex graph demonstrating I/O reuse and data linkage."""
    square_numbers = generate_square_numbers(n)

    # Unpack nested inputs and pass them to the respective tasks
    out1 = add_multiply(data=data["add_multiply1"]["data"])
    out2 = add_multiply(data=data["add_multiply2"]["data"])

    # Gather task outputs into the graph-level output structure
    return {"square": square_numbers, "add_multiply1": out1, "add_multiply2": out2}


wg = AddMultiplyFinal.build(
    n=3,
    data={
        "add_multiply1": {"data": {"x": 1, "y": 2}},
        "add_multiply2": {"data": {"x": 3, "y": 4}},
    },
)
wg


In the example above:

  • Graph outputs: The outputs are annotated with a nested namespace that defines the shape of the final result. Here we reuse generate_square_numbers.outputs and add_multiply.outputs to ensure the graph’s output signature is consistent with the tasks it contains.

  • Graph inputs: The data input is annotated with a nested namespace that reuses add_multiply.inputs. This allows node-graph to validate the complex input dictionary and create the correct data links.

In the GUI representation of the graph, you can see how the nested inputs are wired. For instance, there is a direct link from the graph input socket data.add_multiply1.data.x to the task input socket add_multiply.data.x, so the lineage of each value can be traced from the graph input to the task that consumes it.

Tip

If a graph only exposes the outputs of a single task, this can be simplified as

@task.graph()
def SomeGraph(...) -> t.Annotated[dict, some_task.outputs]:
    return some_task(...)

We can see similar linkage in the provenance graph. Let’s run the graph and visualize its provenance.

wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyFinal calls generate_square_numbers, add_multiply, and add_multiply1. generate_square_numbers creates square_0 to square_2, which the graph returns as square.square_0 to square.square_2. add_multiply creates sum and product, returned as add_multiply1.sum and add_multiply1.product; add_multiply1 creates sum and product, returned as add_multiply2.sum and add_multiply2.product. The graph inputs data.add_multiply1.data.x and data.add_multiply1.data.y are linked to data.x and data.y of add_multiply; data.add_multiply2.data.x and data.add_multiply2.data.y are linked to data.x and data.y of add_multiply1. The int input n feeds the graph and generate_square_numbers.]


Note how the outputs of the various tasks are exposed (linked) to the graph.

Reshaping specifications with select

Often, you’ll want to reuse parts of a specification while modifying it. For example, you might want to exclude a field that is provided by another source or rename fields for clarity. This can be done declaratively using select.

The main parameters are:

  • include=... / exclude=...: Keep or drop fields (supports dotted paths for nested fields).

  • include_prefix=... / exclude_prefix=...: Filter top-level fields by name prefix.

  • rename={old:new}: Rename top-level fields.

  • prefix="p_": Add a prefix to all top-level fields.

Important

To apply a select, you must place it in the metadata list of t.Annotated[<type>, ...] alongside the specification you are modifying.

Let’s see an example where we build a graph that runs a task twice. We want a shared structure input at the graph level, so we must exclude it from the nested input specifications for each task call.

from node_graph.socket_spec import meta, select


@task()
def consume_complex(
    data: t.Annotated[
        dict,
        namespace(
            pw=namespace(structure=int, kpoints=int, parameters=int),
            metadata=dict,
        ),
    ],
) -> dict:
    return {"seen": list(sorted(data.keys()))}


# Exclude a nested field (drop data.pw.structure)
# because it's provided separately as a graph input
# and shared between the two task calls.
@task.graph()
def UseExclude(
    structure,
    inputs: t.Annotated[
        dict,
        namespace(
            consume_complex1=t.Annotated[
                dict, consume_complex.inputs, select(exclude="data.pw.structure")
            ],
            consume_complex2=t.Annotated[
                dict, consume_complex.inputs, select(exclude="data.pw.structure")
            ],
        ),
    ],
):
    # Manually add the shared 'structure' to each task's inputs
    consume_complex_input1 = inputs["consume_complex1"]
    consume_complex_input1["data"]["pw"]["structure"] = structure
    consume_complex(**consume_complex_input1)

    consume_complex_input2 = inputs["consume_complex2"]
    consume_complex_input2["data"]["pw"]["structure"] = structure
    consume_complex(**consume_complex_input2)


wg = UseExclude.build(
    structure=1,
    inputs={
        "consume_complex1": {
            "data": {"pw": {"kpoints": 2, "parameters": 3}, "metadata": {}}
        },
        "consume_complex2": {
            "data": {"pw": {"kpoints": 4, "parameters": 5}, "metadata": {}}
        },
    },
)
wg


In the GUI representation, you can see that the graph has a top-level structure input, and the nested inputs inputs.consume_complex1.data.pw and inputs.consume_complex2.data.pw are missing the structure socket, just as we specified.
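The effect of excluding a dotted path can be pictured on a plain nested dictionary. The exclude function below is a hypothetical, library-free sketch; it is not the select implementation, and representing the specification as nested dicts of types is an assumption made for illustration.

```python
import copy


def exclude(spec, dotted_path):
    """Return a copy of a nested dict spec without the field at dotted_path."""
    result = copy.deepcopy(spec)
    *parents, leaf = dotted_path.split(".")
    node = result
    for name in parents:
        # Walk down to the namespace that holds the field to drop.
        node = node[name]
    del node[leaf]
    return result


spec = {
    "data": {
        "pw": {"structure": int, "kpoints": int, "parameters": int},
        "metadata": dict,
    }
}
trimmed = exclude(spec, "data.pw.structure")
print(sorted(trimmed["data"]["pw"]))
```

Working on a copy keeps the original specification intact, which matters when the same specification is reused for several task calls, as in the UseExclude graph.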

Modifying specification metadata

You can also set namespace-level metadata declaratively using meta, for example, to mark a reused specification as optional.

  • meta(required=False): Makes the input optional.

  • meta(is_metadata=True): Marks the input as metadata-only.

It is attached alongside select in the same Annotated metadata list.

@task.graph()
def UseMeta(
    data: t.Annotated[
        dict,
        consume_complex.inputs,  # Reuse the original spec
        meta(required=False),  # Make the entire 'data' input optional
    ],
):
    if data:
        return consume_complex(data=data)


Attaching custom metadata

The meta helper also exposes a generic extras= dictionary if you need to carry arbitrary metadata through the graph (for example, GUI hints or backend-specific flags). These keys are preserved on the socket and available to downstream tooling.


@task()
def AnnotateExtras(
    payload: dict,
) -> t.Annotated[
    dict,
    meta(
        extras={
            "ui:panel": "energy-details",
            "source": "user-provided",
        }
    ),
]:
    return payload

meta also accepts a semantics= payload to attach ontology metadata; see Use annotations to capture ontology semantics for a dedicated walkthrough.

Conclusion

You now know how to annotate task and graph inputs and outputs in node-graph. By leveraging static (namespace), dynamic (dynamic), nested namespaces, and structured models, you can precisely control data serialization and create transparent data lineages.

The key takeaways are:

  • Annotate task/graph outputs to unpack results into individual data nodes.

  • Annotate inputs to specify input structures.

  • Employ dynamic (or Pydantic models with extra='allow') for tasks with a variable number of outputs.

  • Use Pydantic models, dataclasses, or TypedDict for reusable, validated schemas:
    • Plain BaseModel, dataclass, or TypedDict -> expanded namespace

    • model_config={'extra': 'allow', 'item_type': T} -> dynamic namespace

    • model_config={'leaf': True} or Leaf[Model] -> single leaf (blob)

  • Use Unpack[TypedDict] to expand **kwargs into a fixed namespace.

  • Reuse .inputs and .outputs specifications at the graph level to build modular and robust workflows.

  • Use select inside Annotated to reshape reused specifications (e.g., include/exclude with dotted paths, rename, prefix).

  • Use meta to modify metadata of a specification, such as making it optional.

These tools are fundamental to building reproducible and verifiable scientific workflows with complete data provenance.
