前言:前些日子微信小程序『头脑王者』很是火热,为了紧跟时代潮流,不被游戏另一端的对手嘲笑,因此赶了一个脚本,让我第一次尝到了满分的滋味

分析

不同于『跳一跳』,如果利用图像处理和OCR文字识别,结果可想而知:不仅处理起来麻烦、答题速度也慢,再加上搜索结果的准确率本来就不是很高,不过最要紧的还是反应速度,几秒钟才能给出答案,那我还不如去背题库呢。合理的解决办法是通过抓包获取题目,如果网络通畅搜索引擎能很快给出结果的话,程序甚至可以在题目显示在手机屏幕前就给出问题的答案,要知道,答题速度越快,才越有可能拿到满分

具体的,抓包工具使用fiddler,答案搜索就用urllib(构造百度query请求),对返回的页面进行词频统计,找出出现频率最高的选项,就是问题的答案了,由于词频统计方法的准确率不高,可以通过数据库对解答错误的问题进行积累,这样下次遇到同样的问题,由数据库给出正确的答案,就能进一步降低错误率

实现

Fiddler配置

  1. PC端配置

    • 配置fiddler(将PC端作为抓包代理服务器)允许监听https:打开菜单 Tools-Options-HTTPS,勾选 CaptureHTTPS CONNECTs
    • 允许远程设备连接:Tools-Options-Connections,勾选 Allow remote computer to connect,记住 Fiddler listens on port 的值,通常为8888
  2. 手机端配置

    • 安装根证书:打开手机浏览器,输入http://ip:port,ip即为代理服务器地址(PC地址),port为上述端口值8888,页面跳转之后下载证书并安装
    • 设置无线网络代理:手机设置-无线网-高级,选择手动代理,主机名、端口同上

现在PC的fiddler可以捕捉手机上的https数据包了

微信抓包


到了这一步问题已经很简单了,通过修改fiddler的JScript脚本,我们将从题库服务器发送过来的数据保存下来,然后编写py脚本对其进行处理

数据库

使用SQL Server,第一个字段为主键,值为题目的md5散列值,第二个字段为问题的答案,第三个字段为题目

Python脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import json
import time
import urllib.request
import re
import pymssql
from hashlib import md5
import threading
import inspect
import ctypes
temp='' # 临时变量,监测responseBody.txt文件中的题目是否刷新
lock=False # 意为“锁”,当获取到新的问题时,置lock为True,此时再获取到服务器发来的答案时,意味着可以取得正确的答案,在获取答案的代码块中,写入数据库,并且紧接着将lock置为False。实际上这是一个同步的操作。后来仔细看了下数据包,发现其自带同步,因为无论是题目还是答案数据包都包含了[选项],因此该字段足以将题目和答案一一对应
opts=[] # 一个全局容器,存放问题选项
timu='' # 当前题目
selectNum=1 # 选择的选项(搜索引擎)
k=['A','B','C','D']
url = 'https://www.baidu.com/s?wd=%' # 百度搜索
headers = ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14393")
opener = None
reg_o2=re.compile('.')
reg_o3=re.compile('[。“”,]')
#使用代理(练手)
def useProxy(proxy_addr):
global opener
proxy = urllib.request.ProxyHandler({'http': proxy_addr})
opener = urllib.request.build_opener(proxy, urllib.request.HTTPHandler)
opener.addheaders = [headers]
urllib.request.install_opener(opener)
useProxy('61.135.217.7')
# 计算md5值
def getMD5(strs):
myMD=md5()
myMD.update(strs.encode('utf-8'))
return myMD.hexdigest()
# 插入之前先检查数据库中是否存在
def check(cursor,cmd,mod=0):
cursor.execute(cmd)
if cursor.rowcount:
#print('数据库中已有记录!')
if mod==1:
for row in cursor:
return row[1] # 返回正确答案
return True
else:
return False
# 添加一条记录
def addRecord(conn,cursor,cmd,strs_md5):
try:
checkSql="select * from bank1 where md5='"+strs_md5+"'"
if(not check(cursor,checkSql)):
cursor.execute(cmd)
conn.commit()
except Exception as e:
print(str(e))
def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
# kill某子线程(此处代码来自Internet)
def stop_thread(thread):
_async_raise(thread.ident, SystemExit)
def main():
global temp,lock,opts,timu
while True: # 轮询
with open('E:\\TEST\\weibo\\responseBody.txt',encoding='utf-8') as f:
try:
j=json.loads(f.read())
if 'quiz' in j['data']: # 如果是题目数据包
quiz=j['data']['quiz']
if quiz==temp: # 问题没有刷新
pass
else: # 已刷新,这是一道新的问题,在这里编写答案搜索代码
temp=quiz
lock=True
print(str(j['data']['num'])+'. '+'['+j['data']['school']+'/'+j['data']['type']+'] '+quiz) # 打印题目
timu=quiz
opts=j['data']['options']
print('A:%s B:%s C:%s D:%s'%(opts[0],opts[1],opts[2],opts[3])) # 打印选项
# 开启两个子线程,一边在数据库中查找(线程一)一边在搜索引擎中查找(线程二),当在数据库中先行找到之后,立即终止线程二;若线程二先结束,则主线程仍等待线程一结束。最终可能出现两个结果,以数据库为准
# 搜索引擎
def SearchEngines():
global selectNum
data = urllib.request.urlopen(url+urllib.parse.quote(quiz),timeout=1.5).read().decode('utf-8')
#time.sleep(4) # 测试
# 对问题的百度搜索返回的页面做【词频统计】
regs=[] # 存放各选项的正则匹配表达式
# 利用re.sub()函数对四个选项各自出现的次数计数,以期将时间开销降低至四分之一
for i in opts:
regs.append(reg_o2.sub(lambda match:match.group(0)+'[^'+match.group(0)+']*?',i)[:-6])
reg_4=re.compile('('+regs[0]+'|'+regs[1]+'|'+regs[2]+'|'+regs[3]+')') # 用该正则匹配遍历网页数据,只一次
maxs=[0,0,0,0] # 四个整型数据对应‘A,B,C,D’四个选项,记录的是四个选项的词频
def replacement(match):
string=match.group(1)
for index,i in enumerate(regs):
if re.findall(i,string):
maxs[index]+=1
break
return ''
reg_4.sub(replacement,data)
maxindex=0 # “最大值的下标”初始值
for index,i in enumerate(maxs):
if i > maxs[maxindex]:
maxindex=index
selectNum=maxindex+1
print('\n搜索结果(Internet): '+opts[maxindex]+' 选择【'+k[maxindex]+'】')
# 数据库搜索
def SearchDB():
t2=threading.Thread(target=SearchEngines)
#t2.setDaemon(True)
t2.start() # 开启搜索引擎搜索子线程
conn = pymssql.connect(host='.',user='sa',password='s17a0114da',database='QuestionBank')
cursor=conn.cursor()
retrieveSql="select * from bank1 where md5='"+getMD5(quiz)+"'"
right=check(cursor,retrieveSql,1)
if not right:
print('数据库无结果')
else:
stop_thread(t2) # 如果在数据库(线程)中查找到了答案,则立即杀死搜索引擎子线程(杀掉子线程通常利用setDaemon的特性,但是由于主线程为无限循环(没有结束),所以setDaemon不起作用,于是采用ctypes库kill掉子线程)
xiabiao=(xb for xb,it in enumerate(opts) if it==right)
print('\n搜索结果(DataBase): '+right+' 选择【'+k[next(xiabiao)]+'】')
cursor.close()
conn.close()
#t2.join(2)
#SearchEngines()
#SearchDB()
t1=threading.Thread(target=SearchDB)
t1.start()
t1.join() # 主线程在此等待数据库搜索子线程结束
else: # 否则就是答案数据包
if lock:
lock=False
# 将问题和正确答案写入数据库中,何种情况下写入:搜索引擎搜索正确的就没必要写入了,只记录答错的(‘yes’字段为false)或者搜索结果和正确结果不一致的题目(共两种情况)
if(not j['data']['yes'] or j['data']['answer']!=selectNum):
conn = pymssql.connect(host='.',user='sa',password='s17a0114da',database='QuestionBank')
cursor=conn.cursor()
timu_md5=getMD5(timu)
addSql="insert into bank1 values('"+timu_md5+"','"+opts[j['data']['answer']-1]+"','"+timu+"')"
addRecord(conn,cursor,addSql,timu_md5)
cursor.close()
conn.close()
if j['data']['yes']:
print('回答【正确】,又加分啦!')
else:
print('回答【错误】,呜呜呜!')
if opts:
print('正确答案: '+opts[j['data']['answer']-1]+'\n') # 打印答案
except Exception as e:
print(str(e))
time.sleep(0.5)
if __name__=='__main__':
print('头脑王者,就是我!请开启对战!')
main()

运行截图