Use annotations to control data provenance

Introduction

In node-graph, a critical feature is tracking task inputs and outputs to ensure data provenance and reproducibility. To achieve this, task functions must be annotated with input and output specifications. This tells the engine how to handle, serialize, and store data as individual data nodes.

This process addresses two key aspects of provenance:

  • Data creation: How should data be created and stored? For example, if a task returns a nested dictionary, should it be stored as a single entity or unpacked into separate nodes?

  • Data lineage: Where does the data come from? How does it flow between tasks in the workflow?

This guide will walk you through the various ways to annotate your tasks.

import typing as t

from node_graph import dynamic, namespace, task

Data creation

Static namespaces for outputs

Let’s define two tasks that perform the same calculation but have different output annotations.

The first task, add_multiply1, has no output specification. Consequently, the returned dictionary is stored as a single Dict node. The second task, add_multiply2, uses an output namespace to specify that each key-value pair in the dictionary should be stored as a separate data node.

@task()
def add_multiply1(x, y):
    """Return a dictionary, which will be stored as a single Dict node."""
    return {"sum": x + y, "product": x * y}


@task()
def add_multiply2(
    x: int,
    y: int,
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """Return a dictionary, but its elements are stored as separate Int nodes."""
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiply(x: int, y: int):
    """A graph to run both versions of the add_multiply task."""
    add_multiply1(x=x, y=y)
    add_multiply2(x=x, y=y)


wg = AddMultiply.build(x=1, y=2)
wg.run()
{}

Note

In add_multiply2, we also annotated the input types (x: int, y: int). This adds a layer of validation to ensure only integers are passed to the task. This feature is experimental, so its API and behavior may change in future releases.

Let’s visualize the data provenance of our executed workflow:

wg.engine.recorder
[Provenance graph: AddMultiply CALLs add_multiply1 and add_multiply2. add_multiply1 CREATEs one dict node (result); add_multiply2 CREATEs two int nodes (sum, product). The int inputs x and y feed AddMultiply, add_multiply1, and add_multiply2.]


As the provenance graph shows, add_multiply1 has a single output node (result), while add_multiply2 has two separate output nodes (sum and product), as defined in its namespace.
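The rule can be mimicked in a few lines of plain Python. This is a minimal sketch, not node-graph's implementation: the helper name split_outputs, the default output name "result" (taken from the provenance graph), and the representation of a static namespace as a {name: type} dict are assumptions for illustration. The sketch is also deliberately strict about missing or unexpected keys; this guide does not state how node-graph treats them.

```python
from typing import Any, Dict, Optional


def split_outputs(value: Any, spec: Optional[Dict[str, type]] = None) -> Dict[str, Any]:
    """Hypothetical helper: map a task's return value to output data nodes.

    spec=None stands for a task without an output annotation; a dict stands
    for a static namespace such as namespace(sum=int, product=int).
    """
    if spec is None:
        # No namespace: the whole return value becomes one node named "result".
        return {"result": value}
    missing = set(spec) - set(value)
    unexpected = set(value) - set(spec)
    if missing or unexpected:
        raise ValueError(f"missing={sorted(missing)}, unexpected={sorted(unexpected)}")
    nodes = {}
    for name, expected in spec.items():
        if not isinstance(value[name], expected):
            raise TypeError(f"output {name!r} should be {expected.__name__}")
        nodes[name] = value[name]  # one node per declared key
    return nodes


returned = {"sum": 1 + 2, "product": 1 * 2}
print(split_outputs(returned))  # {'result': {'sum': 3, 'product': 2}}
print(split_outputs(returned, {"sum": int, "product": int}))  # {'sum': 3, 'product': 2}
```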

Static namespaces for inputs

Similarly, we can annotate inputs to unpack a dictionary into distinct data nodes. We use Python’s standard typing.Annotated to attach the node-graph namespace metadata to the input type.

@task()
def add_multiply3(
    data: t.Annotated[
        dict,
        namespace(x=int, y=int),
    ],
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """Take a dictionary as input but treat 'x' and 'y' as separate nodes."""
    return {"sum": data["x"] + data["y"], "product": data["x"] * data["y"]}


@task.graph()
def AddMultiplyInputs(x: int, y: int):
    add_multiply3(data={"x": x, "y": y})


wg = AddMultiplyInputs.build(x=1, y=2)
wg


Note how the x input is passed to data.x (and similarly for y). This follows from the namespace(x=int, y=int) annotation on the data parameter.

wg.run()
{}

Finally, we can inspect the provenance graph for this workflow:

wg.engine.recorder
[Provenance graph: AddMultiplyInputs CALLs add_multiply3, which CREATEs two int nodes (sum, product). The int input x is linked to AddMultiplyInputs as x and to add_multiply3 as data.x; likewise, y is linked as y and as data.y.]


We can see that even though we passed the inputs as a single dictionary, they were stored as two separate int nodes and linked to the task as data.x and data.y.
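The socket naming follows a simple rule: each declared key of the input namespace becomes a socket named <parameter>.<key>. Here is a minimal sketch of that mapping. It assumes a plain {key: type} dict in place of namespace(...), and input_sockets is a hypothetical helper rather than part of node-graph.

```python
from typing import Any, Dict


def input_sockets(param: str, value: Dict[str, Any], spec: Dict[str, type]) -> Dict[str, Any]:
    """Hypothetical helper: expand a dict passed to a namespaced input
    parameter into one validated socket per declared key."""
    sockets = {}
    for key, expected in spec.items():
        if key not in value:
            raise KeyError(f"{param}.{key} is required")
        if not isinstance(value[key], expected):
            raise TypeError(f"{param}.{key} should be {expected.__name__}")
        sockets[f"{param}.{key}"] = value[key]  # e.g. 'data.x'
    return sockets


print(input_sockets("data", {"x": 1, "y": 2}, {"x": int, "y": int}))
# {'data.x': 1, 'data.y': 2}
```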

Dynamic namespaces

Sometimes, the number and names of outputs are not known until the task runs. Dynamic namespaces are designed for this scenario.

This is particularly useful for tasks that generate a variable number of outputs based on their inputs.

@task()
def generate_square_numbers(
    n: int,
) -> t.Annotated[dict, dynamic(t.Any),]:
    """Generate a dict of square numbers. The number of outputs depends on 'n'."""
    return {f"square_{i}": i**2 for i in range(n)}


@task.graph()
def SquareNumbersGenerator(n: int):
    generate_square_numbers(n=n)


wg = SquareNumbersGenerator.build(n=5)
wg.run()
{}

Let’s examine the provenance of this dynamic workflow:

wg.engine.recorder
[Provenance graph: SquareNumbersGenerator CALLs generate_square_numbers, which CREATEs five int nodes (square_0 to square_4). The int input n feeds both processes.]


The graph shows that the generate_square_numbers task has multiple output nodes, one for each entry in the dynamically generated dictionary. The dynamic(typing.Any) specification treats each value in the returned dictionary as a separate output node of any type.

Note

If no item type is specified (i.e., just dynamic()), the namespace becomes fully dynamic, enabling arbitrary nested structures (e.g., dictionaries). Each value will then be recursively treated as a leaf node.
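Both variants can be pictured with the following plain-Python sketch. It is an illustration, not node-graph code. dynamic_outputs is a hypothetical name, item_type=None stands for a bare dynamic(), and nested keys are joined with dots, as in the provenance graphs in this guide.

```python
from typing import Any, Dict


def dynamic_outputs(value: Dict[str, Any], item_type: Any = None,
                    prefix: str = "") -> Dict[str, Any]:
    """Hypothetical helper mimicking dynamic(item_type) and a bare dynamic().

    With an item_type, every top-level key becomes one node (typing.Any skips
    the type check). With item_type=None, nested dicts are walked recursively
    and each non-dict value becomes a leaf node under a dotted path.
    """
    nodes: Dict[str, Any] = {}
    for key, item in value.items():
        path = f"{prefix}{key}"
        if item_type is None and isinstance(item, dict):
            # Fully dynamic: recurse into the nested dictionary.
            nodes.update(dynamic_outputs(item, None, prefix=path + "."))
            continue
        if item_type not in (None, Any) and not isinstance(item, item_type):
            raise TypeError(f"{path!r} should be {item_type.__name__}")
        nodes[path] = item
    return nodes


squares = {f"square_{i}": i**2 for i in range(5)}
print(dynamic_outputs(squares, Any))
# {'square_0': 0, 'square_1': 1, 'square_2': 4, 'square_3': 9, 'square_4': 16}
print(dynamic_outputs({"a": 1, "b": {"c": 2, "d": {"e": 3}}}))
# {'a': 1, 'b.c': 2, 'b.d.e': 3}
```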

Nested namespaces

Namespaces can be nested to represent complex, structured data. Let’s define a task that returns a nested dictionary.

@task()
def generate_nested_dict(
    x: int,
    y: int,
) -> t.Annotated[dict, namespace(sum=int, nested=namespace(diff=int, product=int)),]:
    """Returns a nested dictionary with a corresponding nested namespace."""
    return {"sum": x + y, "nested": {"diff": x - y, "product": x * y}}


@task.graph()
def NestedDictGenerator(x: int, y: int):
    generate_nested_dict(x=x, y=y)


wg = NestedDictGenerator.build(x=1, y=2)
wg.run()
{}

Visualize the full graph provenance:

wg.engine.recorder
[Provenance graph: NestedDictGenerator CALLs generate_nested_dict, which CREATEs three int nodes (sum, nested.diff, nested.product). The int inputs x and y feed both processes.]


The provenance graph confirms that the output is correctly structured: a top-level sum node, plus diff and product nodes under the nested key (nested.diff and nested.product), just as defined in the namespace.
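Nested namespaces translate to dotted paths such as nested.diff. The sketch below represents namespace(...) as a nested dict of types and uses a hypothetical helper, flatten_nested. It shows how such paths could be derived while checking the returned value against the spec. It is an illustration only, not node-graph's code.

```python
from typing import Any, Dict


def flatten_nested(value: Dict[str, Any], spec: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Hypothetical helper: validate a nested dict against a nested spec and
    return one leaf node per declared path ('nested.diff', ...)."""
    nodes: Dict[str, Any] = {}
    for key, sub_spec in spec.items():
        path = f"{prefix}{key}"
        if key not in value:
            raise KeyError(f"missing output {path!r}")
        if isinstance(sub_spec, dict):
            # A nested namespace: recurse and extend the dotted path.
            nodes.update(flatten_nested(value[key], sub_spec, prefix=path + "."))
        elif isinstance(value[key], sub_spec):
            nodes[path] = value[key]
        else:
            raise TypeError(f"{path!r} should be {sub_spec.__name__}")
    return nodes


spec = {"sum": int, "nested": {"diff": int, "product": int}}
x, y = 1, 2
print(flatten_nested({"sum": x + y, "nested": {"diff": x - y, "product": x * y}}, spec))
# {'sum': 3, 'nested.diff': -1, 'nested.product': 2}
```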

We can also combine dynamic and nested namespaces.

@task()
def generate_dynamic_nested_dict(
    n: int,
) -> t.Annotated[dict, dynamic(namespace(square=int, cube=int)),]:
    """Generate a nested dict of square and cube numbers for 0 to n-1."""
    return {f"data_{i}": {"square": i**2, "cube": i**3} for i in range(n)}


@task.graph()
def DynamicNestedDictGenerator(n: int):
    generate_dynamic_nested_dict(n=n)


wg = DynamicNestedDictGenerator.build(n=3)
wg.run()
{}

Visualize the provenance:

wg.engine.recorder
[Provenance graph: DynamicNestedDictGenerator CALLs generate_dynamic_nested_dict, which CREATEs six int nodes (data_0.square, data_0.cube, data_1.square, data_1.cube, data_2.square, data_2.cube). The int input n feeds both processes.]


The output shows a dictionary with dynamic keys (data_0, data_1, etc.), where each value is itself a dictionary with a fixed square and cube structure, as specified by dynamic(namespace(...)).

Using TypedDict

TypedDict works like a static namespace with one socket per key. Optional keys (total=False) become optional sockets.

from typing import TypedDict


class XYIn(TypedDict):
    x: int
    y: int


class AddMultiplyOut(TypedDict):
    sum: int
    product: int


@task()
def add_multiply_typed_dict(**kwargs) -> AddMultiplyOut:
    return {"sum": kwargs["x"] + kwargs["y"], "product": kwargs["x"] * kwargs["y"]}


@task.graph()
def AddMultiplyTypedDict():
    add_multiply_typed_dict(x=2, y=5)


wg = AddMultiplyTypedDict.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyTypedDict CALLs add_multiply_typed_dict, which CREATEs two int nodes (sum, product) from the int inputs x and y.]


Note

You can also expand **kwargs with Unpack[YourTypedDict] to make the inputs explicit in the function signature (same socket expansion). Optional keys can be expressed with total=False or NotRequired. NotRequired requires Python 3.11+ (or typing_extensions).
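The standard library already records which TypedDict keys are required and which are optional, so that split can be read directly. Below is a minimal sketch, assuming a hypothetical helper typeddict_sockets; this guide does not show node-graph's own conversion. It uses class inheritance with total=False instead of NotRequired, so it runs on Python 3.9+.

```python
from typing import TypedDict, get_type_hints


class XYIn(TypedDict):
    x: int
    y: int


class XYScaleIn(XYIn, total=False):
    scale: float  # optional key because this class uses total=False


def typeddict_sockets(td: type) -> dict:
    """Hypothetical helper: one socket per TypedDict key, marked required
    or optional from the class's __required_keys__."""
    hints = get_type_hints(td)
    return {name: {"type": hints[name], "required": name in td.__required_keys__}
            for name in hints}


for name, info in typeddict_sockets(XYScaleIn).items():
    print(name, info["type"].__name__, info["required"])
# x int True
# y int True
# scale float False
```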

Using Pydantic models

You can use Pydantic models in annotations as a more structured, reusable way to define namespaces. By default, a BaseModel expands to a static namespace with one socket per field.

If you want a dynamic namespace, set model_config = {"extra": "allow"} and (optionally) "item_type" for the type of each dynamic value. If you want to treat a model as a single leaf (blob), set model_config = {"leaf": True} or use Leaf[YourModel] in the annotation.

from pydantic import BaseModel
from node_graph.socket_spec import Leaf


class OutputsModel(BaseModel):
    sum: int
    product: int


@task()
def add_multiply_pydantic_in_out(x, y) -> OutputsModel:
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiplyPydantic():
    # x and y are passed as plain values; only the output is described by OutputsModel
    add_multiply_pydantic_in_out(x=3, y=4)


wg = AddMultiplyPydantic.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyPydantic CALLs add_multiply_pydantic_in_out, which CREATEs two int nodes (sum, product) from the int inputs x and y.]


Dynamic Pydantic models

Mark a model as dynamic with extra='allow'. Add item_type to specify the per-key value type. Fixed fields still appear as normal sockets alongside your dynamic keys.
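Conceptually, the returned dictionary is split into the declared fields and the extra keys. Declared fields are checked against their own types, and extra keys are checked against item_type. The sketch below shows that split with plain dicts instead of a Pydantic model. split_fixed_and_dynamic is a hypothetical name, and node-graph's own handling may differ in the details.

```python
from typing import Any, Dict, Tuple


def split_fixed_and_dynamic(
    value: Dict[str, Any], fixed: Dict[str, type], item_type: type
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper mirroring extra='allow' plus item_type: declared
    fields stay fixed sockets, every other key becomes a dynamic socket."""
    fixed_part: Dict[str, Any] = {}
    dynamic_part: Dict[str, Any] = {}
    for key, item in value.items():
        expected = fixed.get(key, item_type)  # declared type, else the per-key type
        if not isinstance(item, expected):
            raise TypeError(f"{key!r} should be {expected.__name__}")
        (fixed_part if key in fixed else dynamic_part)[key] = item
    return fixed_part, dynamic_part


returned = {"header": 100, **{f"k{i}": i * i for i in range(4)}}
print(split_fixed_and_dynamic(returned, {"header": int}, int))
# ({'header': 100}, {'k0': 0, 'k1': 1, 'k2': 4, 'k3': 9})
```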

class DynamicOut(BaseModel):
    model_config = {"extra": "allow", "item_type": int}

    header: int = 42  # fixed (non-dynamic) field


@task()
def make_dynamic_with_model(n: int) -> DynamicOut:
    # fixed field + dynamic keys with int values
    return {"header": 100, **{f"k{i}": i * i for i in range(n)}}


@task.graph()
def GraphDynamicOut(n: int):
    make_dynamic_with_model(n=n)


wg = GraphDynamicOut.build(n=4)
wg.run()
wg.engine.recorder
[Provenance graph: GraphDynamicOut CALLs make_dynamic_with_model, which CREATEs five int nodes (header, k0, k1, k2, k3). The int input n feeds both processes.]


Leaf Pydantic models (single blob)

Sometimes you want to validate with a Pydantic model but store it as a single node instead of expanding fields. For leaf models, the value is treated as a blob; the socket stays annotated and the object is stored as-is. There are two ways:

  1. Mark the model: model_config = {"leaf": True}

  2. Per-use override: annotate with Leaf[YourModel]
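The choice between expanding a model and storing it as one leaf can be sketched as a check on the annotation. To stay within the standard library, this sketch uses dataclasses instead of Pydantic models. In place of node_graph's Leaf[...], whose internals this guide does not show, it uses a hypothetical LEAF marker inside typing.Annotated. BlobRecord, PlainRecord, and socket_names are illustrative names only.

```python
from dataclasses import dataclass, fields
from typing import Annotated, Any, List, get_args, get_origin

LEAF = object()  # hypothetical stand-in for node_graph.socket_spec.Leaf


@dataclass
class BlobRecord:
    model_config = {"leaf": True}  # class attribute, not a dataclass field
    a: int
    b: int


@dataclass
class PlainRecord:
    a: int
    b: int


def socket_names(param: str, annotation: Any) -> List[str]:
    """Return one socket for a leaf model, else one socket per field."""
    model, markers = annotation, []
    if get_origin(annotation) is Annotated:
        model, *markers = get_args(annotation)  # per-use markers
    is_leaf = LEAF in markers or bool(getattr(model, "model_config", {}).get("leaf", False))
    if is_leaf:
        return [param]  # stored as a single blob node
    return [f"{param}.{f.name}" for f in fields(model)]


print(socket_names("m", BlobRecord))  # ['m']
print(socket_names("m", PlainRecord))  # ['m.a', 'm.b']
print(socket_names("m", Annotated[PlainRecord, LEAF]))  # ['m']
```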

class BlobModel(BaseModel):
    model_config = {"leaf": True}  # always a leaf blob

    a: int
    b: int


@task()
def consume_blob(m: BlobModel) -> dict:
    # 'm' is validated by Pydantic but stored/treated as one leaf node
    return {"sum": m.a + m.b}


# Per-use override without modifying the model:
class AnotherModel(BaseModel):
    a: int
    b: int


@task()
def consume_blob_per_use(m: Leaf[AnotherModel]) -> dict:
    return {"sum": m.a + m.b}


@task.graph()
def BlobExamples():
    consume_blob(m=BlobModel(a=1, b=2))
    consume_blob_per_use(m=AnotherModel(a=3, b=4))


wg = BlobExamples.build()
wg.run()
wg.engine.recorder
[Provenance graph: BlobExamples CALLs consume_blob and consume_blob_per_use. consume_blob receives a single BlobModel node as input m, and consume_blob_per_use receives a single AnotherModel node as input m. Each task CREATEs one dict node (result).]


Nested Pydantic models

Pydantic models can be nested to represent complex data structures. You can mix static, dynamic, and leaf models as needed.

class InnerModel(BaseModel):
    value: int


class OuterModel(BaseModel):
    name: str
    inner: InnerModel
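Nested models expand recursively into dotted socket paths. The sketch below mirrors InnerModel and OuterModel with standard-library dataclasses, named InnerRecord and OuterRecord, so that the example runs without Pydantic. It uses a hypothetical socket_paths helper and covers only the static case.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import List


@dataclass
class InnerRecord:
    value: int


@dataclass
class OuterRecord:
    name: str
    inner: InnerRecord


def socket_paths(model: type, prefix: str = "") -> List[str]:
    """Hypothetical helper: dotted socket paths for a (nested) static model."""
    paths: List[str] = []
    for f in fields(model):
        path = f"{prefix}{f.name}"
        if is_dataclass(f.type):
            # A nested model becomes a nested namespace.
            paths.extend(socket_paths(f.type, prefix=path + "."))
        else:
            paths.append(path)
    return paths


print(socket_paths(OuterRecord))  # ['name', 'inner.value']
```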

Using dataclasses

Dataclasses work just like Pydantic models for annotations:

  • Plain dataclass -> expanded static namespace (one socket per field)

  • model_config={'extra': 'allow', 'item_type': T} -> dynamic namespace

  • model_config={'leaf': True} or Leaf[YourDataclass] -> single leaf (blob)

from dataclasses import dataclass


@dataclass
class DCOutputs:
    sum: int
    product: int


@task()
def add_multiply_dc_in_out(x, y) -> DCOutputs:
    return {"sum": x + y, "product": x * y}


@task.graph()
def AddMultiplyDataclass():
    # x and y are passed as plain values; only the output is described by DCOutputs
    add_multiply_dc_in_out(x=2, y=5)


wg = AddMultiplyDataclass.build()
wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyDataclass CALLs add_multiply_dc_in_out, which CREATEs two int nodes (sum, product) from the int inputs x and y.]


Important

Structured models (Pydantic or dataclasses) are supported as runtime values. You may pass instances to tasks/graphs and return them from tasks:

  • Instances are expanded to plain dicts when assigned to namespace sockets, so Graph can wire provenance edges precisely (e.g., data.x -> task.data.x).

  • Graph inputs can still be collected from task outputs as a dict of AiiDA ORM nodes, preserving AiiDA links between nodes.

  • Validation still happens via the Graph spec (derived from your annotations).
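The expansion in the first bullet can be pictured with the standard library. dataclasses.asdict turns an instance, including nested dataclasses, into plain dicts, and the dotted socket names and graph-to-task link pairs follow from those dicts. expand_instance and the task name task are hypothetical, chosen only to mirror the data.x -> task.data.x example. They do not describe node-graph's internals.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterator, Tuple


@dataclass
class XY:
    x: int
    y: int


def _walk(value: Dict[str, Any], prefix: str) -> Iterator[Tuple[str, Any]]:
    """Yield (dotted_path, leaf_value) pairs for a nested dict."""
    for key, item in value.items():
        path = f"{prefix}.{key}"
        if isinstance(item, dict):
            yield from _walk(item, path)
        else:
            yield path, item


def expand_instance(param: str, obj: Any) -> Dict[str, Any]:
    """Hypothetical helper: expand a dataclass instance into one socket per field."""
    return dict(_walk(asdict(obj), param))


graph_sockets = expand_instance("data", XY(x=1, y=2))
print(graph_sockets)  # {'data.x': 1, 'data.y': 2}
links = [(path, f"task.{path}") for path in graph_sockets]
print(links)  # [('data.x', 'task.data.x'), ('data.y', 'task.data.y')]
```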

Data linkage

Data linkage tracks the flow of data between tasks. At the workflow level, a task.graph can define its own inputs and outputs, providing a clean interface to a complex chain of tasks. node-graph validates data against these graph-level specifications and automatically links graph inputs to the appropriate task inputs.

In this final example, we will build a graph that reuses the input and output specifications from the tasks it contains. This is a powerful feature for building complex, modular, and self-consistent workflows.

@task()
def add_multiply(
    data: t.Annotated[
        dict,
        namespace(x=int, y=int),
    ],
) -> t.Annotated[dict, namespace(sum=int, product=int),]:
    """A reusable task with well-defined I/O specifications."""
    return {"sum": data["x"] + data["y"], "product": data["x"] * data["y"]}


@task.graph()
def AddMultiplyFinal(
    n: int,
    data: t.Annotated[
        dict,
        namespace(
            add_multiply1=add_multiply.inputs,
            add_multiply2=add_multiply.inputs,
        ),
    ],
) -> t.Annotated[
    dict,
    namespace(
        square=generate_square_numbers.outputs,
        add_multiply1=add_multiply.outputs,
        add_multiply2=add_multiply.outputs,
    ),
]:
    """A complex graph demonstrating I/O reuse and data linkage."""
    square_numbers = generate_square_numbers(n)

    # Unpack nested inputs and pass them to the respective tasks
    out1 = add_multiply(data=data["add_multiply1"]["data"])
    out2 = add_multiply(data=data["add_multiply2"]["data"])

    # Gather task outputs into the graph-level output structure
    return {"square": square_numbers, "add_multiply1": out1, "add_multiply2": out2}


wg = AddMultiplyFinal.build(
    n=3,
    data={
        "add_multiply1": {"data": {"x": 1, "y": 2}},
        "add_multiply2": {"data": {"x": 3, "y": 4}},
    },
)
wg


In the example above:

  • Graph outputs: The outputs are annotated with a nested namespace that defines the shape of the final result. Here we reuse generate_square_numbers.outputs and add_multiply.outputs to ensure the graph’s output signature is consistent with the tasks it contains.

  • Graph inputs: The data input is annotated with a nested namespace that reuses add_multiply.inputs. This allows node-graph to validate the complex input dictionary and create the correct data links.

In the GUI representation of the Graph, you will see how the nested inputs are wired. For instance, there is a direct link from the graph input socket data.add_multiply1.data.x to the task input socket add_multiply_task_1.data.x, which keeps the data lineage intact from the graph input down to the task.
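The wiring rule is essentially a path rewrite: every leaf under data.add_multiply1 in the graph input is linked to the same relative path on the corresponding task. Below is a minimal sketch. flatten and links_for are hypothetical helpers, and the task name add_multiply comes from this example's provenance graph and is used only for illustration.

```python
from typing import Any, Dict, Iterator, List, Tuple


def flatten(value: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (dotted_path, leaf_value) pairs for a nested dict."""
    for key, item in value.items():
        path = f"{prefix}{key}"
        if isinstance(item, dict):
            yield from flatten(item, path + ".")
        else:
            yield path, item


def links_for(graph_param: str, graph_value: Dict[str, Any], key: str,
              task_name: str) -> List[Tuple[str, str]]:
    """Hypothetical: link each leaf under graph_param.key to the same
    relative socket on task_name."""
    return [
        (f"{graph_param}.{key}.{rel}", f"{task_name}.{rel}")
        for rel, _ in flatten(graph_value[key])
    ]


data = {
    "add_multiply1": {"data": {"x": 1, "y": 2}},
    "add_multiply2": {"data": {"x": 3, "y": 4}},
}
for source, target in links_for("data", data, "add_multiply1", "add_multiply"):
    print(source, "->", target)
# data.add_multiply1.data.x -> add_multiply.data.x
# data.add_multiply1.data.y -> add_multiply.data.y
```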

Tip

If a graph only exposes the outputs of a single task, this can be simplified as:

@task.graph()
def SomeGraph(...) -> t.Annotated[dict, some_task.outputs]:
    return some_task(...)

We can see similar linkage in the provenance graph. Let’s run the graph and visualize its provenance.

wg.run()
wg.engine.recorder
[Provenance graph: AddMultiplyFinal CALLs generate_square_numbers, add_multiply, and add_multiply1. generate_square_numbers CREATEs square_0, square_1, and square_2; add_multiply and add_multiply1 each CREATE sum and product. AddMultiplyFinal RETURNs these nodes as square.square_0 to square.square_2, add_multiply1.sum, add_multiply1.product, add_multiply2.sum, and add_multiply2.product. The graph inputs data.add_multiply1.data.x and data.add_multiply1.data.y feed add_multiply as data.x and data.y; data.add_multiply2.data.x and data.add_multiply2.data.y feed add_multiply1 as data.x and data.y; n feeds generate_square_numbers.]


Note how the outputs of the individual tasks are exposed as graph outputs: each one is linked to AddMultiplyFinal by a RETURN edge (e.g., square.square_0, add_multiply1.sum).

Reshaping specifications with select

Often, you’ll want to reuse parts of a specification while modifying it. For example, you might want to exclude a field that is provided by another source or rename fields for clarity. This can be done declaratively using select.

The main parameters are:

  • include=... / exclude=...: Keep or drop fields (supports dotted paths for nested fields).

  • include_prefix=... / exclude_prefix=...: Filter top-level fields by name prefix.

  • rename={old:new}: Rename top-level fields.

  • prefix="p_": Add a prefix to all top-level fields.

Important

To apply a select, you must place it in the metadata list of t.Annotated[<type>, ...] alongside the specification you are modifying.

Let’s see an example where we build a graph that runs a task twice. We want a shared structure input at the graph level, so we must exclude it from the nested input specifications for each task call.

from node_graph.socket_spec import meta, select


@task()
def consume_complex(
    data: t.Annotated[
        dict,
        namespace(
            pw=namespace(structure=int, kpoints=int, parameters=int),
            metadata=dict,
        ),
    ],
) -> dict:
    return {"seen": sorted(data)}


# Exclude a nested field (drop data.pw.structure)
# because it's provided separately as a graph input
# and shared between the two task calls.
@task.graph()
def UseExclude(
    structure,
    inputs: t.Annotated[
        dict,
        namespace(
            consume_complex1=t.Annotated[
                dict, consume_complex.inputs, select(exclude="data.pw.structure")
            ],
            consume_complex2=t.Annotated[
                dict, consume_complex.inputs, select(exclude="data.pw.structure")
            ],
        ),
    ],
):
    # Manually add the shared 'structure' to each task's inputs
    consume_complex_input1 = inputs["consume_complex1"]
    consume_complex_input1["data"]["pw"]["structure"] = structure
    consume_complex(**consume_complex_input1)

    consume_complex_input2 = inputs["consume_complex2"]
    consume_complex_input2["data"]["pw"]["structure"] = structure
    consume_complex(**consume_complex_input2)


wg = UseExclude.build(
    structure=1,
    inputs={
        "consume_complex1": {
            "data": {"pw": {"kpoints": 2, "parameters": 3}, "metadata": {}}
        },
        "consume_complex2": {
            "data": {"pw": {"kpoints": 4, "parameters": 5}, "metadata": {}}
        },
    },
)
wg


In the GUI representation, you can see that the graph has a top-level structure input, and the nested inputs inputs.consume_complex1.data.pw and inputs.consume_complex2.data.pw are missing the structure socket, just as we specified.
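To see what select(exclude=...) does to a specification, here is a plain-Python sketch. It applies the select options listed earlier to a nested dict that stands in for consume_complex.inputs. select_spec is a hypothetical name. It covers include, exclude, rename, and prefix but not the prefix filters. It also applies rename before prefix, an ordering this guide does not specify.

```python
import copy
from typing import Any, Dict, Iterable, List, Optional, Union

Paths = Optional[Union[str, Iterable[str]]]


def _as_list(paths: Paths) -> List[str]:
    if paths is None:
        return []
    return [paths] if isinstance(paths, str) else list(paths)


def select_spec(spec: Dict[str, Any], include: Paths = None, exclude: Paths = None,
                rename: Optional[Dict[str, str]] = None, prefix: str = "") -> Dict[str, Any]:
    """Hypothetical sketch of select(): return a reshaped copy of a nested dict spec."""
    result = copy.deepcopy(spec)  # never mutate the reused spec
    if include is not None:
        kept: Dict[str, Any] = {}
        for path in _as_list(include):
            *parents, leaf = path.split(".")
            src, dst = result, kept
            for part in parents:
                src, dst = src[part], dst.setdefault(part, {})
            dst[leaf] = src[leaf]
        result = kept
    for path in _as_list(exclude):
        *parents, leaf = path.split(".")
        node = result
        for part in parents:
            node = node[part]
        node.pop(leaf, None)  # drop the (possibly nested) field
    rename = rename or {}
    # Top-level only: rename first, then add the prefix (assumed order).
    return {prefix + rename.get(key, key): value for key, value in result.items()}


complex_inputs = {
    "data": {
        "pw": {"structure": int, "kpoints": int, "parameters": int},
        "metadata": dict,
    }
}
reshaped = select_spec(complex_inputs, exclude="data.pw.structure")
print(sorted(reshaped["data"]["pw"]))  # ['kpoints', 'parameters']
```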

Modifying specification metadata

You can also set namespace-level metadata declaratively using meta, for example, to mark a reused specification as optional.

  • meta(required=False): Makes the input optional.

  • meta(is_metadata=True): Marks the input as metadata-only.

It is attached alongside select in the same Annotated metadata list.
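One way to picture meta is as an override applied to a copy of the reused spec's metadata. The sketch below is hypothetical throughout. SocketMeta, apply_meta, and check_input are illustrative names, and representing an omitted value as None is an assumption, not documented node-graph behavior.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class SocketMeta:
    """Hypothetical stand-in for the metadata that meta(...) sets on a spec."""
    required: bool = True
    is_metadata: bool = False
    extras: Dict[str, Any] = field(default_factory=dict)


def apply_meta(base: SocketMeta, **changes: Any) -> SocketMeta:
    """Return a modified copy, leaving the reused spec's metadata untouched."""
    return replace(base, **changes)


def check_input(name: str, value: Optional[Any], meta: SocketMeta) -> None:
    """Reject a missing value only when the socket is required."""
    if value is None and meta.required:
        raise ValueError(f"input {name!r} is required")


reused = SocketMeta()
optional = apply_meta(reused, required=False)
check_input("data", None, optional)  # accepted: the input is optional
print(reused.required, optional.required)  # True False
```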

@task.graph()
def UseMeta(
    data: t.Annotated[
        dict,
        consume_complex.inputs,  # Reuse the original spec
        meta(required=False),  # Make the entire 'data' input optional
    ],
):
    if data:
        return consume_complex(**data)


Attaching custom metadata

The meta helper also exposes a generic extras= dictionary if you need to carry arbitrary metadata through the graph (for example, GUI hints or backend-specific flags). These keys are preserved on the socket and available to downstream tooling.


@task()
def AnnotateExtras(
    payload: dict,
) -> t.Annotated[
    dict,
    meta(
        extras={
            "ui:panel": "energy-details",
            "source": "user-provided",
        }
    ),
]:
    return payload

meta also accepts a semantics= payload to attach ontology metadata; see Use annotations to capture ontology semantics for a dedicated walkthrough.

Conclusion

You now know how to annotate task and graph inputs and outputs in node-graph. By leveraging static (namespace), dynamic (dynamic), nested namespaces, and structured models, you can precisely control data serialization and create transparent data lineages.

The key takeaways are:

  • Annotate task/graph outputs to unpack results into individual data nodes.

  • Annotate inputs to specify input structures.

  • Employ dynamic (or Pydantic models with extra='allow') for tasks with a variable number of outputs.

  • Use Pydantic models, dataclasses, or TypedDicts for reusable, validated schemas:
    • Plain BaseModel, dataclass, or TypedDict -> expanded namespace

    • model_config={'extra': 'allow', 'item_type': T} -> dynamic namespace

    • model_config={'leaf': True} or Leaf[Model] -> single leaf (blob)

  • Use Unpack[TypedDict] to expand **kwargs into a fixed namespace.

  • Reuse .inputs and .outputs specifications at the graph level to build modular and robust workflows.

  • Use select inside Annotated to reshape reused specifications (e.g., include/exclude with dotted paths, rename, prefix).

  • Use meta to modify metadata of a specification, such as making it optional.

These tools are fundamental to building reproducible and verifiable scientific workflows with complete data provenance.
