function.response

Utilities for working with RunFunctionResponses.

  1# Copyright 2023 The Crossplane Authors.
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""Utilities for working with RunFunctionResponses."""
 16
 17import datetime
 18
 19from google.protobuf import duration_pb2 as durationpb
 20from google.protobuf import struct_pb2 as structpb
 21
 22import crossplane.function.proto.v1.run_function_pb2 as fnv1
 23from crossplane.function import resource
 24
 25"""The default TTL for which a RunFunctionResponse may be cached."""
 26DEFAULT_TTL = datetime.timedelta(minutes=1)
 27
 28
 29def to(
 30    req: fnv1.RunFunctionRequest,
 31    ttl: datetime.timedelta = DEFAULT_TTL,
 32) -> fnv1.RunFunctionResponse:
 33    """Create a response to the supplied request.
 34
 35    Args:
 36        req: The request to respond to.
 37        ttl: How long Crossplane may optionally cache the response.
 38
 39    Returns:
 40        A response to the supplied request.
 41
 42    The request's tag, desired resources, and context is automatically copied to
 43    the response. Using response.to is a good pattern to ensure
 44    """
 45    dttl = durationpb.Duration()
 46    dttl.FromTimedelta(ttl)
 47    return fnv1.RunFunctionResponse(
 48        meta=fnv1.ResponseMeta(tag=req.meta.tag, ttl=dttl),
 49        desired=req.desired,
 50        context=req.context,
 51    )
 52
 53
 54def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 55    """Add a normal result to the response."""
 56    rsp.results.append(
 57        fnv1.Result(
 58            severity=fnv1.SEVERITY_NORMAL,
 59            message=message,
 60        )
 61    )
 62
 63
 64def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 65    """Add a warning result to the response."""
 66    rsp.results.append(
 67        fnv1.Result(
 68            severity=fnv1.SEVERITY_WARNING,
 69            message=message,
 70        )
 71    )
 72
 73
 74def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 75    """Add a fatal result to the response."""
 76    rsp.results.append(
 77        fnv1.Result(
 78            severity=fnv1.SEVERITY_FATAL,
 79            message=message,
 80        )
 81    )
 82
 83
 84_STATUS_MAP = {
 85    "True": fnv1.STATUS_CONDITION_TRUE,
 86    "False": fnv1.STATUS_CONDITION_FALSE,
 87    "Unknown": fnv1.STATUS_CONDITION_UNKNOWN,
 88}
 89
 90
 91def set_conditions(
 92    rsp: fnv1.RunFunctionResponse,
 93    *conditions: resource.Condition,
 94) -> None:
 95    """Set one or more conditions on the composite resource (XR).
 96
 97    Args:
 98        rsp: The RunFunctionResponse to update.
 99        *conditions: The conditions to set.
100
101    Each condition is appended to ``rsp.conditions``. Crossplane uses the
102    conditions returned by a function to set custom status conditions on
103    the composite resource.
104
105    The ``last_transition_time`` field of each condition is ignored.
106    Crossplane sets the transition time itself.
107
108    Do not set the ``Ready`` condition type. Crossplane manages it based
109    on resource readiness.
110    """
111    for condition in conditions:
112        c = fnv1.Condition(
113            type=condition.typ,
114            status=_STATUS_MAP.get(condition.status, fnv1.STATUS_CONDITION_UNKNOWN),
115            reason=condition.reason or "",
116        )
117        if condition.message:
118            c.message = condition.message
119        rsp.conditions.append(c)
120
121
def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
    """Store operation function output on a RunFunctionResponse.

    Args:
        rsp: The RunFunctionResponse to update.
        output: The output data as a dictionary or protobuf Struct.

    Raises:
        TypeError: If output is neither a dict nor a protobuf Struct.

    Operation functions can return arbitrary output data that will be written
    to the Operation's status.pipeline field. A dict is converted to a Struct
    first; a Struct is copied into the response as is.
    """
    if isinstance(output, dict):
        rsp.output.CopyFrom(resource.dict_to_struct(output))
        return

    if isinstance(output, structpb.Struct):
        rsp.output.CopyFrom(output)
        return

    msg = f"Unsupported output type: {type(output)}"
    raise TypeError(msg)
142
143
def require_resources(  # noqa: PLR0913
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
    *,
    match_name: str | None = None,
    match_labels: dict[str, str] | None = None,
    namespace: str | None = None,
) -> None:
    """Add a resource requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of resources to require.
        kind: The kind of resources to require.
        match_name: Match a resource by name (mutually exclusive with match_labels).
        match_labels: Match resources by labels (mutually exclusive with
            match_name). An empty dict is sent as an empty label selector
            rather than being dropped.
        namespace: The namespace to search in (optional).

    Raises:
        ValueError: If both match_name and match_labels are provided, or neither.

    This tells Crossplane to fetch the specified resources and include them
    in the next call to the function in req.required_resources[name]. Any
    requirement already stored under the same name is overwritten.
    """
    if (match_name is None) == (match_labels is None):
        msg = "Exactly one of match_name or match_labels must be provided"
        raise ValueError(msg)

    selector = fnv1.ResourceSelector(
        api_version=api_version,
        kind=kind,
    )

    if match_name is not None:
        selector.match_name = match_name
    else:
        # The validation above guarantees match_labels is not None here.
        # Updating the labels map with an empty dict never touches the
        # sub-message, which would leave the selector without any match
        # criterion. Mark match_labels as present first so that an empty
        # label set still reaches Crossplane as a label selector.
        selector.match_labels.SetInParent()
        selector.match_labels.labels.update(match_labels)

    if namespace is not None:
        selector.namespace = namespace

    rsp.requirements.resources[name].CopyFrom(selector)
190
191
def require_schema(
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
) -> None:
    """Add a schema requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of the resource kind, e.g. "example.org/v1".
        kind: The kind of resource, e.g. "MyResource".

    The requirement is stored in rsp.requirements.schemas under the given
    name, replacing any requirement already stored under that name. It tells
    Crossplane to fetch the OpenAPI schema for the specified resource kind
    and include it in the next call to the function in
    req.required_schemas[name]. Use request.get_required_schema to retrieve it.

    For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field.
    If Crossplane cannot find a schema for the requested kind, the schema will be
    empty (get_required_schema will return None).
    """
    schema_selector = fnv1.SchemaSelector(api_version=api_version, kind=kind)
    rsp.requirements.schemas[name].CopyFrom(schema_selector)
DEFAULT_TTL = datetime.timedelta(seconds=60)
def to( req: crossplane.function.proto.v1.run_function_pb2.RunFunctionRequest, ttl: datetime.timedelta = datetime.timedelta(seconds=60)) -> crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse:
def to(
    req: fnv1.RunFunctionRequest,
    ttl: datetime.timedelta = DEFAULT_TTL,
) -> fnv1.RunFunctionResponse:
    """Build a RunFunctionResponse that answers the supplied request.

    Args:
        req: The request to respond to.
        ttl: How long Crossplane may optionally cache the response.

    Returns:
        A new response whose meta holds the request's tag and the given TTL,
        and whose desired state and context are taken from the request.

    Starting from response.to is a good pattern because it ensures the
    request's tag, desired resources, and context are carried over to the
    response.
    """
    # The response meta expects a protobuf Duration, not a timedelta.
    cache_for = durationpb.Duration()
    cache_for.FromTimedelta(ttl)

    meta = fnv1.ResponseMeta(tag=req.meta.tag, ttl=cache_for)
    return fnv1.RunFunctionResponse(
        meta=meta,
        desired=req.desired,
        context=req.context,
    )

Create a response to the supplied request.

Arguments:
  • req: The request to respond to.
  • ttl: How long Crossplane may optionally cache the response.
Returns:

A response to the supplied request.

The request's tag, desired resources, and context are automatically copied to the response. Using response.to is a good pattern to ensure these fields are carried over from the request.

def normal( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with normal severity to the response.

    Args:
        rsp: The RunFunctionResponse to update in place.
        message: The text of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_NORMAL, message=message)
    rsp.results.append(result)

Add a normal result to the response.

def warning( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with warning severity to the response.

    Args:
        rsp: The RunFunctionResponse to update in place.
        message: The text of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_WARNING, message=message)
    rsp.results.append(result)

Add a warning result to the response.

def fatal( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with fatal severity to the response.

    Args:
        rsp: The RunFunctionResponse to update in place.
        message: The text of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_FATAL, message=message)
    rsp.results.append(result)

Add a fatal result to the response.

def set_conditions( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, *conditions: crossplane.function.resource.Condition) -> None:
 92def set_conditions(
 93    rsp: fnv1.RunFunctionResponse,
 94    *conditions: resource.Condition,
 95) -> None:
 96    """Set one or more conditions on the composite resource (XR).
 97
 98    Args:
 99        rsp: The RunFunctionResponse to update.
100        *conditions: The conditions to set.
101
102    Each condition is appended to ``rsp.conditions``. Crossplane uses the
103    conditions returned by a function to set custom status conditions on
104    the composite resource.
105
106    The ``last_transition_time`` field of each condition is ignored.
107    Crossplane sets the transition time itself.
108
109    Do not set the ``Ready`` condition type. Crossplane manages it based
110    on resource readiness.
111    """
112    for condition in conditions:
113        c = fnv1.Condition(
114            type=condition.typ,
115            status=_STATUS_MAP.get(condition.status, fnv1.STATUS_CONDITION_UNKNOWN),
116            reason=condition.reason or "",
117        )
118        if condition.message:
119            c.message = condition.message
120        rsp.conditions.append(c)

Set one or more conditions on the composite resource (XR).

Arguments:
  • rsp: The RunFunctionResponse to update.
  • *conditions: The conditions to set.

Each condition is appended to rsp.conditions. Crossplane uses the conditions returned by a function to set custom status conditions on the composite resource.

The last_transition_time field of each condition is ignored. Crossplane sets the transition time itself.

Do not set the Ready condition type. Crossplane manages it based on resource readiness.

def set_output( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, output: dict | google.protobuf.struct_pb2.Struct) -> None:
def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
    """Store operation function output on a RunFunctionResponse.

    Args:
        rsp: The RunFunctionResponse to update.
        output: The output data as a dictionary or protobuf Struct.

    Raises:
        TypeError: If output is neither a dict nor a protobuf Struct.

    Operation functions can return arbitrary output data that will be written
    to the Operation's status.pipeline field. A dict is converted to a Struct
    first; a Struct is copied into the response as is.
    """
    if isinstance(output, dict):
        rsp.output.CopyFrom(resource.dict_to_struct(output))
        return

    if isinstance(output, structpb.Struct):
        rsp.output.CopyFrom(output)
        return

    msg = f"Unsupported output type: {type(output)}"
    raise TypeError(msg)

Set the output field in a RunFunctionResponse for operation functions.

Arguments:
  • rsp: The RunFunctionResponse to update.
  • output: The output data as a dictionary or protobuf Struct.

Operation functions can return arbitrary output data that will be written to the Operation's status.pipeline field. This function sets that output on the response.

def require_resources( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, name: str, api_version: str, kind: str, *, match_name: str | None = None, match_labels: dict[str, str] | None = None, namespace: str | None = None) -> None:
def require_resources(  # noqa: PLR0913
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
    *,
    match_name: str | None = None,
    match_labels: dict[str, str] | None = None,
    namespace: str | None = None,
) -> None:
    """Add a resource requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of resources to require.
        kind: The kind of resources to require.
        match_name: Match a resource by name (mutually exclusive with match_labels).
        match_labels: Match resources by labels (mutually exclusive with
            match_name). An empty dict is sent as an empty label selector
            rather than being dropped.
        namespace: The namespace to search in (optional).

    Raises:
        ValueError: If both match_name and match_labels are provided, or neither.

    This tells Crossplane to fetch the specified resources and include them
    in the next call to the function in req.required_resources[name]. Any
    requirement already stored under the same name is overwritten.
    """
    if (match_name is None) == (match_labels is None):
        msg = "Exactly one of match_name or match_labels must be provided"
        raise ValueError(msg)

    selector = fnv1.ResourceSelector(
        api_version=api_version,
        kind=kind,
    )

    if match_name is not None:
        selector.match_name = match_name
    else:
        # The validation above guarantees match_labels is not None here.
        # Updating the labels map with an empty dict never touches the
        # sub-message, which would leave the selector without any match
        # criterion. Mark match_labels as present first so that an empty
        # label set still reaches Crossplane as a label selector.
        selector.match_labels.SetInParent()
        selector.match_labels.labels.update(match_labels)

    if namespace is not None:
        selector.namespace = namespace

    rsp.requirements.resources[name].CopyFrom(selector)

Add a resource requirement to the response.

Arguments:
  • rsp: The RunFunctionResponse to update.
  • name: The name to use for this requirement.
  • api_version: The API version of resources to require.
  • kind: The kind of resources to require.
  • match_name: Match a resource by name (mutually exclusive with match_labels).
  • match_labels: Match resources by labels (mutually exclusive with match_name).
  • namespace: The namespace to search in (optional).
Raises:
  • ValueError: If both match_name and match_labels are provided, or neither.

This tells Crossplane to fetch the specified resources and include them in the next call to the function in req.required_resources[name].

def require_schema( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, name: str, api_version: str, kind: str) -> None:
def require_schema(
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
) -> None:
    """Add a schema requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of the resource kind, e.g. "example.org/v1".
        kind: The kind of resource, e.g. "MyResource".

    The requirement is stored in rsp.requirements.schemas under the given
    name, replacing any requirement already stored under that name. It tells
    Crossplane to fetch the OpenAPI schema for the specified resource kind
    and include it in the next call to the function in
    req.required_schemas[name]. Use request.get_required_schema to retrieve it.

    For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field.
    If Crossplane cannot find a schema for the requested kind, the schema will be
    empty (get_required_schema will return None).
    """
    schema_selector = fnv1.SchemaSelector(api_version=api_version, kind=kind)
    rsp.requirements.schemas[name].CopyFrom(schema_selector)

Add a schema requirement to the response.

Arguments:
  • rsp: The RunFunctionResponse to update.
  • name: The name to use for this requirement.
  • api_version: The API version of the resource kind, e.g. "example.org/v1".
  • kind: The kind of resource, e.g. "MyResource".

This tells Crossplane to fetch the OpenAPI schema for the specified resource kind and include it in the next call to the function in req.required_schemas[name]. Use request.get_required_schema to retrieve it.

For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field. If Crossplane cannot find a schema for the requested kind, the schema will be empty (get_required_schema will return None).