function.response
Utilities for working with RunFunctionResponses.
# Copyright 2023 The Crossplane Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for working with RunFunctionResponses.

The helpers in this module either build a RunFunctionResponse from a request
(``to``) or mutate an existing response in place: adding results, conditions,
output, and resource/schema requirements.
"""

import datetime

from google.protobuf import duration_pb2 as durationpb
from google.protobuf import struct_pb2 as structpb

import crossplane.function.proto.v1.run_function_pb2 as fnv1
from crossplane.function import resource

# The bare string below documents DEFAULT_TTL; it is used as the default value
# of the ``ttl`` parameter of ``to``.
"""The default TTL for which a RunFunctionResponse may be cached."""
DEFAULT_TTL = datetime.timedelta(minutes=1)


def to(
    req: fnv1.RunFunctionRequest,
    ttl: datetime.timedelta = DEFAULT_TTL,
) -> fnv1.RunFunctionResponse:
    """Create a response to the supplied request.

    The request's tag, desired resources, and context are automatically
    copied to the response.

    Args:
        req: The request to respond to.
        ttl: How long Crossplane may optionally cache the response.

    Returns:
        A response to the supplied request.
    """
    # The response meta expects a protobuf Duration, so convert the timedelta.
    dttl = durationpb.Duration()
    dttl.FromTimedelta(ttl)
    return fnv1.RunFunctionResponse(
        meta=fnv1.ResponseMeta(tag=req.meta.tag, ttl=dttl),
        desired=req.desired,
        context=req.context,
    )


def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Add a normal result to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    rsp.results.append(
        fnv1.Result(
            severity=fnv1.SEVERITY_NORMAL,
            message=message,
        )
    )


def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Add a warning result to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    rsp.results.append(
        fnv1.Result(
            severity=fnv1.SEVERITY_WARNING,
            message=message,
        )
    )


def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Add a fatal result to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    rsp.results.append(
        fnv1.Result(
            severity=fnv1.SEVERITY_FATAL,
            message=message,
        )
    )


# Maps the string status of a resource.Condition to the protobuf status enum.
# set_conditions falls back to STATUS_CONDITION_UNKNOWN for any other value.
_STATUS_MAP = {
    "True": fnv1.STATUS_CONDITION_TRUE,
    "False": fnv1.STATUS_CONDITION_FALSE,
    "Unknown": fnv1.STATUS_CONDITION_UNKNOWN,
}


def set_conditions(
    rsp: fnv1.RunFunctionResponse,
    *conditions: resource.Condition,
) -> None:
    """Set one or more conditions on the composite resource (XR).

    Args:
        rsp: The RunFunctionResponse to update.
        *conditions: The conditions to set.

    Each condition is appended to ``rsp.conditions``. Crossplane uses the
    conditions returned by a function to set custom status conditions on
    the composite resource.

    The ``last_transition_time`` field of each condition is ignored.
    Crossplane sets the transition time itself.

    Do not set the ``Ready`` condition type. Crossplane manages it based
    on resource readiness.
    """
    for condition in conditions:
        c = fnv1.Condition(
            type=condition.typ,
            # Unrecognized status strings are reported as unknown.
            status=_STATUS_MAP.get(condition.status, fnv1.STATUS_CONDITION_UNKNOWN),
            reason=condition.reason or "",
        )
        # Only set the message when there is one, leaving it unset otherwise.
        if condition.message:
            c.message = condition.message
        rsp.conditions.append(c)


def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
    """Set the output field in a RunFunctionResponse for operation functions.

    Args:
        rsp: The RunFunctionResponse to update.
        output: The output data as a dictionary or protobuf Struct.

    Raises:
        TypeError: If output is neither a dict nor a protobuf Struct.

    Operation functions can return arbitrary output data that will be written
    to the Operation's status.pipeline field. This function sets that output
    on the response.
    """
    match output:
        case dict():
            # Dictionaries are converted to a Struct before being copied in.
            rsp.output.CopyFrom(resource.dict_to_struct(output))
        case structpb.Struct():
            rsp.output.CopyFrom(output)
        case _:
            t = type(output)
            msg = f"Unsupported output type: {t}"
            raise TypeError(msg)


def require_resources(  # noqa: PLR0913
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
    *,
    match_name: str | None = None,
    match_labels: dict[str, str] | None = None,
    namespace: str | None = None,
) -> None:
    """Add a resource requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of resources to require.
        kind: The kind of resources to require.
        match_name: Match a resource by name (mutually exclusive with match_labels).
        match_labels: Match resources by labels (mutually exclusive with match_name).
        namespace: The namespace to search in (optional).

    Raises:
        ValueError: If both match_name and match_labels are provided, or neither.

    This tells Crossplane to fetch the specified resources and include them
    in the next call to the function in req.required_resources[name].
    """
    # Both None or both set makes the two sides equal, so this rejects every
    # case except "exactly one provided".
    if (match_name is None) == (match_labels is None):
        msg = "Exactly one of match_name or match_labels must be provided"
        raise ValueError(msg)

    selector = fnv1.ResourceSelector(
        api_version=api_version,
        kind=kind,
    )

    if match_name is not None:
        selector.match_name = match_name

    # NOTE(review): an empty dict passes the check above but adds no entries
    # here; confirm whether that leaves the selector without any match set.
    if match_labels is not None:
        selector.match_labels.labels.update(match_labels)

    if namespace is not None:
        selector.namespace = namespace

    # Requirements are keyed by name; the selector is copied into that entry.
    rsp.requirements.resources[name].CopyFrom(selector)


def require_schema(
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
) -> None:
    """Add a schema requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of the resource kind, e.g. "example.org/v1".
        kind: The kind of resource, e.g. "MyResource".

    This tells Crossplane to fetch the OpenAPI schema for the specified resource
    kind and include it in the next call to the function in
    req.required_schemas[name]. Use request.get_required_schema to retrieve it.

    For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field.
    If Crossplane cannot find a schema for the requested kind, the schema will be
    empty (get_required_schema will return None).
    """
    selector = fnv1.SchemaSelector(
        api_version=api_version,
        kind=kind,
    )
    # Requirements are keyed by name; the selector is copied into that entry.
    rsp.requirements.schemas[name].CopyFrom(selector)
def to(
    req: fnv1.RunFunctionRequest,
    ttl: datetime.timedelta = DEFAULT_TTL,
) -> fnv1.RunFunctionResponse:
    """Build a RunFunctionResponse that answers the supplied request.

    The request's tag, desired resources, and context are carried over to
    the new response.

    Args:
        req: The request being answered.
        ttl: How long Crossplane may optionally cache the response.

    Returns:
        A response to the supplied request.
    """
    # The response meta takes a protobuf Duration rather than a timedelta.
    cache_for = durationpb.Duration()
    cache_for.FromTimedelta(ttl)

    meta = fnv1.ResponseMeta(tag=req.meta.tag, ttl=cache_for)
    return fnv1.RunFunctionResponse(
        meta=meta,
        desired=req.desired,
        context=req.context,
    )
Create a response to the supplied request.
Arguments:
- req: The request to respond to.
- ttl: How long Crossplane may optionally cache the response.
Returns:
A response to the supplied request.
The request's tag, desired resources, and context are automatically copied to the response. Using response.to is a good pattern to ensure these fields are always carried over from the request.
def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with normal severity to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_NORMAL, message=message)
    rsp.results.append(result)
Add a normal result to the response.
def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with warning severity to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_WARNING, message=message)
    rsp.results.append(result)
Add a warning result to the response.
def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
    """Append a result with fatal severity to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        message: The message of the result.
    """
    result = fnv1.Result(severity=fnv1.SEVERITY_FATAL, message=message)
    rsp.results.append(result)
Add a fatal result to the response.
def set_conditions(
    rsp: fnv1.RunFunctionResponse,
    *conditions: resource.Condition,
) -> None:
    """Append custom status conditions for the composite resource (XR).

    Args:
        rsp: The RunFunctionResponse to update.
        *conditions: The conditions to set.

    Every supplied condition is converted and appended to
    ``rsp.conditions``. Crossplane uses the conditions returned by a function
    to set custom status conditions on the composite resource.

    The ``last_transition_time`` field of each condition is not read here.
    Crossplane sets the transition time itself.

    Do not set the ``Ready`` condition type. Crossplane manages it based
    on resource readiness.
    """
    for cond in conditions:
        kind = cond.typ
        # Status strings that are not in the map are reported as unknown.
        status = _STATUS_MAP.get(cond.status, fnv1.STATUS_CONDITION_UNKNOWN)
        reason = cond.reason or ""

        converted = fnv1.Condition(type=kind, status=status, reason=reason)
        # Leave the message unset when the condition has none.
        if cond.message:
            converted.message = cond.message

        rsp.conditions.append(converted)
Set one or more conditions on the composite resource (XR).
Arguments:
- rsp: The RunFunctionResponse to update.
- *conditions: The conditions to set.
Each condition is appended to rsp.conditions. Crossplane uses the
conditions returned by a function to set custom status conditions on
the composite resource.
The last_transition_time field of each condition is ignored.
Crossplane sets the transition time itself.
Do not set the Ready condition type. Crossplane manages it based
on resource readiness.
def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
    """Store operation-function output on a RunFunctionResponse.

    Args:
        rsp: The RunFunctionResponse to update.
        output: The output data as a dictionary or protobuf Struct.

    Raises:
        TypeError: If output is neither a dict nor a protobuf Struct.

    Operation functions can return arbitrary output data that will be written
    to the Operation's status.pipeline field. This function sets that output
    on the response.
    """
    if isinstance(output, dict):
        # Dictionaries are converted to a Struct before being copied in.
        converted = resource.dict_to_struct(output)
        rsp.output.CopyFrom(converted)
        return

    if isinstance(output, structpb.Struct):
        rsp.output.CopyFrom(output)
        return

    t = type(output)
    msg = f"Unsupported output type: {t}"
    raise TypeError(msg)
Set the output field in a RunFunctionResponse for operation functions.
Arguments:
- rsp: The RunFunctionResponse to update.
- output: The output data as a dictionary or protobuf Struct.
Operation functions can return arbitrary output data that will be written to the Operation's status.pipeline field. This function sets that output on the response.
def require_resources(  # noqa: PLR0913
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
    *,
    match_name: str | None = None,
    match_labels: dict[str, str] | None = None,
    namespace: str | None = None,
) -> None:
    """Add a resource requirement to the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of resources to require.
        kind: The kind of resources to require.
        match_name: Match a resource by name (mutually exclusive with match_labels).
        match_labels: Match resources by labels (mutually exclusive with
            match_name). An empty dict still produces a label selector, with
            no labels in it.
        namespace: The namespace to search in (optional).

    Raises:
        ValueError: If both match_name and match_labels are provided, or neither.

    This tells Crossplane to fetch the specified resources and include them
    in the next call to the function in req.required_resources[name].
    """
    # Both None or both set makes the two sides equal, so this rejects every
    # case except "exactly one provided".
    if (match_name is None) == (match_labels is None):
        msg = "Exactly one of match_name or match_labels must be provided"
        raise ValueError(msg)

    selector = fnv1.ResourceSelector(
        api_version=api_version,
        kind=kind,
    )

    if match_name is not None:
        selector.match_name = match_name

    if match_labels is not None:
        # Updating the map with an empty dict writes nothing, which would leave
        # the match_labels submessage unset and the selector without any match
        # even though validation passed. Mark it present explicitly first.
        selector.match_labels.SetInParent()
        selector.match_labels.labels.update(match_labels)

    if namespace is not None:
        selector.namespace = namespace

    # Requirements are keyed by name; the selector is copied into that entry.
    rsp.requirements.resources[name].CopyFrom(selector)
Add a resource requirement to the response.
Arguments:
- rsp: The RunFunctionResponse to update.
- name: The name to use for this requirement.
- api_version: The API version of resources to require.
- kind: The kind of resources to require.
- match_name: Match a resource by name (mutually exclusive with match_labels).
- match_labels: Match resources by labels (mutually exclusive with match_name).
- namespace: The namespace to search in (optional).
Raises:
- ValueError: If both match_name and match_labels are provided, or neither.
This tells Crossplane to fetch the specified resources and include them in the next call to the function in req.required_resources[name].
def require_schema(
    rsp: fnv1.RunFunctionResponse,
    name: str,
    api_version: str,
    kind: str,
) -> None:
    """Register a schema requirement on the response.

    Args:
        rsp: The RunFunctionResponse to update.
        name: The name to use for this requirement.
        api_version: The API version of the resource kind, e.g. "example.org/v1".
        kind: The kind of resource, e.g. "MyResource".

    This tells Crossplane to fetch the OpenAPI schema for the specified resource
    kind and include it in the next call to the function in
    req.required_schemas[name]. Use request.get_required_schema to retrieve it.

    For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field.
    If Crossplane cannot find a schema for the requested kind, the schema will be
    empty (get_required_schema will return None).
    """
    wanted = fnv1.SchemaSelector(api_version=api_version, kind=kind)
    # Requirements are keyed by name; the selector is copied into that entry.
    entry = rsp.requirements.schemas[name]
    entry.CopyFrom(wanted)
Add a schema requirement to the response.
Arguments:
- rsp: The RunFunctionResponse to update.
- name: The name to use for this requirement.
- api_version: The API version of the resource kind, e.g. "example.org/v1".
- kind: The kind of resource, e.g. "MyResource".
This tells Crossplane to fetch the OpenAPI schema for the specified resource kind and include it in the next call to the function in req.required_schemas[name]. Use request.get_required_schema to retrieve it.
For CRDs, Crossplane returns the spec.versions[].schema.openAPIV3Schema field. If Crossplane cannot find a schema for the requested kind, the schema will be empty (get_required_schema will return None).