diff --git a/unitree_sdk2_python-master/example/g1/audio/g1_audio_to_wav.py b/unitree_sdk2_python-master/example/g1/audio/g1_audio_to_wav.py new file mode 100644 index 0000000..88b8c58 --- /dev/null +++ b/unitree_sdk2_python-master/example/g1/audio/g1_audio_to_wav.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +import sys +import time +import socket +import struct +import threading +import wave +import numpy as np +from typing import List, Tuple, Optional + +# 常量定义 +AUDIO_FILE_PATH = "../example/g1/audio/test.wav" +AUDIO_SUBSCRIBE_TOPIC = "rt/audio_msg" +GROUP_IP = "239.168.123.161" +PORT = 5555 + +WAV_SECOND = 5 +WAV_LEN = 16000 * 2 * WAV_SECOND + +class WavHandler: + """WAV文件处理类""" + + @staticmethod + def read_wave(file_path: str) -> Tuple[Optional[List[bytes]], int, int, bool]: + """读取WAV文件""" + try: + with wave.open(file_path, 'rb') as wav_file: + sample_rate = wav_file.getframerate() + num_channels = wav_file.getnchannels() + frames = wav_file.readframes(wav_file.getnframes()) + + # 转换为字节列表 + pcm_data = list(frames) + return pcm_data, sample_rate, num_channels, True + except Exception as e: + print(f"读取WAV文件失败: {e}") + return None, -1, 0, False + + @staticmethod + def write_wave(file_path: str, sample_rate: int, pcm_data: List[int], + sample_count: int, num_channels: int) -> bool: + """写入WAV文件""" + try: + with wave.open(file_path, 'wb') as wav_file: + wav_file.setnchannels(num_channels) + wav_file.setsampwidth(2) # 16位 + wav_file.setframerate(sample_rate) + + # 转换int16数据为bytes + pcm_bytes = struct.pack('<' + 'h' * len(pcm_data), *pcm_data) + wav_file.writeframes(pcm_bytes) + return True + except Exception as e: + print(f"写入WAV文件失败: {e}") + return False + +class G1AudioClient: + """Unitree G1音频客户端""" + + def __init__(self): + self.timeout = 10.0 + self.volume = 50 + + def init(self) -> bool: + """初始化客户端""" + print("G1AudioClient 初始化完成") + return True + + def set_timeout(self, timeout: float): + """设置超时时间""" + self.timeout = timeout + + def get_volume(self) -> Tuple[int, int]: + """获取音量""" + # 模拟API调用 + return 0, self.volume + + def set_volume(self, volume: int) -> int: + """设置音量""" + try: + if 0 <= volume <= 100: + self.volume = volume + return 0 # 成功 + return -1 # 参数错误 + except Exception: + return -1 + + def tts_maker(self, text: str, language: int = 0) -> int: + """文字转语音""" + try: + lang_str = "中文" if language == 0 else "英文" + print(f"TTS播放 ({lang_str}): {text}") + # 这里应该调用实际的TTS API + return 0 + except Exception as e: + print(f"TTS失败: {e}") + return -1 + + def play_stream(self, stream_name: str, timestamp: str, pcm_data: List[bytes]) -> int: + """播放音频流""" + try: + print(f"开始播放音频流: {stream_name}, 时间戳: {timestamp}") + print(f"PCM数据大小: {len(pcm_data)} 字节") + # 这里应该调用实际的音频播放API + return 0 + except Exception as e: + print(f"播放音频流失败: {e}") + return -1 + + def play_stop(self, stream_name: str) -> int: + """停止播放""" + try: + print(f"停止播放音频流: {stream_name}") + # 这里应该调用实际的停止播放API + return 0 + except Exception as e: + print(f"停止播放失败: {e}") + return -1 + + def led_control(self, r: int, g: int, b: int) -> int: + """LED控制""" + try: + print(f"LED控制 - R:{r}, G:{g}, B:{b}") + # 这里应该调用实际的LED控制API + return 0 + except Exception as e: + print(f"LED控制失败: {e}") + return -1 + +class AudioSubscriber: + """音频消息订阅器""" + + def __init__(self, topic: str): + self.topic = topic + self.running = False + + def init_channel(self, callback): + """初始化通道""" + self.callback = callback + self.running = True + print(f"音频订阅器初始化完成,主题: {self.topic}") + + def asr_handler(self, message: str): + """ASR消息处理器""" + if self.callback: + self.callback(message) + +def asr_handler(message: str): + """ASR消息处理函数""" + print(f'Topic:"{AUDIO_SUBSCRIBE_TOPIC}" recv: {message}') + +def thread_mic(): + """麦克风录音线程""" + try: + # 创建UDP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # 绑定到本地地址 + local_addr = ('', PORT) + sock.bind(local_addr) + + # 加入组播组 + mreq = struct.pack('4sl', socket.inet_aton(GROUP_IP), socket.INADDR_ANY) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + total_bytes = 0 + pcm_data = [] + + print("开始录音!") + + while total_bytes < WAV_LEN: + try: + buffer, addr = sock.recvfrom(2048) + if len(buffer) > 0: + # 将字节数据转换为int16样本 + sample_count = len(buffer) // 2 + samples = struct.unpack('<' + 'h' * sample_count, buffer[:sample_count * 2]) + pcm_data.extend(samples) + total_bytes += len(buffer) + except socket.timeout: + continue + except Exception as e: + print(f"接收数据错误: {e}") + break + + # 保存录音文件 + if WavHandler.write_wave("record.wav", 16000, pcm_data, len(pcm_data), 1): + print("录音完成! 保存到 record.wav") + else: + print("保存录音文件失败") + + except Exception as e: + print(f"录音线程错误: {e}") + finally: + if 'sock' in locals(): + sock.close() + +def get_current_time_millisecond() -> int: + """获取当前时间戳(毫秒)""" + return int(time.time() * 1000) + +def main(): + if len(sys.argv) < 2: + print("用法: python audio_client_example.py [NetworkInterface(eth0)]") + sys.exit(0) + + network_interface = sys.argv[1] + print(f"使用网络接口: {network_interface}") + + # 初始化通道工厂(在实际实现中需要) + print("初始化ChannelFactory...") + + # 创建音频客户端 + client = G1AudioClient() + client.init() + client.set_timeout(10.0) + + # ASR消息订阅示例 + subscriber = AudioSubscriber(AUDIO_SUBSCRIBE_TOPIC) + subscriber.init_channel(asr_handler) + + # 音量控制示例 + ret, volume = client.get_volume() + print(f"GetVolume API ret:{ret} volume = {volume}") + + ret = client.set_volume(100) + print(f"SetVolume to 100% , API ret:{ret}") + + # TTS示例 + ret = client.tts_maker("你好。我是宇树科技的机器人。例程启动成功", 0) # 自动播放 + print(f"TtsMaker API ret:{ret}") + time.sleep(5) + + ret = client.tts_maker( + "Hello. I'm a robot from Unitree Robotics. The example has started successfully. ", + 1) # 英文TTS + print(f"TtsMaker API ret:{ret}") + time.sleep(8) + + # 音频播放示例 + pcm_data, sample_rate, num_channels, file_state = WavHandler.read_wave(AUDIO_FILE_PATH) + + print(f"wav文件 sample_rate = {sample_rate} num_channels = {num_channels} filestate = {file_state}") + + if file_state and sample_rate == 16000 and num_channels == 1: + timestamp = str(get_current_time_millisecond()) + client.play_stream("example", timestamp, pcm_data) + print("开始播放音频流") + time.sleep(3) + print("停止播放音频流") + ret = client.play_stop("example") + else: + print("音频文件格式错误,请检查!") + + # LED控制示例 + client.led_control(0, 255, 0) # 绿色 + time.sleep(1) + client.led_control(0, 0, 0) # 关闭 + time.sleep(1) + client.led_control(0, 0, 255) # 蓝色 + + print("AudioClient api测试完成,ASR开始...") + + # 启动麦克风录音线程 + mic_thread = threading.Thread(target=thread_mic) + mic_thread.daemon = True + mic_thread.start() + + try: + while True: + time.sleep(1) # 等待ASR消息 + except KeyboardInterrupt: + print("\n程序退出") + sys.exit(0) + +if __name__ == "__main__": + main() \ No newline at end of file