Skip to content

Commit

Permalink
Fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 13, 2024
1 parent d4eca2c commit 5a5c105
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 79 deletions.
4 changes: 2 additions & 2 deletions jupyter_client/_version.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"""The version information for jupyter client."""
import re
from typing import List, Union
from typing import Union

__version__ = "8.6.3"

# Build up version_info tuple for backwards compatibility
pattern = r"(?P<major>\d+).(?P<minor>\d+).(?P<patch>\d+)(?P<rest>.*)"
match = re.match(pattern, __version__)
if match:
parts: List[Union[int, str]] = [int(match[part]) for part in ["major", "minor", "patch"]]
parts: list[Union[int, str]] = [int(match[part]) for part in ["major", "minor", "patch"]]
if match["rest"]:
parts.append(match["rest"])
else:
Expand Down
64 changes: 32 additions & 32 deletions jupyter_client/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
# Distributed under the terms of the Modified BSD License.
import json
import re
from typing import Any, Dict, List, Tuple
from typing import Any

from ._version import protocol_version_info


def code_to_line(code: str, cursor_pos: int) -> Tuple[str, int]:
def code_to_line(code: str, cursor_pos: int) -> tuple[str, int]:
"""Turn a multiline code block and cursor position into a single line
and new cursor position.
Expand Down Expand Up @@ -59,32 +59,32 @@ class Adapter:
Override message_type(msg) methods to create adapters.
"""

msg_type_map: Dict[str, str] = {}
msg_type_map: dict[str, str] = {}

def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Update the header."""
return msg

def update_metadata(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def update_metadata(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Update the metadata."""
return msg

def update_msg_type(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def update_msg_type(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Update the message type."""
header = msg["header"]
msg_type = header["msg_type"]
if msg_type in self.msg_type_map:
msg["msg_type"] = header["msg_type"] = self.msg_type_map[msg_type]
return msg

def handle_reply_status_error(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def handle_reply_status_error(self, msg: dict[str, Any]) -> dict[str, Any]:
"""This will be called *instead of* the regular handler
on any reply with status != ok
"""
return msg

def __call__(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def __call__(self, msg: dict[str, Any]) -> dict[str, Any]:
msg = self.update_header(msg)
msg = self.update_metadata(msg)
msg = self.update_msg_type(msg)
Expand All @@ -100,7 +100,7 @@ def __call__(self, msg: Dict[str, Any]) -> Dict[str, Any]:
return handler(msg)


def _version_str_to_list(version: str) -> List[int]:
def _version_str_to_list(version: str) -> list[int]:
"""convert a version string to a list of ints
non-int segments are excluded
Expand All @@ -127,15 +127,15 @@ class V5toV4(Adapter):
"inspect_reply": "object_info_reply",
}

def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Update the header."""
msg["header"].pop("version", None)
msg["parent_header"].pop("version", None)
return msg

# shell channel

def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def kernel_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a kernel info reply."""
v4c = {}
content = msg["content"]
Expand All @@ -152,20 +152,20 @@ def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
msg["content"] = v4c
return msg

def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def execute_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an execute request."""
content = msg["content"]
content.setdefault("user_variables", [])
return msg

def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def execute_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an execute reply."""
content = msg["content"]
content.setdefault("user_variables", {})
# TODO: handle payloads
return msg

def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def complete_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a complete request."""
content = msg["content"]
code = content["code"]
Expand All @@ -179,7 +179,7 @@ def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
new_content["cursor_pos"] = cursor_pos
return msg

def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def complete_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a complete reply."""
content = msg["content"]
cursor_start = content.pop("cursor_start")
Expand All @@ -189,7 +189,7 @@ def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
content.pop("metadata", None)
return msg

def object_info_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def object_info_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an object info request."""
content = msg["content"]
code = content["code"]
Expand All @@ -201,20 +201,20 @@ def object_info_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
new_content["detail_level"] = content["detail_level"]
return msg

def object_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def object_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""inspect_reply can't be easily backward compatible"""
msg["content"] = {"found": False, "oname": "unknown"}
return msg

# iopub channel

def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def stream(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a stream message."""
content = msg["content"]
content["data"] = content.pop("text")
return msg

def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def display_data(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a display data message."""
content = msg["content"]
content.setdefault("source", "display")
Expand All @@ -229,7 +229,7 @@ def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:

# stdin channel

def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def input_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an input request."""
msg["content"].pop("password", None)
return msg
Expand All @@ -243,7 +243,7 @@ class V4toV5(Adapter):
# invert message renames above
msg_type_map = {v: k for k, v in V5toV4.msg_type_map.items()}

def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def update_header(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Update the header."""
msg["header"]["version"] = self.version
if msg["parent_header"]:
Expand All @@ -252,7 +252,7 @@ def update_header(self, msg: Dict[str, Any]) -> Dict[str, Any]:

# shell channel

def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def kernel_info_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a kernel info reply."""
content = msg["content"]
for key in ("protocol_version", "ipython_version"):
Expand All @@ -275,7 +275,7 @@ def kernel_info_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
content["banner"] = ""
return msg

def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def execute_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an execute request."""
content = msg["content"]
user_variables = content.pop("user_variables", [])
Expand All @@ -284,7 +284,7 @@ def execute_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
user_expressions[v] = v
return msg

def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def execute_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an execute reply."""
content = msg["content"]
user_expressions = content.setdefault("user_expressions", {})
Expand All @@ -301,7 +301,7 @@ def execute_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:

return msg

def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def complete_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a complete request."""
old_content = msg["content"]

Expand All @@ -310,7 +310,7 @@ def complete_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
new_content["cursor_pos"] = old_content["cursor_pos"]
return msg

def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def complete_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a complete reply."""
# complete_reply needs more context than we have to get cursor_start and end.
# use special end=null to indicate current cursor position and negative offset
Expand All @@ -328,7 +328,7 @@ def complete_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
new_content["metadata"] = {}
return msg

def inspect_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def inspect_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an inspect request."""
content = msg["content"]
name = content["oname"]
Expand All @@ -339,7 +339,7 @@ def inspect_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
new_content["detail_level"] = content["detail_level"]
return msg

def inspect_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def inspect_reply(self, msg: dict[str, Any]) -> dict[str, Any]:
"""inspect_reply can't be easily backward compatible"""
content = msg["content"]
new_content = msg["content"] = {"status": "ok"}
Expand All @@ -363,13 +363,13 @@ def inspect_reply(self, msg: Dict[str, Any]) -> Dict[str, Any]:

# iopub channel

def stream(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def stream(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle a stream message."""
content = msg["content"]
content["text"] = content.pop("data")
return msg

def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def display_data(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle display data."""
content = msg["content"]
content.pop("source", None)
Expand All @@ -384,13 +384,13 @@ def display_data(self, msg: Dict[str, Any]) -> Dict[str, Any]:

# stdin channel

def input_request(self, msg: Dict[str, Any]) -> Dict[str, Any]:
def input_request(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Handle an input request."""
msg["content"].setdefault("password", False)
return msg


def adapt(msg: Dict[str, Any], to_version: int = protocol_version_info[0]) -> Dict[str, Any]:
def adapt(msg: dict[str, Any], to_version: int = protocol_version_info[0]) -> dict[str, Any]:
"""Adapt a single message to a target version
Parameters
Expand Down
6 changes: 3 additions & 3 deletions jupyter_client/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tempfile
import warnings
from getpass import getpass
from typing import TYPE_CHECKING, Any, Dict, Union, cast
from typing import TYPE_CHECKING, Any, Union, cast

import zmq
from jupyter_core.paths import jupyter_data_dir, jupyter_runtime_dir, secure_write
Expand All @@ -32,7 +32,7 @@
from .session import Session

# Define custom type for kernel connection info
KernelConnectionInfo = Dict[str, Union[int, str, bytes]]
KernelConnectionInfo = dict[str, Union[int, str, bytes]]


def write_connection_file(
Expand Down Expand Up @@ -275,7 +275,7 @@ def tunnel_to_kernel(
with open(connection_info) as f:
connection_info = json.loads(f.read())

cf = cast(Dict[str, Any], connection_info)
cf = cast(dict[str, Any], connection_info)

lports = tunnel.select_random_ports(5)
rports = (
Expand Down
6 changes: 3 additions & 3 deletions jupyter_client/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
import sys
import warnings
from subprocess import PIPE, Popen
from typing import Any, Dict, List, Optional
from typing import Any, Optional

from traitlets.log import get_logger


def launch_kernel(
cmd: List[str],
cmd: list[str],
stdin: Optional[int] = None,
stdout: Optional[int] = None,
stderr: Optional[int] = None,
env: Optional[Dict[str, str]] = None,
env: Optional[dict[str, str]] = None,
independent: bool = False,
cwd: Optional[str] = None,
**kw: Any,
Expand Down
4 changes: 1 addition & 3 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ async def _async_start_kernel(self, *, kernel_name: str | None = None, **kwargs:
km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, kwargs)
if not isinstance(km, KernelManager):
self.log.warning( # type:ignore[unreachable]
"Kernel manager class ({km_class}) is not an instance of 'KernelManager'!".format(
km_class=self.kernel_manager_class.__class__
)
f"Kernel manager class ({self.kernel_manager_class.__class__}) is not an instance of 'KernelManager'!"
)
kwargs["kernel_id"] = kernel_id # Make kernel_id available to manager and provisioner

Expand Down
10 changes: 5 additions & 5 deletions jupyter_client/provisioning/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import glob
import sys
from os import getenv, path
from typing import Any, Dict, List
from typing import Any

# See compatibility note on `group` keyword in https://docs.python.org/3/library/importlib.metadata.html#entry-points
if sys.version_info < (3, 10): # pragma: no cover
Expand Down Expand Up @@ -33,7 +33,7 @@ class KernelProvisionerFactory(SingletonConfigurable):
"""

GROUP_NAME = "jupyter_client.kernel_provisioners"
provisioners: Dict[str, EntryPoint] = {}
provisioners: dict[str, EntryPoint] = {}

default_provisioner_name_env = "JUPYTER_DEFAULT_PROVISIONER_NAME"
default_provisioner_name = Unicode(
Expand Down Expand Up @@ -122,7 +122,7 @@ def _check_availability(self, provisioner_name: str) -> bool:
is_available = False
return is_available

def _get_provisioner_config(self, kernel_spec: Any) -> Dict[str, Any]:
def _get_provisioner_config(self, kernel_spec: Any) -> dict[str, Any]:
"""
Return the kernel_provisioner stanza from the kernel_spec.
Expand Down Expand Up @@ -151,7 +151,7 @@ def _get_provisioner_config(self, kernel_spec: Any) -> Dict[str, Any]:
return env_provisioner # Return what we found (plus config stanza if necessary)
return {"provisioner_name": self.default_provisioner_name, "config": {}}

def get_provisioner_entries(self) -> Dict[str, str]:
def get_provisioner_entries(self) -> dict[str, str]:
"""
Returns a dictionary of provisioner entries.
Expand All @@ -164,7 +164,7 @@ def get_provisioner_entries(self) -> Dict[str, str]:
return entries

@staticmethod
def _get_all_provisioners() -> List[EntryPoint]:
def _get_all_provisioners() -> list[EntryPoint]:
"""Wrapper around entry_points (to fetch the set of provisioners) - primarily to facilitate testing."""
return entry_points(group=KernelProvisionerFactory.GROUP_NAME)

Expand Down
Loading

0 comments on commit 5a5c105

Please sign in to comment.