diff --git a/azure-pipelines-templates/type_checking.yml b/azure-pipelines-templates/type_checking.yml new file mode 100644 index 000000000..dbabd6570 --- /dev/null +++ b/azure-pipelines-templates/type_checking.yml @@ -0,0 +1,48 @@ +# Copyright (c) 2025, Shotgun Software Inc. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# - Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# - Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# - Neither the name of the Shotgun Software Inc nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +jobs: +- job: type_checking + displayName: Type Checking (beta) + pool: + vmImage: 'ubuntu-latest' + + steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: 3.9 + addToPath: True + architecture: 'x64' + + - script: | + pip install --upgrade pip setuptools wheel + pip install --upgrade mypy + displayName: Install dependencies + + # Placeholder to future static type checking. For now we just run mypy and skip all known errors. + - bash: mypy shotgun_api3/shotgun.py --follow-imports skip --pretty --no-strict-optional --disable-error-code arg-type --disable-error-code assignment --disable-error-code return --disable-error-code return-value --disable-error-code attr-defined + displayName: Run type checking diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 52e6cfa9c..0e465bf22 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -52,6 +52,7 @@ pr: # Jobs run in parallel. jobs: - template: azure-pipelines-templates/code_style_validation.yml +- template: azure-pipelines-templates/type_checking.yml # These are jobs templates, they allow to reduce the redundancy between # variations of the same build. We pass in the image name diff --git a/docs/installation.rst b/docs/installation.rst index b082b1669..fe64d2844 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -6,8 +6,6 @@ Installation Minimum Requirements ******************** -- Python 3.7 - .. note:: Some features of the API are only supported by more recent versions of the Flow Production Tracking server. These features are added to the Python API in a backwards compatible way so that existing @@ -15,6 +13,16 @@ Minimum Requirements your version of Flow Production Tracking will raise an appropriate exception. In general, we attempt to document these where possible. +Python versions +=============== + +The Python API library supports the following Python versions: `3.9 - 3.11`. We recommend using Python 3.11. + +.. important:: + Python versions older than 3.9 are no longer supported as of March 2025 and compatibility will be discontinued after + March 2026. + + ****************************** Installing into ``PYTHONPATH`` ****************************** diff --git a/docs/reference.rst b/docs/reference.rst index 28bfabf1b..d211dddfc 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -994,6 +994,19 @@ Will internally be transformed as if you invoked something like this: sg.find('Asset', [['project', 'is', {'id': 999, 'type': 'Project'}]]) +SHOTGUN_API_ALLOW_OLD_PYTHON +============================ + +When set to ``1``, ``shotgun_api3`` will allow being imported from Python versions that are no longer supported. +Otherwise, when unset (or set to any other value), importing the module will raise an exception. + +This is not recommended and should only be used for testing purposes. + +.. important:: + The ability to import the module does not guarantee that the module will work properly on the unsupported Python + version. In fact, it is very likely that it will not work properly. + + ************ Localization ************ diff --git a/shotgun_api3/__init__.py b/shotgun_api3/__init__.py index 943a9fa8b..a8486624e 100644 --- a/shotgun_api3/__init__.py +++ b/shotgun_api3/__init__.py @@ -8,14 +8,35 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Shotgun Software Inc. +import os import sys import warnings -if sys.version_info < (3, 9): +if sys.version_info < (3, 7): + if os.environ.get("SHOTGUN_API_ALLOW_OLD_PYTHON", "0") != "1": + # This is our preferred default behavior when using an old + # unsupported Python version. + # This way, we can control where the exception is raised, and it provides a + # comprehensive error message rather than having users facing a random + # Python traceback and trying to understand this is due to using an + # unsupported Python version. + + raise RuntimeError("This module requires Python version 3.7 or higher.") + + warnings.warn( + "Python versions older than 3.7 are no longer supported as of January " + "2023. Since the SHOTGUN_API_ALLOW_OLD_PYTHON variable is enabled, this " + "module is raising a warning instead of an exception. " + "However, it is very likely that this module will not be able to work " + "on this Python version.", + RuntimeWarning, + stacklevel=2, + ) +elif sys.version_info < (3, 9): warnings.warn( - "Python versions older than 3.9 are no longer supported since 2025-03 " - "and compatibility will be removed at any time after 2026-01. " - "Please update to Python 3.9 or a newer supported version.", + "Python versions older than 3.9 are no longer supported as of March " + "2025 and compatibility will be discontinued after March 2026. " + "Please update to Python 3.11 or any other supported version.", DeprecationWarning, stacklevel=2, ) diff --git a/shotgun_api3/shotgun.py b/shotgun_api3/shotgun.py index 2a1108aac..1b9d98452 100644 --- a/shotgun_api3/shotgun.py +++ b/shotgun_api3/shotgun.py @@ -29,6 +29,8 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ +from __future__ import annotations # Requried for 3.7 + import base64 import copy import datetime @@ -53,10 +55,11 @@ from typing import ( Any, BinaryIO, + Dict, Iterable, - Literal, + List, Optional, - TypedDict, + Tuple, TypeVar, Union, ) @@ -100,21 +103,25 @@ T = TypeVar("T") +if sys.version_info < (3, 9): + OrderItem = Dict + GroupingItem = Dict + BaseEntity = Dict +else: + from typing import TypedDict -class OrderItem(TypedDict): - field_name: str - direction: str - + class OrderItem(TypedDict): + field_name: str + direction: str -class GroupingItem(TypedDict): - field: str - type: str - direction: str + class GroupingItem(TypedDict): + field: str + type: str + direction: str - -class BaseEntity(TypedDict, total=False): - id: int - type: str + class BaseEntity(TypedDict, total=False): + id: int + type: str # ---------------------------------------------------------------------------- @@ -202,7 +209,7 @@ class ServerCapabilities(object): the future. Therefore, usage of this class is discouraged. """ - def __init__(self, host: str, meta: dict[str, Any]) -> None: + def __init__(self, host: str, meta: Dict[str, Any]) -> None: """ ServerCapabilities.__init__ @@ -215,7 +222,6 @@ def __init__(self, host: str, meta: dict[str, Any]) -> None: :ivar bool is_dev: ``True`` if server is running a development version of the Shotgun codebase. """ - self._ensure_python_version_supported() # Server host name self.host = host self.server_info = meta @@ -242,13 +248,6 @@ def __init__(self, host: str, meta: dict[str, Any]) -> None: self.version = tuple(self.version[:3]) self._ensure_json_supported() - def _ensure_python_version_supported(self) -> None: - """ - Checks the if current Python version is supported. - """ - if sys.version_info < (3, 7): - raise ShotgunError("This module requires Python version 3.7 or higher.") - def _ensure_support(self, feature: dict[str, Any], raise_hell: bool = True) -> bool: """ Checks the server version supports a given feature, raises an exception if it does not. @@ -421,7 +420,7 @@ def __init__(self, sg: "Shotgun"): self.auth_token: Optional[str] = None self.sudo_as_login: Optional[str] = None # Authentication parameters to be folded into final auth_params dict - self.extra_auth_params: Optional[dict[str, Any]] = None + self.extra_auth_params: Optional[Dict[str, Any]] = None # uuid as a string self.session_uuid: Optional[str] = None self.scheme: Optional[str] = None @@ -738,7 +737,7 @@ def __init__( self.config.user_password = None self.config.auth_token = None - def _split_url(self, base_url: str) -> tuple[Optional[str], Optional[str]]: + def _split_url(self, base_url: str) -> Tuple[Optional[str], Optional[str]]: """ Extract the hostname:port and username/password/token from base_url sent when connect to the API. @@ -770,7 +769,7 @@ def _split_url(self, base_url: str) -> tuple[Optional[str], Optional[str]]: # API Functions @property - def server_info(self) -> dict[str, Any]: + def server_info(self) -> Dict[str, Any]: """ Property containing server information. @@ -823,7 +822,7 @@ def close(self) -> None: self._close_connection() return - def info(self) -> dict[str, Any]: + def info(self) -> Dict[str, Any]: """ Get API-related metadata from the Shotgun server. @@ -857,13 +856,13 @@ def info(self) -> dict[str, Any]: def find_one( self, entity_type: str, - filters: Union[list, tuple, dict[str, Any]], - fields: Optional[list[str]] = None, - order: Optional[list[OrderItem]] = None, - filter_operator: Optional[Literal["all", "any"]] = None, + filters: Union[List, Tuple, Dict[str, Any]], + fields: Optional[List[str]] = None, + order: Optional[List[OrderItem]] = None, + filter_operator: Optional[str] = None, retired_only: bool = False, include_archived_projects: bool = True, - additional_filter_presets: Optional[list[dict[str, Any]]] = None, + additional_filter_presets: Optional[List[Dict[str, Any]]] = None, ) -> Optional[BaseEntity]: """ Shortcut for :meth:`~shotgun_api3.Shotgun.find` with ``limit=1`` so it returns a single @@ -937,16 +936,16 @@ def find_one( def find( self, entity_type: str, - filters: Union[list, tuple, dict[str, Any]], - fields: Optional[list[str]] = None, - order: Optional[list[OrderItem]] = None, - filter_operator: Optional[Literal["all", "any"]] = None, + filters: Union[List, Tuple, Dict[str, Any]], + fields: Optional[List[str]] = None, + order: Optional[List[OrderItem]] = None, + filter_operator: Optional[str] = None, limit: int = 0, retired_only: bool = False, page: int = 0, include_archived_projects: bool = True, - additional_filter_presets: Optional[list[dict[str, Any]]] = None, - ) -> list[BaseEntity]: + additional_filter_presets: Optional[List[Dict[str, Any]]] = None, + ) -> List[BaseEntity]: """ Find entities matching the given filters. @@ -1136,14 +1135,14 @@ def find( def _construct_read_parameters( self, entity_type: str, - fields: Optional[list[str]], - filters: dict[str, Any], + fields: Optional[List[str]], + filters: Dict[str, Any], retired_only: bool, - order: Optional[list[dict[str, Any]]], + order: Optional[List[Dict[str, Any]]], include_archived_projects: bool, - additional_filter_presets: Optional[list[dict[str, Any]]], - ) -> dict[str, Any]: - params: dict[str, Any] = {} + additional_filter_presets: Optional[List[Dict[str, Any]]], + ) -> Dict[str, Any]: + params: Dict[str, Any] = {} params["type"] = entity_type params["return_fields"] = fields or ["id"] params["filters"] = filters @@ -1174,8 +1173,8 @@ def _construct_read_parameters( return params def _add_project_param( - self, params: dict[str, Any], project_entity - ) -> dict[str, Any]: + self, params: Dict[str, Any], project_entity + ) -> Dict[str, Any]: if project_entity and self.server_caps.ensure_per_project_customization(): params["project"] = project_entity @@ -1186,9 +1185,9 @@ def _translate_update_params( self, entity_type: str, entity_id: int, - data: dict, - multi_entity_update_modes: Optional[dict], - ) -> dict[str, Any]: + data: Dict, + multi_entity_update_modes: Optional[Dict], + ) -> Dict[str, Any]: global SHOTGUN_API_DISABLE_ENTITY_OPTIMIZATION def optimize_field(field_dict): @@ -1211,12 +1210,12 @@ def optimize_field(field_dict): def summarize( self, entity_type: str, - filters: Union[list, dict[str, Any]], - summary_fields: list[dict[str, str]], + filters: Union[List, Dict[str, Any]], + summary_fields: List[Dict[str, str]], filter_operator: Optional[str] = None, - grouping: Optional[list[GroupingItem]] = None, + grouping: Optional[List[GroupingItem]] = None, include_archived_projects: bool = True, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Summarize field data returned by a query. @@ -1419,9 +1418,9 @@ def summarize( def create( self, entity_type: str, - data: dict[str, Any], - return_fields: Optional[list] = None, - ) -> dict[str, Any]: + data: Dict[str, Any], + return_fields: Optional[List[str]] = None, + ) -> Dict[str, Any]: """ Create a new entity of the specified ``entity_type``. @@ -1508,8 +1507,8 @@ def update( self, entity_type: str, entity_id: int, - data: dict[str, Any], - multi_entity_update_modes: Optional[dict[str, Any]] = None, + data: Dict[str, Any], + multi_entity_update_modes: Optional[Dict[str, Any]] = None, ) -> BaseEntity: """ Update the specified entity with the supplied data. @@ -1631,7 +1630,7 @@ def revive(self, entity_type: str, entity_id: int) -> bool: return self._call_rpc("revive", params) - def batch(self, requests: list[dict[str, Any]]) -> list[dict[str, Any]]: + def batch(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Make a batch request of several :meth:`~shotgun_api3.Shotgun.create`, :meth:`~shotgun_api3.Shotgun.update`, and :meth:`~shotgun_api3.Shotgun.delete` calls. @@ -1750,9 +1749,9 @@ def work_schedule_read( self, start_date: str, end_date: str, - project: Optional[dict[str, Any]] = None, - user: Optional[dict[str, Any]] = None, - ) -> dict[str, Any]: + project: Optional[Dict[str, Any]] = None, + user: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: """ Return the work day rules for a given date range. @@ -1826,10 +1825,10 @@ def work_schedule_update( date: str, working: bool, description: Optional[str] = None, - project: Optional[dict[str, Any]] = None, - user: Optional[dict[str, Any]] = None, + project: Optional[Dict[str, Any]] = None, + user: Optional[Dict[str, Any]] = None, recalculate_field: Optional[str] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Update the work schedule for a given date. @@ -1883,7 +1882,7 @@ def work_schedule_update( return self._call_rpc("work_schedule_update", params) - def follow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any]: + def follow(self, user: Dict[str, Any], entity: Dict[str, Any]) -> Dict[str, Any]: """ Add the entity to the user's followed entities. @@ -1911,7 +1910,7 @@ def follow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any] return self._call_rpc("follow", params) - def unfollow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any]: + def unfollow(self, user: Dict[str, Any], entity: Dict[str, Any]) -> Dict[str, Any]: """ Remove entity from the user's followed entities. @@ -1938,7 +1937,7 @@ def unfollow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, An return self._call_rpc("unfollow", params) - def followers(self, entity: dict[str, Any]) -> list[dict[str, Any]]: + def followers(self, entity: Dict[str, Any]) -> List[Dict[str, Any]]: """ Return all followers for an entity. @@ -1968,10 +1967,10 @@ def followers(self, entity: dict[str, Any]) -> list[dict[str, Any]]: def following( self, - user: dict[str, Any], - project: Optional[dict[str, Any]] = None, + user: Dict[str, Any], + project: Optional[Dict[str, Any]] = None, entity_type: Optional[str] = None, - ) -> list[BaseEntity]: + ) -> List[BaseEntity]: """ Return all entity instances a user is following. @@ -2004,7 +2003,7 @@ def following( def schema_entity_read( self, project_entity: Optional[BaseEntity] = None - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Return all active entity types, their display names, and their visibility. @@ -2039,7 +2038,7 @@ def schema_entity_read( The returned display names for this method will be localized when the ``localize`` Shotgun config property is set to ``True``. See :ref:`localization` for more information. """ - params: dict[str, Any] = {} + params: Dict[str, Any] = {} params = self._add_project_param(params, project_entity) @@ -2050,7 +2049,7 @@ def schema_entity_read( def schema_read( self, project_entity: Optional[BaseEntity] = None - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Get the schema for all fields on all entities. @@ -2113,7 +2112,7 @@ def schema_read( The returned display names for this method will be localized when the ``localize`` Shotgun config property is set to ``True``. See :ref:`localization` for more information. """ - params: dict[str, Any] = {} + params: Dict[str, Any] = {} params = self._add_project_param(params, project_entity) @@ -2127,7 +2126,7 @@ def schema_field_read( entity_type: str, field_name: Optional[str] = None, project_entity: Optional[BaseEntity] = None, - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Get schema for all fields on the specified entity type or just the field name specified if provided. @@ -2196,7 +2195,7 @@ def schema_field_create( entity_type: str, data_type: str, display_name: str, - properties: Optional[dict[str, Any]] = None, + properties: Optional[Dict[str, Any]] = None, ) -> str: """ Create a field for the specified entity type. @@ -2238,7 +2237,7 @@ def schema_field_update( self, entity_type: str, field_name: str, - properties: dict[str, Any], + properties: Dict[str, Any], project_entity: Optional[BaseEntity] = None, ) -> bool: """ @@ -2348,7 +2347,7 @@ def set_session_uuid(self, session_uuid: str) -> None: def share_thumbnail( self, - entities: list[dict[str, Any]], + entities: List[Dict[str, Any]], thumbnail_path: Optional[str] = None, source_entity: Optional[BaseEntity] = None, filmstrip_thumbnail: bool = False, @@ -2838,7 +2837,7 @@ def _upload_to_sg( def _get_attachment_upload_info( self, is_thumbnail: bool, filename: str, is_multipart_upload: bool - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Internal function to get the information needed to upload a file to Cloud storage. @@ -2887,7 +2886,7 @@ def _get_attachment_upload_info( def download_attachment( self, - attachment: Union[dict[str, Any], Literal[False]] = False, + attachment: Union[Dict[str, Any], bool] = False, file_path: Optional[str] = None, attachment_id: Optional[int] = None, ) -> Union[str, bytes, None]: @@ -3100,7 +3099,7 @@ def get_attachment_download_url( def authenticate_human_user( self, user_login: str, user_password: str, auth_token: Optional[str] = None - ) -> Union[dict[str, Any], None]: + ) -> Union[Dict[str, Any], None]: """ Authenticate Shotgun HumanUser. @@ -3160,7 +3159,7 @@ def authenticate_human_user( raise def update_project_last_accessed( - self, project: dict[str, Any], user: Optional[dict[str, Any]] = None + self, project: Dict[str, Any], user: Optional[Dict[str, Any]] = None ) -> None: """ Update a Project's ``last_accessed_by_current_user`` field to the current timestamp. @@ -3208,8 +3207,8 @@ def update_project_last_accessed( self._parse_records(record)[0] def note_thread_read( - self, note_id: int, entity_fields: Optional[dict[str, Any]] = None - ) -> list[dict[str, Any]]: + self, note_id: int, entity_fields: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: """ Return the full conversation for a given note, including Replies and Attachments. @@ -3287,10 +3286,10 @@ def note_thread_read( def text_search( self, text: str, - entity_types: dict[str, Any], - project_ids: Optional[list] = None, + entity_types: Dict[str, Any], + project_ids: Optional[List] = None, limit: Optional[int] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Search across the specified entity types for the given text. @@ -3386,11 +3385,11 @@ def activity_stream_read( self, entity_type: str, entity_id: int, - entity_fields: Optional[dict[str, Any]] = None, + entity_fields: Optional[Dict[str, Any]] = None, min_id: Optional[int] = None, max_id: Optional[int] = None, limit: Optional[int] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Retrieve activity stream data from Shotgun. @@ -3524,8 +3523,8 @@ def nav_search_string( def nav_search_entity( self, root_path: str, - entity: dict[str, Any], - seed_entity_field: Optional[dict[str, Any]] = None, + entity: Dict[str, Any], + seed_entity_field: Optional[Dict[str, Any]] = None, ): """ Search function adapted to work with the navigation hierarchy. @@ -3570,7 +3569,7 @@ def get_session_token(self) -> str: return session_token - def preferences_read(self, prefs: Optional[list] = None) -> dict[str, Any]: + def preferences_read(self, prefs: Optional[List] = None) -> Dict[str, Any]: """ Get a subset of the site preferences. @@ -3593,7 +3592,7 @@ def preferences_read(self, prefs: Optional[list] = None) -> dict[str, Any]: return self._call_rpc("preferences_read", {"prefs": prefs}) - def user_subscriptions_read(self) -> list: + def user_subscriptions_read(self) -> List: """ Get the list of user subscriptions. @@ -3606,7 +3605,7 @@ def user_subscriptions_read(self) -> list: return self._call_rpc("user_subscriptions_read", None) def user_subscriptions_create( - self, users: list[dict[str, Union[str, list[str], None]]] + self, users: List[Dict[str, Union[str, List[str], None]]] ) -> bool: """ Assign subscriptions to users. @@ -3799,7 +3798,7 @@ def _call_rpc( return results[0] return results - def _auth_params(self) -> dict[str, Any]: + def _auth_params(self) -> Dict[str, Any]: """ Return a dictionary of the authentication parameters being used. """ @@ -3854,7 +3853,7 @@ def _auth_params(self) -> dict[str, Any]: return auth_params - def _sanitize_auth_params(self, params: dict[str, Any]) -> dict[str, Any]: + def _sanitize_auth_params(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Given an authentication parameter dictionary, sanitize any sensitive information and return the sanitized dict copy. @@ -3867,7 +3866,7 @@ def _sanitize_auth_params(self, params: dict[str, Any]) -> dict[str, Any]: def _build_payload( self, method: str, params, include_auth_params: bool = True - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Build the payload to be send to the rpc endpoint. """ @@ -3897,8 +3896,8 @@ def _encode_payload(self, payload) -> bytes: return json.dumps(payload, ensure_ascii=False).encode("utf-8") def _make_call( - self, verb: str, path: str, body, headers: Optional[dict[str, Any]] - ) -> tuple[tuple[int, str], dict[str, Any], str]: + self, verb: str, path: str, body, headers: Optional[Dict[str, Any]] + ) -> Tuple[Tuple[int, str], Dict[str, Any], str]: """ Make an HTTP call to the server. @@ -3949,8 +3948,8 @@ def _make_call( time.sleep(rpc_attempt_interval) def _http_request( - self, verb: str, path: str, body, headers: dict[str, Any] - ) -> tuple[tuple[int, str], dict[str, Any], str]: + self, verb: str, path: str, body, headers: Dict[str, Any] + ) -> Tuple[Tuple[int, str], Dict[str, Any], str]: """ Make the actual HTTP request. """ @@ -3988,7 +3987,7 @@ def _make_upload_request( raise return result - def _parse_http_status(self, status: tuple) -> None: + def _parse_http_status(self, status: Tuple) -> None: """ Parse the status returned from the http request. @@ -4007,8 +4006,8 @@ def _parse_http_status(self, status: tuple) -> None: return def _decode_response( - self, headers: dict[str, Any], body: str - ) -> Union[str, dict[str, Any]]: + self, headers: Dict[str, Any], body: str + ) -> Union[str, Dict[str, Any]]: """ Decode the response from the server from the wire format to a python data structure. @@ -4229,7 +4228,7 @@ def _close_connection(self) -> None: # ======================================================================== # Utility - def _parse_records(self, records: list) -> list: + def _parse_records(self, records: List) -> List: """ Parse 'records' returned from the api to do local modifications: @@ -4334,11 +4333,11 @@ def _build_thumb_url(self, entity_type: str, entity_id: int) -> str: def _dict_to_list( self, - d: Optional[dict[str, Any]], + d: Optional[Dict[str, Any]], key_name: str = "field_name", value_name: str = "value", extra_data=None, - ) -> list[dict[str, Any]]: + ) -> List[Dict[str, Any]]: """ Utility function to convert a dict into a list dicts using the key_name and value_name keys. @@ -4355,7 +4354,9 @@ def _dict_to_list( ret.append(d) return ret - def _dict_to_extra_data(self, d: Optional[dict], key_name="value") -> dict: + def _dict_to_extra_data( + self, d: Optional[Dict[str, Any]], key_name="value" + ) -> Dict[str, Any]: """ Utility function to convert a dict into a dict compatible with the extra_data arg of _dict_to_list. @@ -4385,7 +4386,7 @@ def _upload_file_to_storage(self, path: str, storage_url: str) -> None: LOG.debug("File uploaded to Cloud storage: %s", filename) def _multipart_upload_file_to_storage( - self, path: str, upload_info: dict[str, Any] + self, path: str, upload_info: Dict[str, Any] ) -> None: """ Internal function to upload a file to the Cloud storage in multiple parts. @@ -4429,7 +4430,7 @@ def _multipart_upload_file_to_storage( LOG.debug("File uploaded in multiple parts to Cloud storage: %s", path) def _get_upload_part_link( - self, upload_info: dict[str, Any], filename: str, part_number: int + self, upload_info: Dict[str, Any], filename: str, part_number: int ) -> str: """ Internal function to get the url to upload the next part of a file to the @@ -4528,7 +4529,7 @@ def _upload_data_to_storage( return etag def _complete_multipart_upload( - self, upload_info: dict[str, Any], filename: str, etags: Iterable[str] + self, upload_info: Dict[str, Any], filename: str, etags: Iterable[str] ) -> None: """ Internal function to complete a multi-part upload to the Cloud storage. @@ -4605,7 +4606,7 @@ def _requires_direct_s3_upload( else: return False - def _send_form(self, url: str, params: dict[str, Any]) -> str: + def _send_form(self, url: str, params: Dict[str, Any]) -> str: """ Utility function to send a Form to Shotgun and process any HTTP errors that could occur. @@ -4737,7 +4738,7 @@ def https_request(self, request): return self.http_request(request) -def _translate_filters(filters: Union[list, tuple], filter_operator) -> dict[str, Any]: +def _translate_filters(filters: Union[List, Tuple], filter_operator) -> Dict[str, Any]: """ Translate filters params into data structure expected by rpc call. """ @@ -4746,7 +4747,7 @@ def _translate_filters(filters: Union[list, tuple], filter_operator) -> dict[str return _translate_filters_dict(wrapped_filters) -def _translate_filters_dict(sg_filter: dict[str, Any]) -> dict[str, Any]: +def _translate_filters_dict(sg_filter: Dict[str, Any]) -> Dict[str, Any]: new_filters = {} filter_operator = sg_filter.get("filter_operator") @@ -4814,8 +4815,8 @@ def _version_str(version) -> str: def _optimize_filter_field( - field_value: Union[dict, list], recursive: bool = True -) -> Union[dict, list]: + field_value: Union[Dict[str, Any], List], recursive: bool = True +) -> Union[Dict, List]: """ For an FPT entity, returns a new dictionary with only the type, id, and other allowed keys.