本地部署 xiaozhi-esp32-server

注意
本文最后更新于 2025-03-20,文中内容可能已过时。

安装 Anaconda

下载安装

清华大学开源软件镜像站:https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/

启动安装程序时,以管理员身份运行

/images/documents/本地部署xiaozhi-esp32-server/1.png
(图1)
/images/documents/本地部署xiaozhi-esp32-server/2.png
(图2)

安装的过程中会非常的慢

环境变量

path 环境变量添加以下内容:

  • D:\Anaconda
  • D:\Anaconda\Scripts
  • D:\Anaconda\Library\bin
  • D:\Anaconda\Library\mingw-w64\bin
  • D:\Anaconda\Library\usr\bin
  • D:\Anaconda\pkgs

测试是否配置成功 conda --version

部署 xiaozhi-sep32-server

安装基础环境

/images/documents/本地部署xiaozhi-esp32-server/3.png
(图3)

运行之后,如果你能看到命令行窗口前面有一个( base )字样,说明你成功进入了 conda 环境。那么你就可以执行以下命令了。

/images/documents/本地部署xiaozhi-esp32-server/4.png
(图4)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
conda remove -n xiaozhi-esp32-server --all -y
conda create -n xiaozhi-esp32-server python=3.10 -y
conda activate xiaozhi-esp32-server

# 添加清华源通道
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge

conda install libopus -y
conda install ffmpeg -y

安装本项目依赖

xiaozhi-esp32-server 地址,点击 Download ZIP 按钮下载项目压缩包。

此时它的名字可能叫 xiaozhi-esp32-server-main,你需要把它重命名成 xiaozhi-esp32-server,在这个文件里,进入到 main 文件夹,再进入到 xiaozhi-server,好了请记住这个目录 xiaozhi-server

1
2
3
4
5
6
# 继续使用conda环境
conda activate xiaozhi-esp32-server
# 进入到你的项目根目录,再进入main/xiaozhi-server
cd main/xiaozhi-server
pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/
pip install -r requirements.txt

下载语音识别模型文件

到项目作者提供的连接处下载,下载后把 model.pt 文件放在 models/SenseVoiceSmall 目录下。

配置项目文件

如果你的 xiaozhi-server 目录没有 data,你需要创建 data 目录。 如果你的 data 下面没有 .config.yaml 文件,你可以把源码目录下的 config.yaml 文件复制一份,重命名为 .config.yaml

运行项目

1
2
3
# 确保在xiaozhi-server目录下执行
conda activate xiaozhi-esp32-server
python app.py

报错处理

错误一

Exception: Could not find Opus library. Make sure it is installed.

/images/documents/本地部署xiaozhi-esp32-server/5.png
(图5)

1.找到报错文件 {python下目录}\lib\site-packages\opuslib_next\api\__init__.py 并打开 __init__.py

我的位置是在:E:\software\anaconda3\envs\xiaozhi-esp32-server\Lib\site-packages\opuslib_next\api\__init__.py

/images/documents/本地部署xiaozhi-esp32-server/6.png
(图6)

2.下载 opus:https://github.com/ShiftMediaProject/opus/releases

/images/documents/本地部署xiaozhi-esp32-server/7.png
(图7)

3.解压找到 x64 文件下的 opus.dll,并复制路径

/images/documents/本地部署xiaozhi-esp32-server/8.png
(图8)

4.添加代码至 步骤1 的红框位置处保存文件

1
2
if lib_location is None:
    lib_location = r'D:\ProgramData\Anaconda3\envs\xiaozhi-esp32-server\Lib\site-packages\libopus_v1.4_msvc17\bin\x64\opus.dll'

错误二

提示 ffmpeg is not installed 没有安装,重新进行 conda install conda-forge::ffmpeg -y 依然无果

1.打开 Dpwnload FFmpeg 官网 https://ffmpeg.org/download.html,选择安装包 Windows builds from gyan.dev

/images/documents/本地部署xiaozhi-esp32-server/9.png
(图9)

2.下滑找到 release bulids 部分,选择 ffmpeg-7.0.2-essentials_build.zip

/images/documents/本地部署xiaozhi-esp32-server/10.png
(图10)

3.下载完成后,解压缩得到 FFmpeg 文件夹。

解压后的文件夹中应包含以下目录:

  • bin:FFmpeg 可执行文件所在的文件夹,运行 FFmpeg 的所有命令都需通过此目录下的文件。
  • doc:文档资料。
  • presets:预设的格式和编码方案。

进入 bin 目录,可以看到 FFmpeg 的三个核心可执行文件:

/images/documents/本地部署xiaozhi-esp32-server/11.png
(图11)

4.将上述三个 exe 文件复制到 conda xiaozhi-esp32-server 环境下,例:

C:\Users\用户名\.conda\envs\xiaozhi-esp32-server\Library\bin

/images/documents/本地部署xiaozhi-esp32-server/12.png
(图12)

5.重新进入到你的项目目录,执行以下命令

1
2
3
4
5
conda activate xiaozhi-esp32-server

pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/

pip install -r requirements.txt

PyCharm 调试 xiaozhi-esp32-server

Anaconda 配置好虚拟环境后,需要将环境添加进 PyCharm 中。

/images/documents/本地部署xiaozhi-esp32-server/13.png
(图13)

点击文件夹图标

/images/documents/本地部署xiaozhi-esp32-server/14.png
(图14)
  • 因为是 Conda executable,所以我们要选择 Conda.exe 不能选择 Python 解释器,因此我们要选择那个在 Anaconda 根目录下的 _Conda.exe (注意:文件名中有下划线)。如果找不到 _conda.exe,可以选择 base 环境下的 conda.exe(即 Anaconda 根目录下的 Conda.exe ),在新版本的 anaconda 中好像已经无法找到 _conda.exe 了,选择 conda.exe 即可,然后点击右侧的 Load Environments。如果在 base 环境中找不到 conda.exe,进 Scripts 选择 conda.exe 即可。
  • 根目录下的 _conda.exe 或者 conda.exe
  • 如果根目录没有,选择 Scripts 目录下的 conda.exe

选择完成后,最后点击右侧的 Load Environments

/images/documents/本地部署xiaozhi-esp32-server/15.png
(图15)

接下来,因为我们已经在 Anaconda 中创建了虚拟环境,因此点击 Use existing environment, 选择已安装的 Anaconda 中的虚拟环境即可,点击右下角的 OK,即可

/images/documents/本地部署xiaozhi-esp32-server/16.png
(图16)

根据需求选择已经存在的环境即可

/images/documents/本地部署xiaozhi-esp32-server/17.png
(图17)

返回创建项目页面,点击 Create 即可

/images/documents/本地部署xiaozhi-esp32-server/18.png
(图18)

成功进入环境

/images/documents/本地部署xiaozhi-esp32-server/19.png
(图19)

Previously configured interpreter 中无法识别
看看所要添加的虚拟环境中是否 installpython,环境中没有安装 python 是识别不到的。 也就说仅创建一个虚拟环境,pycharm 应该是无法识别到的。
提示
环境变量 Path 里添加了 AnacondaScripts 的话就可以用 Scripts 里面的 conda.exe

补充

项目在 PyCharm 中突然无法启动

/images/documents/本地部署xiaozhi-esp32-server/21.png
(图21)

但是我在 Anaconda Prompt 中运行是正常的

/images/documents/本地部署xiaozhi-esp32-server/20.png
(图20)

经过测试,ffmpeg 确实安装了,没有遗漏,我在 app.py 中将 check_ffmpeg_installed() 注释,可以正常启动使用,就是控制台会打印提示信息

/images/documents/本地部署xiaozhi-esp32-server/22.png
(图22)

构建 Docker 镜像

将本地项目复制到服务器上,注意是整个项目,而不是只复制一个 xiaozhi-server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 依赖中 torch 太大了,下载会超时,所以先单独下载依赖,构建镜像时,复制过去就行了
# 注意:需要的是 python 为 3.10 的依赖,如果你的服务器的 python 版本不是 3.10,那么可以通过 conda 来下载 3.10 的依赖

# 创建 python 为 3.10 的虚拟环境
conda create --name xiaozhi-esp32-server python=3.10

# 下载依赖,根据 /xiaozhi-esp32-server/main/xiaozhi-server/requirements.txt 下载依赖到 /xiaozhi-esp32-server/dependencies
pip download -r /xiaozhi-esp32-server/main/xiaozhi-server/requirements.txt -d /xiaozhi-esp32-server/dependencies 

# 退出并删除虚拟环境
conda deactivate
conda remove --name xiaozhi-esp32-server --all

由于自身需求我在 requirements.txt 中添加了一个新的依赖 psycopg2,因此我需要修改 Dockerfile-server 文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 第一阶段:构建Python依赖
FROM python:3.10-slim AS builder

WORKDIR /app

# 安装编译工具链和 PostgreSQL 开发库
RUN apt-get update && \
    apt-get install -y --no-install-recommends build-essential libpq-dev && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 复制本地下载的依赖
COPY dependencies /app/dependencies

# 复制 requirements.txt
COPY main/xiaozhi-server/requirements.txt .

# 离线安装依赖
RUN pip install --no-index --find-links=/app/dependencies -r requirements.txt

# 第二阶段:生产镜像
FROM python:3.10-slim

WORKDIR /opt/xiaozhi-esp32-server

# 安装系统依赖(包括 libpq 运行时库)
RUN apt-get update && \
    apt-get install -y --no-install-recommends libopus0 ffmpeg libpq5 && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 从构建阶段复制Python包和前端构建产物
COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages

# 复制应用代码
COPY main/xiaozhi-server .

# 启动应用
CMD ["python", "app.py"]

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 第一阶段:构建Python依赖
FROM python:3.10-slim AS builder

WORKDIR /app

# 安装编译工具链(保留 build-essential 用于编译某些 Python 包)
RUN apt-get update && \
    apt-get install -y --no-install-recommends build-essential && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 复制本地下载的依赖
COPY dependencies /app/dependencies

# 复制 requirements.txt
COPY main/xiaozhi-server/requirements.txt .

# 离线安装依赖
RUN pip install --no-index --find-links=/app/dependencies -r requirements.txt

# 第二阶段:生产镜像
FROM python:3.10-slim

WORKDIR /opt/xiaozhi-esp32-server

# 安装系统依赖(去掉了 libpq5,保留其他可能需要的库)
RUN apt-get update && \
    apt-get install -y --no-install-recommends libopus0 ffmpeg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 从构建阶段复制Python包
COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages

# 复制应用代码
COPY main/xiaozhi-server .

# 启动应用
CMD ["python", "app.py"]
  • libpq-dev 移除编译时 PostgreSQL 依赖,现在不需要了
  • libpq5 移除运行时 PostgreSQL 依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 进入项目根目录,编译server
docker build -t xiaozhi-esp32-server:server_latest -f ./Dockerfile-server .

# 启动镜像看是否报错
docker run -it --rm xiaozhi-esp32-server:server_latest python app.py

# 或者后台启动容器
# -d: 以分离模式(后台运行)启动容器。
# --name my-running-app: 给你的容器指定一个名称(在这个例子中是 my-running-app)。如果不指定名称,默认会分配一个随机生成的名字。
# -p 8000:8000:将宿主机的 30800 端口映射到容器内的 30800 端口。
# xiaozhi-esp32-server:v1.0: 这是你想要运行的镜像名和标签。
docker run -d --name my-websocket-app -p 30800:30800 xiaozhi-esp32-server:v1.0
# 不指定容器名称
docker run -d -p 30800:30800 xiaozhi-esp32-server:v1.0

# 提示:配置文件太旧了
# 复制配置文件到容器内
docker cp /home/agi/xiaozhi-esp32-server/main/xiaozhi-server/data/.config.yaml 容器名称/容器id:/opt/xiaozhi-esp32-server/data/.config.yaml

# 再次测试
# 启动容器
docker start 容器id

# 进入容器
docker exec -it 容器id /bin/bash

# 启动
python app.py

# 保存镜像为 .tar 文件
# 这里的 -o 参数用于指定输出文件名(在这个例子中是 xiaozhi-esp32-server_v1.0.tar),后面跟着的是你要保存的镜像名和标签
docker save -o xiaozhi-esp32-server_v1.0.tar xiaozhi-esp32-server:v1.0

# 发送镜像到另一台服务器的目录下
scp xiaozhi-esp32-server_v1.0.tar ip:目录

# 加载 .tar 文件为镜像
docker load -i /path/to/xiaozhi-esp32-server_v1.0.tar
依赖冲突

在构建镜像的过程中提示依赖冲突

/images/documents/本地部署xiaozhi-esp32-server/23.png
(图23)

当前操作系统不支持 onnxruntime>=1.17.0 所以需要降低 onnxruntime 版本到 1.16.0,被 magika 所依赖,但是 magika 版本大于 0.5.0 时需要 onnxruntime>=1.17.0,所以需要调整 magika==0.5.0,同时 markitdown 依赖于 magika

最终的解决方法是指定 magika 的版本 magika==0.5.0markitdown 移除 markitdown 版本的指定

缺少依赖
pip download google-api-core[grpc] --dest /xxxxxx/xiaozhi-esp32-server/dependencies/

Conda 相关命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 检查当前 Conda 配置中的通道设置
conda config --show channels

# 添加镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main

# 移除镜像
conda config --remove channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge
conda config --remove channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --remove channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main

过滤LLM返回结果中括号及括号中的内容

在调用 TTS 合成前过滤括号及括号中的内容,路径:\xiaozhi-esp32-server\main\xiaozhi-server\core\providers\tts\base.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
import os
import queue
import uuid
import asyncio
import threading
from core.utils import p3
from datetime import datetime
from core.utils import textUtils
from abc import ABC, abstractmethod
from config.logger import setup_logging
from core.utils.util import audio_to_data
from core.utils.tts import MarkdownCleaner
from core.utils.output_counter import add_device_output
from core.handle.reportHandle import enqueue_tts_report
from core.handle.sendAudioHandle import sendAudioMessage
from core.providers.tts.dto.dto import (
    TTSMessageDTO,
    SentenceType,
    ContentType,
    InterfaceType,
)
import re


import traceback

TAG = __name__
logger = setup_logging()


class TTSProviderBase(ABC):
    def __init__(self, config, delete_audio_file):
        self.interface_type = InterfaceType.NON_STREAM
        self.conn = None
        self.tts_timeout = 10
        self.delete_audio_file = delete_audio_file
        self.output_file = config.get("output_dir", "tmp/")
        self.tts_text_queue = queue.Queue()
        self.tts_audio_queue = queue.Queue()
        self.tts_audio_first_sentence = True

        self.tts_text_buff = []
        self.punctuations = (
            "。",
            ".",
            "?",
            "?",
            "!",
            "!",
            ";",
            ";",
            ":",
        )
        self.first_sentence_punctuations = (
            ",",
            "~",
            "~",
            "、",
            ",",
            "。",
            ".",
            "?",
            "?",
            "!",
            "!",
            ";",
            ";",
            ":",
        )
        self.tts_stop_request = False
        self.processed_chars = 0
        self.is_first_sentence = True
        self.brackets_arr = []  # 存放找到的括号及内容
        self.text_before_brackets = ""  # 括号前被忽略的文本
        self.before_text_arr = []   # 括号前被忽略的文本数组

    def generate_filename(self, extension=".wav"):
        return os.path.join(
            self.output_file,
            f"tts-{datetime.now().date()}@{uuid.uuid4().hex}{extension}",
        )

    def to_tts(self, text):
        tmp_file = self.generate_filename()
        try:
            max_repeat_time = 5
            text = MarkdownCleaner.clean_markdown(text)
            while not os.path.exists(tmp_file) and max_repeat_time > 0:
                try:
                    asyncio.run(self.text_to_speak(text, tmp_file))
                except Exception as e:
                    logger.bind(tag=TAG).warning(
                        f"语音生成失败{5 - max_repeat_time + 1}次: {text},错误: {e}"
                    )
                    # 未执行成功,删除文件
                    if os.path.exists(tmp_file):
                        os.remove(tmp_file)
                    max_repeat_time -= 1

            if max_repeat_time > 0:
                logger.bind(tag=TAG).info(
                    f"语音生成成功: {text}:{tmp_file},重试{5 - max_repeat_time}次"
                )
            else:
                logger.bind(tag=TAG).error(
                    f"语音生成失败: {text},请检查网络或服务是否正常"
                )

            return tmp_file
        except Exception as e:
            logger.bind(tag=TAG).error(f"Failed to generate TTS file: {e}")
            return None

    @abstractmethod
    async def text_to_speak(self, text, output_file):
        pass

    def audio_to_pcm_data(self, audio_file_path):
        """音频文件转换为PCM编码"""
        return audio_to_data(audio_file_path, is_opus=False)

    def audio_to_opus_data(self, audio_file_path):
        """音频文件转换为Opus编码"""
        return audio_to_data(audio_file_path, is_opus=True)

    def tts_one_sentence(
        self,
        conn,
        content_type,
        content_detail=None,
        content_file=None,
        sentence_id=None,
    ):
        """发送一句话"""
        if not sentence_id:
            if conn.sentence_id:
                sentence_id = conn.sentence_id
            else:
                sentence_id = str(uuid.uuid4()).replace("-", "")
                conn.sentence_id = sentence_id
        self.tts_text_queue.put(
            TTSMessageDTO(
                sentence_id=sentence_id,
                sentence_type=SentenceType.FIRST,
                content_type=ContentType.ACTION,
            )
        )
        self.tts_text_queue.put(
            TTSMessageDTO(
                sentence_id=sentence_id,
                sentence_type=SentenceType.MIDDLE,
                content_type=content_type,
                content_detail=content_detail,
                content_file=content_file,
            )
        )
        self.tts_text_queue.put(
            TTSMessageDTO(
                sentence_id=sentence_id,
                sentence_type=SentenceType.LAST,
                content_type=ContentType.ACTION,
            )
        )

    async def open_audio_channels(self, conn):
        self.conn = conn
        self.tts_timeout = conn.config.get("tts_timeout", 10)
        # tts 消化线程
        self.tts_priority_thread = threading.Thread(
            target=self.tts_text_priority_thread, daemon=True
        )
        self.tts_priority_thread.start()

        # 音频播放 消化线程
        self.audio_play_priority_thread = threading.Thread(
            target=self._audio_play_priority_thread, daemon=True
        )
        self.audio_play_priority_thread.start()

    # 这里默认是非流式的处理方式
    # 流式处理方式请在子类中重写
    def tts_text_priority_thread(self):
        while not self.conn.stop_event.is_set():
            try:
                message = self.tts_text_queue.get(timeout=1)
                if self.conn.client_abort:
                    logger.bind(tag=TAG).info("收到打断信息,终止TTS文本处理线程")
                    continue
                if message.sentence_type == SentenceType.FIRST:
                    # 初始化参数
                    self.tts_stop_request = False
                    self.processed_chars = 0
                    self.tts_text_buff = []
                    self.is_first_sentence = True
                    self.tts_audio_first_sentence = True
                    self.brackets_arr = []  # 重置
                    self.text_before_brackets = ""  # 重置
                    self.before_text_arr = []  # 重置
                elif ContentType.TEXT == message.content_type:
                    self.tts_text_buff.append(message.content_detail)
                    segment_text = self._get_segment_text()
                    if segment_text:
                        tts_file = self.to_tts(segment_text)
                        if tts_file:
                            audio_datas = self._process_audio_file(tts_file)
                            self.tts_audio_queue.put(
                                (message.sentence_type, audio_datas, segment_text)
                            )
                elif ContentType.FILE == message.content_type:
                    self._process_remaining_text()
                    tts_file = message.content_file
                    if tts_file and os.path.exists(tts_file):
                        audio_datas = self._process_audio_file(tts_file)
                        self.tts_audio_queue.put(
                            (message.sentence_type, audio_datas, message.content_detail)
                        )

                if message.sentence_type == SentenceType.LAST:
                    self._process_remaining_text()
                    self.tts_audio_queue.put(
                        (message.sentence_type, [], message.content_detail)
                    )

            except queue.Empty:
                continue
            except Exception as e:
                logger.bind(tag=TAG).error(
                    f"处理TTS文本失败: {str(e)}, 类型: {type(e).__name__}, 堆栈: {traceback.format_exc()}"
                )
                continue

    def _audio_play_priority_thread(self):
        while not self.conn.stop_event.is_set():
            text = None
            try:
                try:
                    sentence_type, audio_datas, text = self.tts_audio_queue.get(
                        timeout=1
                    )
                except queue.Empty:
                    if self.conn.stop_event.is_set():
                        break
                    continue
                future = asyncio.run_coroutine_threadsafe(
                    sendAudioMessage(self.conn, sentence_type, audio_datas, text),
                    self.conn.loop,
                )
                future.result()
                if self.conn.max_output_size > 0 and text:
                    add_device_output(self.conn.headers.get("device-id"), len(text))
                enqueue_tts_report(self.conn, text, audio_datas)
            except Exception as e:
                logger.bind(tag=TAG).error(
                    f"audio_play_priority priority_thread: {text} {e}"
                )

    async def start_session(self, session_id):
        pass

    async def finish_session(self, session_id):
        pass

    async def close(self):
        """资源清理方法"""
        if hasattr(self, "ws") and self.ws:
            await self.ws.close()

    # def _get_segment_text(self):
    #     # 合并当前全部文本并处理未分割部分
    #     full_text = "".join(self.tts_text_buff)
    #     current_text = full_text[self.processed_chars :]  # 从未处理的位置开始
    #     last_punct_pos = -1
    #
    #     # 根据是否是第一句话选择不同的标点符号集合
    #     punctuations_to_use = (
    #         self.first_sentence_punctuations
    #         if self.is_first_sentence
    #         else self.punctuations
    #     )
    #
    #     for punct in punctuations_to_use:
    #         pos = current_text.rfind(punct)
    #         if (pos != -1 and last_punct_pos == -1) or (
    #             pos != -1 and pos < last_punct_pos
    #         ):
    #             last_punct_pos = pos
    #
    #     if last_punct_pos != -1:
    #         segment_text_raw = current_text[: last_punct_pos + 1]
    #         segment_text = textUtils.get_string_no_punctuation_or_emoji(
    #             segment_text_raw
    #         )
    #         self.processed_chars += len(segment_text_raw)  # 更新已处理字符位置
    #
    #         # 如果是第一句话,在找到第一个逗号后,将标志设置为False
    #         if self.is_first_sentence:
    #             self.is_first_sentence = False
    #
    #         return segment_text
    #     elif self.tts_stop_request and current_text:
    #         segment_text = current_text
    #         self.is_first_sentence = True  # 重置标志
    #         return segment_text
    #     else:
    #         return None

    def _get_segment_text(self):
        # 合并当前全部文本并处理未分割部分
        full_text = "".join(self.tts_text_buff)
        # 判断是否有不成对的括号
        single_bracket = self.has_unpaired_brackets(full_text)
        if single_bracket:
            return None

        skip_text_len = 0

        # 判断是否有双括号
        found, brackets = self.has_paired_brackets(full_text)
        if found and len(self.brackets_arr) < len(brackets) and len(brackets) > 0:
            # 有括号, 且是新的括号
            self.brackets_arr = brackets

            skip_text_len = len(full_text) - self.processed_chars - len(brackets[-1])
            skip_text = full_text[self.processed_chars : (self.processed_chars+skip_text_len)]
            self.before_text_arr.append(skip_text)

            # 将新的括号及内容所占字符的个数加到开始索引上
            self.processed_chars = self.processed_chars + len(brackets[-1]) + skip_text_len


        current_text = "".join(self.before_text_arr) + full_text[self.processed_chars:]



        # 去除'”'后,如果为空字符串返回None
        if self.is_text_empty_after_removing_quotes(current_text):
            return None

        last_punct_pos = -1


        # 根据是否是第一句话选择不同的标点符号集合
        punctuations_to_use = (
            self.first_sentence_punctuations
            if self.is_first_sentence
            else self.punctuations
        )

        for punct in punctuations_to_use:
            pos = current_text.rfind(punct)
            if (pos != -1 and last_punct_pos == -1) or (
                pos != -1 and pos < last_punct_pos
            ):
                last_punct_pos = pos

        if last_punct_pos != -1:
            segment_text_raw = current_text[: last_punct_pos + 1]
            segment_text = textUtils.get_string_no_punctuation_or_emoji(
                segment_text_raw
            )
            # processed_chars的长度中已经包含了text_before_brackets的长度
            """
            self.processed_chars: 嘿,分析员,(双手叉腰,昂起头)   16
            segment_text_raw: 分析员,有我这样的优秀战友在,你居然还想着火锅?  24
            full_text: 嘿,分析员,(双手叉腰,昂起头)有我这样的优秀战友在,你居然还想着火锅?   36
            
            "分析员,"  的长度用了2次, 所以下一次索引不能从40开始,要从 16 + 24 - len(分析员,)  = 36 开始
            """

            # self.processed_chars = self.processed_chars + len(segment_text_raw) - len(self.text_before_brackets)  # 更新已处理字符位置 ----------------
            self.processed_chars = self.processed_chars + len(segment_text_raw) - sum(len(item) for item in self.before_text_arr)  # 更新已处理字符位置

            # 如果是第一句话,在找到第一个逗号后,将标志设置为False
            if self.is_first_sentence:
                self.is_first_sentence = False

            self.text_before_brackets = ""  # 重置
            self.before_text_arr = []  # 重置
            return segment_text
        elif self.tts_stop_request and current_text:
            segment_text = self.remove_parentheses(current_text)
            self.is_first_sentence = True  # 重置标志
            self.brackets_arr = []  # 重置
            self.text_before_brackets = ""  # 重置
            self.before_text_arr = []  # 重置
            return segment_text
        else:
            return None

    def _process_audio_file(self, tts_file):
        """处理音频文件并转换为指定格式

        Args:
            tts_file: 音频文件路径
            content_detail: 内容详情

        Returns:
            tuple: (sentence_type, audio_datas, content_detail)
        """
        audio_datas = []
        if tts_file.endswith(".p3"):
            audio_datas, _ = p3.decode_opus_from_file(tts_file)
        elif self.conn.audio_format == "pcm":
            audio_datas, _ = self.audio_to_pcm_data(tts_file)
        else:
            audio_datas, _ = self.audio_to_opus_data(tts_file)

        if (
            self.delete_audio_file
            and tts_file is not None
            and os.path.exists(tts_file)
            and tts_file.startswith(self.output_file)
        ):
            os.remove(tts_file)
        return audio_datas

    def _process_remaining_text(self):
        """处理剩余的文本并生成语音

        Returns:
            bool: 是否成功处理了文本
        """
        full_text = "".join(self.tts_text_buff)
        remaining_text = "".join(self.before_text_arr) + full_text[self.processed_chars :]
        # 去除单个单引号,若去除单引号后长度为0,返回false,否则将单引号传递给TTS合成会报错
        if self.is_text_empty_after_removing_quotes(remaining_text):
            return False
        if remaining_text:
            segment_text = textUtils.get_string_no_punctuation_or_emoji(remaining_text)
            if segment_text:
                tts_file = self.to_tts(segment_text)
                audio_datas = self._process_audio_file(tts_file)
                self.tts_audio_queue.put(
                    (SentenceType.MIDDLE, audio_datas, segment_text)
                )
                self.processed_chars += len(full_text)
                return True
        return False


    """
    判断文本中是否有单括号(中文括号、英文括号)
    """
    def has_unpaired_brackets(self, text):
        stack = []

        for char in text:
            if char == '(' or char == '(':
                stack.append(char)
            elif char == ')' or char == ')':
                if not stack:
                    # 没有对应的左括号
                    return True
                left = stack.pop()
                # 判断是否匹配(严格匹配中英文)
                if (char == ')' and left != '(') or (char == ')' and left != '('):
                    return True

        # 最后栈里还有未匹配的左括号
        return len(stack) > 0

    """
    判断文本中是否有成对的括号(中文括号、英文括号)
    """

    def has_paired_brackets(self, text):
        matched_brackets = []  # 存储所有找到的完整括号内容

        # 记录每个左括号的位置和类型
        bracket_positions = []

        for i, char in enumerate(text):
            if char == '(' or char == '(':
                bracket_positions.append((i, char))  # 保存位置和类型
            elif char == ')' and bracket_positions:
                start_idx, open_char = bracket_positions.pop()
                if open_char == '(':
                    end_idx = i + 1
                    matched_brackets.append(text[start_idx:end_idx])
            elif char == ')' and bracket_positions:
                start_idx, open_char = bracket_positions.pop()
                if open_char == '(':
                    end_idx = i + 1
                    matched_brackets.append(text[start_idx:end_idx])

        found = len(matched_brackets) > 0
        return found, matched_brackets

    def remove_parentheses(self, text):
        if text is None or len(text) == 0:
            return None  # 输入为空或 None 时直接返回 None

        # 步骤 1: 删除括号及括号中的内容(支持中英文)
        pattern_parentheses = r'([^)]*)|$[^)]*$'
        text = re.sub(pattern_parentheses, '', text)

        # 步骤 2: 删除所有单引号 “ 和 ”
        text = text.replace('“', '').replace('”', '')

        # 步骤 3: 去除首尾空白字符
        cleaned_text = text.strip()

        # 步骤 4: 如果最终文本为空,返回 None
        if not cleaned_text:
            return None

        return cleaned_text

    def is_text_empty_after_removing_quotes(self, text):
        if not text:  # 如果是 None 或空字符串
            return True

        # 删除所有类型的单引号(中文、英文、左右引号)
        text_cleaned = text.replace('“', '') \
            .replace('”', '') \
            .replace("'", '') \
            .replace('‘', '') \
            .replace('’', '') \
            .strip()

        # 判断清理后是否为空
        return len(text_cleaned) == 0

在发送到页面上显示的时候,将残留的一些符号再过滤一下(残留的符号在 TTS 合成时不影响,在页面上显示影响美观)

路径:\xiaozhi-esp32-server\main\xiaozhi-server\core\handle\sendAudioHandle.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def sendAudioMessage(conn, sentenceType, audios, text):
    text = await handle_text(text)
    # 发送句子开始消息
    if text is not None:
    ......
  

async def handle_text(text):
    if not text or len(text) == 0:
        return text

    # 1. 删除括号及其内容:包括中文括号()和英文括号()
    text = re.sub(r'([^)]*)|$[^)]*$', '', text)

    # 2. 使用栈检测成对引号,标记孤立引号为待删除
    stack = []
    chars = list(text)
    quote_pairs = {'"': '"', '“': '”', '‘': '’'}
    quote_positions = {}  # 记录每一对引号的位置

    for i, ch in enumerate(chars):
        if ch in quote_pairs:
            stack.append((i, ch))  # 左引号入栈
        elif ch in quote_pairs.values():
            if stack and quote_pairs.get(stack[-1][1]) == ch:
                left_idx, left_quote = stack.pop()
                quote_positions[left_idx] = i  # 记录成对引号范围
                quote_positions[i] = left_idx
            else:
                # 孤立右引号,标记删除
                chars[i] = '\x00DELETE\x00'

    # 标记未闭合的左引号为待删除
    for pos, _ in stack:
        chars[pos] = '\x00DELETE\x00'

    # 构建新字符串,暂时不处理引号
    temp_text = ''.join([c for c in chars if c != '\x00DELETE\x00'])

    # 3. 清理残留符号,但保留成对引号和非首尾省略号
    cleaned_parts = []
    i = 0
    while i < len(temp_text):
        matched = False
        # 检查是否在成对引号内
        in_quotes = any(start <= i <= end for start, end in quote_positions.items())

        # 如果当前位置是符号,并且不在引号中,则考虑删除
        if not in_quotes:
            # 匹配单独的标点符号(不包括成对引号中的)
            symbol_match = re.match(r'[‘’"()()\u2026.]', temp_text[i:])
            if symbol_match:
                char = symbol_match.group(0)
                # 判断是否是省略号的一部分
                ellipsis_match = re.match(r'(…|\.\.\.|\u2026)', temp_text[i:])
                if ellipsis_match:
                    ellipsis = ellipsis_match.group(0)
                    # 判断是否处于文本中间(不是开头或结尾)
                    start_pos = i
                    end_pos = i + len(ellipsis)
                    if 0 < start_pos or end_pos < len(temp_text):
                        # 保留省略号
                        cleaned_parts.append(ellipsis)
                        i += len(ellipsis)
                        matched = True
                    else:
                        # 首或尾的省略号,删除
                        i += len(ellipsis)
                        matched = True
                else:
                    # 其他符号,删除
                    i += 1
                    matched = True

        if not matched:
            cleaned_parts.append(temp_text[i])
            i += 1

    cleaned_text = ''.join(cleaned_parts)

    # 4. 去除首尾空白字符
    final_text = cleaned_text.strip()
    return final_text

0%