Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions Arctis_7_Plus_ChatMix.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@
import re
import usb.core

# First byte of the chatmix interrupt
CHATMIX_OPCODE = 0x45

supported_devices = [
{
"name": "Arctis 7P",
"idVendor": 0x1038,
"idProduct": 0x220e,
"hidIndex": 7
},
{
"name": "Nova 7 WOW Edition",
"idVendor": 0x1038,
"idProduct": 0x227a,
"hidIndex": 7
},
{
"name": "Arctis Nova 7 Gen 2",
"idVendor": 0x1038,
"idProduct": 0x227e,
"hidIndex": 8
}
]

class Arctis7PlusChatMix:
def __init__(self):
Expand All @@ -37,30 +60,12 @@ def __init__(self):
self.log = self._init_log()
self.log.info("Initializing ac7pcm...")

# identify the arctis 7+ device
try:
# Support both Arctis 7P (0x220e) and Nova 7 WOW Edition (0x227a)
self.dev = usb.core.find(idVendor=0x1038, idProduct=0x220e) or \
usb.core.find(idVendor=0x1038, idProduct=0x227a)
except Exception as e:
self.log.error("""Failed to identify the Arctis 7+ device.
Please ensure it is connected.\n
Please note: This program only supports the '7+' or the Nova 7 WoW Edition models""")
self.die_gracefully(trigger ="Couldn't find arctis7 model")

# select its interface and USB endpoint, and capture the endpoint address
try:
# interface index 7 of the Arctis 7+ is the USB HID for the ChatMix dial;
# its actual interface number on the device itself is 5.
self.interface = self.dev[0].interfaces()[7]
self.interface_num = self.interface.bInterfaceNumber
self.endpoint = self.interface.endpoints()[0]
self.addr = self.endpoint.bEndpointAddress
# get the default sink id from pactl
self.system_default_sink = os.popen("pactl get-default-sink").read().strip()
self.log.info(f"default sink identified as {self.system_default_sink}")

except Exception as e:
self.log.error("""Failure to identify relevant
USB device's interface or endpoint. Shutting down...""")
self.die_gracefully(exc=True, trigger ="identification of USB endpoint")
# Find a supported arctis device
self._bind_device()

# detach if the device is active
if self.dev.is_kernel_driver_active(self.interface_num):
Expand All @@ -78,15 +83,45 @@ def _init_log(self):
log.addHandler(stdout_handler)
return (log)


def _bind_device(self):
# Match on any devices which we have explicitly listed
def is_supported(dev):
return any(device_info["idVendor"] == dev.idVendor and device_info["idProduct"] == dev.idProduct for device_info in supported_devices)

# TODO: multi-device support... for now we just bind to the first supported device we find
matched_device = usb.core.find(find_all=False, custom_match=is_supported)

if not matched_device:
all_device_names = ", ".join([device_info["name"] for device_info in supported_devices])
self.log.error(f"""Failed to identify the Arctis device.
Please ensure it is connected and supported.\n
Supported models: {all_device_names}.""")
self.die_gracefully(trigger="Couldn't find supported arctis model")
return

self.dev = matched_device
device_info = next(d for d in supported_devices if d["idProduct"] == self.dev.idProduct)
self.log.info(f"Found supported device: {device_info['name']} ({hex(device_info['idVendor'])}:{hex(device_info['idProduct'])})")

# select its interface and USB endpoint, and capture the endpoint address
try:
self.interface = self.dev[0].interfaces()[device_info["hidIndex"]]
self.interface_num = self.interface.bInterfaceNumber
self.endpoint = self.interface.endpoints()[0]
self.addr = self.endpoint.bEndpointAddress

except Exception as e:
self.log.error("""Failure to identify relevant
USB device's interface or endpoint. Shutting down...""")
self.die_gracefully(exc=True, trigger ="identification of USB endpoint")


def _init_VAC(self):
"""Get name of default sink, establish virtual sink
and pipe its output to the default sink
"""

# get the default sink id from pactl
self.system_default_sink = os.popen("pactl get-default-sink").read().strip()
self.log.info(f"default sink identified as {self.system_default_sink}")

# attempt to identify an Arctis sink via pactl
try:
pactl_short_sinks = os.popen("pactl list short sinks").readlines()
Expand Down Expand Up @@ -184,15 +219,18 @@ def start_modulator_signal(self):
while True:
try:
# read the input of the USB signal. Signal is sent in 64-bit interrupt packets.
# read_input[0] is the opcode. We only care about chatmix (0x45)
# read_input[1] returns value to use for default device volume
# read_input[2] returns the value to use for virtual device volume
read_input = self.dev.read(self.addr, 64)
default_device_volume = "{}%".format(read_input[1])
virtual_device_volume = "{}%".format(read_input[2])

# os.system calls to issue the commands directly to pactl
os.system(f'pactl set-sink-volume Arctis_Game {default_device_volume}')
os.system(f'pactl set-sink-volume Arctis_Chat {virtual_device_volume}')
if read_input[0] == CHATMIX_OPCODE:
default_device_volume = "{}%".format(read_input[1])
virtual_device_volume = "{}%".format(read_input[2])

# os.system calls to issue the commands directly to pactl
os.system(f'pactl set-sink-volume Arctis_Game {default_device_volume}')
os.system(f'pactl set-sink-volume Arctis_Chat {virtual_device_volume}')
self.log.debug(f"Game Volume: {default_device_volume} | Chat Volume: {virtual_device_volume}")
except usb.core.USBTimeoutError:
pass
except usb.core.USBError:
Expand Down Expand Up @@ -223,7 +261,7 @@ def die_gracefully(self, sink_creation_fail=False, trigger=None):
sys.exit(1)
else:
self.log.info("-"*45)
self.log.info("Artcis 7+ ChatMix shut down gracefully... Bye Bye!")
self.log.info("Arctis 7+ ChatMix shut down gracefully... Bye Bye!")
self.log.info("-"*45)
sys.exit(0)

Expand Down
5 changes: 3 additions & 2 deletions system-config/91-steelseries-arctis-7p.rules
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# TODO: use envsubst to fill in the appropriate $USER in install.sh
SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="220e", OWNER="${USER}", GROUP="${USER}", MODE="0664"
SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="227a", OWNER="${USER}", GROUP="${USER}", MODE="0664"
SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="227e", OWNER="${USER}", GROUP="${USER}", MODE="0664"

ACTION=="add", SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="220e", TAG+="systemd", ENV{SYSTEMD_ALIAS}="/dev/arctis7"
ACTION=="add", SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="227a", TAG+="systemd", ENV{SYSTEMD_ALIAS}="/dev/arctis7"
ACTION=="remove", SUBSYSTEM=="usb", ENV{PRODUCT}=="1038/220e/*", TAG+="systemd"
ACTION=="add", SUBSYSTEM=="usb", ATTRS{idVendor}=="1038", ATTRS{idProduct}=="227e", TAG+="systemd", ENV{SYSTEMD_ALIAS}="/dev/arctis7"
ACTION=="remove", SUBSYSTEM=="usb", ENV{PRODUCT}=="1038/220e/*", TAG+="systemd"
5 changes: 2 additions & 3 deletions system-config/arctis7pcm.service
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[Unit]
Description=Arctis 7+ ChatMix
Requisite=dev-arctis7.device
After=dev-arctis7.device
After=pipewire.service pipewire-pulse.service
StartLimitIntervalSec=1m
StartLimitBurst=5

Expand All @@ -12,4 +11,4 @@ Restart=on-failure
RestartSec=1

[Install]
WantedBy=dev-arctis7.device
WantedBy=default.target