# 1. 下载安装 go-cqhttp
根据官方文档说明,前往 release (opens new window) 下载适合自己版本的 go-cqhttp,本文以go-cqhttp_windows_amd64.exe
为例进行说明
第一次下载好go-cqhttp.exe
后,双击运行,按照提示点击确定,等待生成go-cqhttp.bat
后,双击运行bat,此时选择0:HTTP通信
即可:
# 2. 配置
打开上一步生成的config.yml
配置文件,初次使用,建议修改下面两项:
uin
:作为机器人的QQ账号password
:QQ密码,可填可不填(密码为空时使用扫码登录)address
和url
:打开注释,若端口号没被占用,则无需修改
我的配置如下:
# go-cqhttp 默认配置文件
account: # 账号相关
uin: # QQ账号
password: '' # 密码为空时使用扫码登录
encrypt: false # 是否开启密码加密
status: 0 # 在线状态 请参考 https://docs.go-cqhttp.org/guide/config.html#在线状态
relogin: # 重连设置
delay: 3 # 首次重连延迟, 单位秒
interval: 3 # 重连间隔
max-times: 0 # 最大重连次数, 0为无限制
# 是否使用服务器下发的新地址进行重连
# 注意, 此设置可能导致在海外服务器上连接情况更差
use-sso-address: true
# 是否允许发送临时会话消息
allow-temp-session: false
heartbeat:
# 心跳频率, 单位秒
# -1 为关闭心跳
interval: -1
message:
# 上报数据类型
# 可选: string,array
post-format: string
# 是否忽略无效的CQ码, 如果为假将原样发送
ignore-invalid-cqcode: true
# 是否强制分片发送消息
# 分片发送将会带来更快的速度
# 但是兼容性会有些问题
force-fragment: false
# 是否将url分片发送
fix-url: false
# 下载图片等请求网络代理
proxy-rewrite: ''
# 是否上报自身消息
report-self-message: false
# 移除服务端的Reply附带的At
remove-reply-at: false
# 为Reply附加更多信息
extra-reply-data: false
# 跳过 Mime 扫描, 忽略错误数据
skip-mime-scan: false
output:
# 日志等级 trace,debug,info,warn,error
log-level: warn
# 日志时效 单位天. 超过这个时间之前的日志将会被自动删除. 设置为 0 表示永久保留.
log-aging: 15
# 是否在每次启动时强制创建全新的文件储存日志. 为 false 的情况下将会在上次启动时创建的日志文件续写
log-force-new: true
# 是否启用日志颜色
log-colorful: true
# 是否启用 DEBUG
debug: false # 开启调试模式
# 默认中间件锚点
default-middlewares: &default
# 访问密钥, 强烈推荐在公网的服务器设置
access-token: ''
# 事件过滤器文件目录
filter: ''
# API限速设置
# 该设置为全局生效
# 原 cqhttp 虽然启用了 rate_limit 后缀, 但是基本没插件适配
# 目前该限速设置为令牌桶算法, 请参考:
# https://baike.baidu.com/item/%E4%BB%A4%E7%89%8C%E6%A1%B6%E7%AE%97%E6%B3%95/6597000?fr=aladdin
rate-limit:
enabled: false # 是否启用限速
frequency: 1 # 令牌回复频率, 单位秒
bucket: 1 # 令牌桶大小
database: # 数据库相关设置
leveldb:
# 是否启用内置leveldb数据库
# 启用将会增加10-20MB的内存占用和一定的磁盘空间
# 关闭将无法使用 撤回 回复 get_msg 等上下文相关功能
enable: true
sqlite3:
# 是否启用内置sqlite3数据库
# 启用将会增加一定的内存占用和一定的磁盘空间
# 关闭将无法使用 撤回 回复 get_msg 等上下文相关功能
enable: false
cachettl: 3600000000000 # 1h
# 连接服务列表
servers:
# 添加方式,同一连接方式可添加多个,具体配置说明请查看文档
#- http: # http 通信
#- ws: # 正向 Websocket
#- ws-reverse: # 反向 Websocket
#- pprof: #性能分析服务器
- http: # HTTP 通信设置
address: 0.0.0.0:5700 # HTTP监听地址
timeout: 5 # 反向 HTTP 超时时间, 单位秒,<5 时将被忽略
long-polling: # 长轮询拓展
enabled: false # 是否开启
max-queue-size: 2000 # 消息队列大小,0 表示不限制队列大小,谨慎使用
middlewares:
<<: *default # 引用默认中间件
post: # 反向HTTP POST地址列表
#- url: '' # 地址
# secret: '' # 密钥
# max-retries: 3 # 最大重试,0 时禁用
# retries-interval: 1500 # 重试时间,单位毫秒,0 时立即
- url: http://127.0.0.1:5701/ # 地址
secret: '' # 密钥
# max-retries: 10 # 最大重试,0 时禁用
# retries-interval: 1000 # 重试时间,单位毫秒,0 时立即
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 3. 登陆机器人
配置完成后,双击运行go-cqhttp.bat
,根据提示进行操作,若看到上报器和服务器已启动,则表示运行没问题,出现WARING警告
忽略即可
# 4. 消息格式
详细格式参考官方文档 (opens new window)
正常消息返回格式如下:
{
'post_type': 'message',
'message_type': 'private',
'time': 1678779435,
'self_id': 123456789,
'sub_type': 'friend',
'message': '消息',
'raw_message': '消息',
'font': 0,
'sender': {
'age': 0,
'nickname': 'xxx',
'sex': 'xxx',
'user_id': 987654321
},
'message_id': -1361367431,
'user_id': 987654321,
'target_id': 2077686705
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 5. 封装Python脚本
# 5.1 监听上报的消息
# receive.py
import socket
import json
# 创建一个socket对象,用于监听客户端的连接请求
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定本地主机和5701端口号(cqhttp会上报信息到此端口,服务器监听此端口即可)
server.bind(('127.0.0.1', 5701))
# 开始监听,最多允许10个连接请求排队等待
server.listen(10)
HttpResponseHeader = '''HTTP/1.1 200 OK
Content-Type: text/html\r\n\r\n
'''
def data_to_json(msg):
for i in range(len(msg)):
if msg[i] == "{" and msg[-1] == "\n":
return json.loads(msg[i:])
return None
def rev_msg(): # json or None
client, address = server.accept()
data = client.recv(1024).decode(encoding='utf-8')
rev_json = data_to_json(data)
client.sendall((HttpResponseHeader).encode(encoding='utf-8'))
client.close()
return rev_json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 5.2 发送消息
#send.py
import socket
from global_var import global_var
import urllib.parse
def _send_msg(resp_dict):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ip = '127.0.0.1'
# 发送消息到此端口
client.connect((ip, 5700))
msg_type = resp_dict['msg_type'] # 回复类型(群聊/私聊)
number = resp_dict['number'] # 回复账号(群号/好友号)
msg = resp_dict['msg'] # 要回复的消息
# 将字符中的特殊字符进行url编码
# msg = msg.replace(" ", "%20")
# msg = msg.replace("\n", "%0a")
# msg = msg.encode("utf-8")
# 字符串进行url编码
print('发送消息', msg)
msg = urllib.parse.quote(msg)
if msg_type == 'group':
payload = "GET /send_group_msg?group_id=" + str(
number) + "&message=" + msg + " HTTP/1.1\r\nHost:" + ip + ":5700\r\nConnection: close\r\n\r\n"
elif msg_type == 'private':
payload = "GET /send_private_msg?user_id=" + str(
number) + "&message=" + msg + " HTTP/1.1\r\nHost:" + ip + ":5700\r\nConnection: close\r\n\r\n"
# print("发送" + payload)
client.send(payload.encode("utf-8"))
client.close()
return 0
def send_private_msg(qid, msg):
resp_dict = {'msg_type': 'private', 'number': qid, 'msg': msg}
_send_msg(resp_dict)
def send_group_msg(msg, gid=global_var.get_value('G_ID')):
resp_dict = {'msg_type': 'group', 'number': gid, 'msg': msg}
_send_msg(resp_dict)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 5.3 脚本主逻辑
主要逻辑是不断接收(receive)上报的消息,并进行处理,然后发送(send)给客户端,针对多个用户的消息消息,此处做了多线程优化,防止阻塞,部分关键代码如下
# main.py
import time
import traceback
from receive import rev_msg
from handle import HandleMsg
import threading
# 创建一个BoundedSemaphore对象,设置最大信号量为10(最多启动10个线程)
global maxLimiter
maxLimiter = 10
threadLimiter = threading.BoundedSemaphore(maxLimiter)
import os
# 禁用代理
os.environ['no_proxy'] = '*'
def start_msg_handler():
handleMsg = HandleMsg()
handleMsg.reg_all_messages()
repeat_receive_msg(handleMsg)
def repeat_receive_msg(handleMsg: HandleMsg):
while True:
try:
rev = rev_msg()
print(rev)
if rev == None:
continue
if rev["post_type"] == "message":
if rev["message_type"] == "private": # 私聊
# 另起线程去处理消息
thread = MessageHandlerThread(handleMsg.handle_private_msg, rev)
thread.start()
elif rev["message_type"] == "group": # 群聊
thread = MessageHandlerThread(handleMsg.handle_group_msg, rev)
thread.start()
else:
print('其他消息')
continue
else:
continue
except Exception as e:
traceback.print_exc()
class MessageHandlerThread(threading.Thread):
def __init__(self, handle_func, rev_msg):
threading.Thread.__init__(self)
self.handle_func = handle_func
self.rev_msg = rev_msg
# 重写run方法,定义线程要执行的任务
def run(self):
# 获取一个信号量,如果已满则等待
threadLimiter.acquire()
try:
self.handle_func(self.rev_msg)
finally:
# 释放一个信号量,让其他线程继续执行
threadLimiter.release()
if __name__ == '__main__':
start_msg_handler()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 6. 消息处理
针对不同的消息,封装各种有意思的响应
To Be Continue...