1.添加g1获取原始音频脚本

This commit is contained in:
haotian 2025-08-29 17:27:26 +08:00
parent a4c377c721
commit 241ac482e4

View File

@ -0,0 +1,277 @@
#!/usr/bin/env python3
import sys
import time
import socket
import struct
import threading
import wave
import numpy as np
from typing import List, Tuple, Optional
# 常量定义
AUDIO_FILE_PATH = "../example/g1/audio/test.wav"
AUDIO_SUBSCRIBE_TOPIC = "rt/audio_msg"
GROUP_IP = "239.168.123.161"
PORT = 5555
WAV_SECOND = 5
WAV_LEN = 16000 * 2 * WAV_SECOND
class WavHandler:
"""WAV文件处理类"""
@staticmethod
def read_wave(file_path: str) -> Tuple[Optional[List[bytes]], int, int, bool]:
"""读取WAV文件"""
try:
with wave.open(file_path, 'rb') as wav_file:
sample_rate = wav_file.getframerate()
num_channels = wav_file.getnchannels()
frames = wav_file.readframes(wav_file.getnframes())
# 转换为字节列表
pcm_data = list(frames)
return pcm_data, sample_rate, num_channels, True
except Exception as e:
print(f"读取WAV文件失败: {e}")
return None, -1, 0, False
@staticmethod
def write_wave(file_path: str, sample_rate: int, pcm_data: List[int],
sample_count: int, num_channels: int) -> bool:
"""写入WAV文件"""
try:
with wave.open(file_path, 'wb') as wav_file:
wav_file.setnchannels(num_channels)
wav_file.setsampwidth(2) # 16位
wav_file.setframerate(sample_rate)
# 转换int16数据为bytes
pcm_bytes = struct.pack('<' + 'h' * len(pcm_data), *pcm_data)
wav_file.writeframes(pcm_bytes)
return True
except Exception as e:
print(f"写入WAV文件失败: {e}")
return False
class G1AudioClient:
"""Unitree G1音频客户端"""
def __init__(self):
self.timeout = 10.0
self.volume = 50
def init(self) -> bool:
"""初始化客户端"""
print("G1AudioClient 初始化完成")
return True
def set_timeout(self, timeout: float):
"""设置超时时间"""
self.timeout = timeout
def get_volume(self) -> Tuple[int, int]:
"""获取音量"""
# 模拟API调用
return 0, self.volume
def set_volume(self, volume: int) -> int:
"""设置音量"""
try:
if 0 <= volume <= 100:
self.volume = volume
return 0 # 成功
return -1 # 参数错误
except Exception:
return -1
def tts_maker(self, text: str, language: int = 0) -> int:
"""文字转语音"""
try:
lang_str = "中文" if language == 0 else "英文"
print(f"TTS播放 ({lang_str}): {text}")
# 这里应该调用实际的TTS API
return 0
except Exception as e:
print(f"TTS失败: {e}")
return -1
def play_stream(self, stream_name: str, timestamp: str, pcm_data: List[bytes]) -> int:
"""播放音频流"""
try:
print(f"开始播放音频流: {stream_name}, 时间戳: {timestamp}")
print(f"PCM数据大小: {len(pcm_data)} 字节")
# 这里应该调用实际的音频播放API
return 0
except Exception as e:
print(f"播放音频流失败: {e}")
return -1
def play_stop(self, stream_name: str) -> int:
"""停止播放"""
try:
print(f"停止播放音频流: {stream_name}")
# 这里应该调用实际的停止播放API
return 0
except Exception as e:
print(f"停止播放失败: {e}")
return -1
def led_control(self, r: int, g: int, b: int) -> int:
"""LED控制"""
try:
print(f"LED控制 - R:{r}, G:{g}, B:{b}")
# 这里应该调用实际的LED控制API
return 0
except Exception as e:
print(f"LED控制失败: {e}")
return -1
class AudioSubscriber:
"""音频消息订阅器"""
def __init__(self, topic: str):
self.topic = topic
self.running = False
def init_channel(self, callback):
"""初始化通道"""
self.callback = callback
self.running = True
print(f"音频订阅器初始化完成,主题: {self.topic}")
def asr_handler(self, message: str):
"""ASR消息处理器"""
if self.callback:
self.callback(message)
def asr_handler(message: str):
"""ASR消息处理函数"""
print(f'Topic:"{AUDIO_SUBSCRIBE_TOPIC}" recv: {message}')
def thread_mic():
"""麦克风录音线程"""
try:
# 创建UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定到本地地址
local_addr = ('', PORT)
sock.bind(local_addr)
# 加入组播组
mreq = struct.pack('4sl', socket.inet_aton(GROUP_IP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
total_bytes = 0
pcm_data = []
print("开始录音!")
while total_bytes < WAV_LEN:
try:
buffer, addr = sock.recvfrom(2048)
if len(buffer) > 0:
# 将字节数据转换为int16样本
sample_count = len(buffer) // 2
samples = struct.unpack('<' + 'h' * sample_count, buffer[:sample_count * 2])
pcm_data.extend(samples)
total_bytes += len(buffer)
except socket.timeout:
continue
except Exception as e:
print(f"接收数据错误: {e}")
break
# 保存录音文件
if WavHandler.write_wave("record.wav", 16000, pcm_data, len(pcm_data), 1):
print("录音完成! 保存到 record.wav")
else:
print("保存录音文件失败")
except Exception as e:
print(f"录音线程错误: {e}")
finally:
if 'sock' in locals():
sock.close()
def get_current_time_millisecond() -> int:
"""获取当前时间戳(毫秒)"""
return int(time.time() * 1000)
def main():
if len(sys.argv) < 2:
print("用法: python audio_client_example.py [NetworkInterface(eth0)]")
sys.exit(0)
network_interface = sys.argv[1]
print(f"使用网络接口: {network_interface}")
# 初始化通道工厂(在实际实现中需要)
print("初始化ChannelFactory...")
# 创建音频客户端
client = G1AudioClient()
client.init()
client.set_timeout(10.0)
# ASR消息订阅示例
subscriber = AudioSubscriber(AUDIO_SUBSCRIBE_TOPIC)
subscriber.init_channel(asr_handler)
# 音量控制示例
ret, volume = client.get_volume()
print(f"GetVolume API ret:{ret} volume = {volume}")
ret = client.set_volume(100)
print(f"SetVolume to 100% , API ret:{ret}")
# TTS示例
ret = client.tts_maker("你好。我是宇树科技的机器人。例程启动成功", 0) # 自动播放
print(f"TtsMaker API ret:{ret}")
time.sleep(5)
ret = client.tts_maker(
"Hello. I'm a robot from Unitree Robotics. The example has started successfully. ",
1) # 英文TTS
print(f"TtsMaker API ret:{ret}")
time.sleep(8)
# 音频播放示例
pcm_data, sample_rate, num_channels, file_state = WavHandler.read_wave(AUDIO_FILE_PATH)
print(f"wav文件 sample_rate = {sample_rate} num_channels = {num_channels} filestate = {file_state}")
if file_state and sample_rate == 16000 and num_channels == 1:
timestamp = str(get_current_time_millisecond())
client.play_stream("example", timestamp, pcm_data)
print("开始播放音频流")
time.sleep(3)
print("停止播放音频流")
ret = client.play_stop("example")
else:
print("音频文件格式错误,请检查!")
# LED控制示例
client.led_control(0, 255, 0) # 绿色
time.sleep(1)
client.led_control(0, 0, 0) # 关闭
time.sleep(1)
client.led_control(0, 0, 255) # 蓝色
print("AudioClient api测试完成ASR开始...")
# 启动麦克风录音线程
mic_thread = threading.Thread(target=thread_mic)
mic_thread.daemon = True
mic_thread.start()
try:
while True:
time.sleep(1) # 等待ASR消息
except KeyboardInterrupt:
print("\n程序退出")
sys.exit(0)
if __name__ == "__main__":
main()