问题描述
标题说明了一切——我正在寻找一种(或多种)方法,在 WebDriver 已经创建之后加载、启用、添加(同样也包括禁用/删除)扩展程序。
上下文:Python、Selenium(见标签)。某些环节不可避免要用到 pyautogui,但需要足够健壮。我已有一个可用的解决方案草稿,但它比较粗糙,而且有些依赖 GUI(说明:我不会接受“这在 Chrome 中做不到”这类回答,因为我知道可以做到,只是希望看到解决此问题的方案/代码)。
我非常了解在驱动程序被实例化/创建之前加载扩展的各种方法,即:通过选项对象(o = Options)添加扩展、使用已预装扩展的配置文件 profileX,等等。
需要明确的是:我对这一类方案完全不感兴趣。
关于标题/上述问题的任何想法/建议?
理想情况下(但不是绝对关键)是不完全基于 GUI(依赖于机器人/pyautogui)的程序化解决方案。
您可以假设该扩展程序可在 Chrome 商店中用于本练习。
用例:我正在测试的扩展程序涉及注册点数,为了保护公平/预期的使用条款,它被设计为在识别出自动化操作时停止运行(这在“有头”模式的 Chrome 中完全可以做到,例如只需在 Chrome 设置中为它分配一个热键并用 pyautogui 激活,或者定位到它的 HTML 页面直接交互,等等)。鉴于这一点,再加上我有非常多的测试,我不得不重新加载测试、设置配置文件,或者频繁地停止和启动测试,以便引入一份“新的” crx 文件副本,或采用上面提到的其他方法之一——这似乎是一种资源浪费。迄今为止,据我所知还没有解决方案。如果有人能指出与此相关的“重复”问题,我会感到惊喜。
解决方法
注意事项
- 用三种可能的方法解决问题(因此解决方案看起来很长,但个别方法并不过分/特别冗长;而且,代码可以很容易地根据需要进行调整。)
- 此外,这最初是在单浏览器环境/设置的前提下开始的;
- 随后的变量和方法定义将这一思路扩展到多线程/多浏览器环境中的实现
初步
import glob2,itertools,os,pyautogui,shutil,time,pyperclip,subprocess,keyboard as kb
from datetime import datetime
from zipfile import ZipFile
now = datetime.now()
dt_string: str = now.strftime("%d-%b-%Y,%H:%M:%S")
print(dt_string)
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from concurrent.futures import ThreadPoolExecutor
from os import path
#references: date-time format: https://www.programiz.com/python-programming/datetime/current-datetime
方法
方法一:通过Chrome-store启动chrome后添加
# Method 1: add an extension through its Chrome Web Store page after the
# browsers listed in `ps` are already running.
# Flow visible below: close extra tabs, open the store page in every browser and
# click the store button (button_click), drive the resulting native windows with
# key presses (add_remove), close extra tabs again, then re-read the button text
# without clicking and recurse once for entries that still read "add".
# `section` only limits the recursion: 1 is passed by the remove-then-re-add
# pass, 2 by the final retry pass.
# NOTE(review): indentation was lost in extraction and several lines in
# add_remove are truncated, so this block does not parse as shown.
def ext_update_method1(ps,section=''):
global w,p2s
# NOTE(review): text_temp is not in the `global` statement above, so this
# assignment creates a local, while button_click declares `global text_temp`.
# The two appear to refer to different variables - confirm which is intended.
text_temp,p2s = [''] * len(ps),[]
# Opens the web-store detail page for one browser, stores the action button's
# text in text_temp[p] and optionally clicks it.
def button_click(p,parms = ['',True]):
#parms[0] = ID,parms[1] = click?
ID,click = parms[0],parms[1]
global text_temp
text_temp[p] = ''
ID = ['omghfjlpggmjjaagoclmmobgdodcjboh'] if ID == '' else ID #default i= Browsec if no ID entered
d[p].get(f'https://chrome.google.com/webstore/detail/{ID[0]}')
w[p] = WebDriverWait(d[p],10)
# wait until the page reports it has finished loading
w[p].until(lambda x: x.execute_script("return document.readyState") == "complete")
button = w[p].until(lambda x: x.find_element_by_class_name('g-c-R'))
text_temp[p] = button.text
if click == True: button.click()
# Handles the windows that appeared after the click, using window activation
# plus tab/space key presses; returns a short status string on the paths that
# do not recurse.
def add_remove():
global p2s
for i,y in enumerate(ys): activ(i,ys),(ss,0.5),keys([(pr,'tab'),(pr,'space')])
after2,ys2 = gwwt(''),[]
for x in after2:
if (x not in before) and (x not in ys_remove): ys2 = ys2 + [x,]
# ys2 = ys2 + [x,] if (all(x not in before) and (x not in ys_remove)) else ys2
# NOTE(review): the next line is truncated (unbalanced brackets).
for i,y in enumerate(ys2): activ(i,'space')])
ss(1)
# NOTE(review): the next line looks like a truncated assignment
# (presumably `after2,ys2 = gwwt(''),[]` as above - confirm).
after2,[]
for x in after2:
# NOTE(review): `(x not in a[0] for a in wd)` is a bare generator expression,
# which is always truthy; the `all(...)` form two lines below was probably intended.
if (x not in before) and (x not in ys_remove) and (x not in a[0] for a in wd): ys2 = ys2 + [x,]
for i,y in enumerate(ys2):
if all(y != x[0] for x in wd): y.close()
if len(ys_remove) > 0:
for i,y in enumerate(ys_remove):
activ(i,ys_remove),pr('space'),ss(0.5)
p2s = p2s + [i,]
if section == 1:
return 'complete 2nd iteration' #prevent endless loop (see next comment)
else:
print('removed add-in,now adding back again....')
cc(2)
thread_all(ps,close_extra)
ext_update_method1(p2s,1) #loop back on self to add
else:
return 'complete 1st iteration'
# Closes extra tabs for one browser, sends 'a' then 'enter' to its window and
# waits for the page to finish loading; the previous WebDriverWait is restored.
def close_extra(p):
global w
w_old = w[p]
w[p] = WebDriverWait(d[p],10)
tabs_close_extra(p)
tabs_close_extra(p)
activ(p),pr('a'),activ(p),pr('enter')
w[p].until(lambda x: x.execute_script("return document.readyState") == "complete")
w[p] = w_old
thread_all(ps,tabs_close_extra)
# snapshot of all windows before clicking, used to detect newly opened ones
before = gwwt('')
thread_all(ps,button_click)
cc(2) #cc(5)
after = gwwt('')
ys,ys_remove,p2s = [],[],[]
# windows whose title contains 'remove'
ys_remove = gwwt('remove')
print(f'ys_remove = {ys_remove}')
for x in after:
if (x not in ys_remove) and (x not in before): ys = ys + [x,]
print(add_remove())
cc(2)
thread_all(ps,close_extra)
# second pass: only read the button text (click = False)
thread_all(ps,button_click,['',False])
qs = []
# NOTE(review): this appends the constant 1 for every matching entry rather than
# the entry's index - confirm against the retry call below.
for x in text_temp: qs = qs + [1,] if 'add' in x.lower() else qs
print(f'qs: {qs}')
if section == 2: return #prevent never-ending loop (see next comment)
if len(qs) > 0: ext_update_method1(qs,2) #loop back on self in case 'add extension' still present in web-store
注意事项:可以在同一个浏览器上下载同一个扩展的多个副本
- 注意:n_i_c 中的第三个参数默认设置为 1:这将清除您的下载文件夹。如果不想发生这种情况,请设置为 0
- 参数(n_i_c):
  - n_i_c[0] = 您希望将给定扩展下载到同一浏览器的次数
  - n_i_c[1] = 全局列表 ext_details 中的索引,参见 vars()
  - n_i_c[2] = 1 时清除下载文件夹
  - n_i_c[3] = 0 => 只下载并解压文件夹(不加载)
- 需要下面的方法('辅助代码'、变量);代码可以修改以排除这些依赖。
方法 2: Chrome 启动后
# Method 2: download an extension as a zip via robwu.nl/crxviewer, unzip it into
# ~/Downloads/<alias><j>, then load it through chrome://extensions using the
# "Load unpacked" button and GUI key presses.
# n_i_c is read positionally below as [repetitions, ext_details index,
# clear_folder flag, is_load flag].
# NOTE(review): the default n_i_c=[2,1,1] has three items but n_i_c[3] is read
# below, so a call with the default raises IndexError.
# NOTE(review): indentation was lost in extraction; the `def` line is fused with
# heading text, the line after it is the wrapped tail of the trailing comment,
# and a few lines further down are truncated. The block does not parse as shown.
下载并加载 .crx 文件def ext_update_method2(ps,n_i_c=[2,1,1],section = ''): # same order as it would
appear for single set of extensions (iro one tab)...
# parms = [2,0] =>
# 0. repetitions - e.g. 2 downloads : uBlock
# 1. 0 [index value - e.g. uBlock = 4,below] downloaded and unzipped/installed on profile p
# 2: clear folder
# 3: is_load = 0 => only download unpacked folder (not loading it too)
global start,w,files_old,ev,d,parent,txt
global tabs_before
global files,files_new,index_latest_files,index_redo,index_hks
hks2,index_ext,clear_folder,is_load = n_i_c[0],n_i_c[1],n_i_c[2],n_i_c[3] # ID,alias = cred[0],cred[1],cred[2]
# NOTE(review): when the global hks is '' this replaces hks2 with '', which the
# later range(hks2) and hks2 * len(ps) cannot handle - confirm the intent.
hks2 = hks if hks == '' else hks2
ID,alias = ext_details[index_ext][1],ext_details[index_ext][0]
url_robwu = '''https://robwu.nl/crxviewer/?crx=https://chrome.google.com/webstore/detail/''' + str(ID)
if clear_folder == 1:
# folder_clear is defined outside this view; it is handed the ~/Downloads/* pattern
folder_clear(del_dir=os.path.join(os.path.expanduser("~"),'Downloads','*'))
def thread_all_0(p): #set up stuff
global d,index_hks2
# NOTE(review): this only triggers if tabs_close_extra returns text containing 'error'
if 'error' in str(tabs_close_extra(p)):
tab_close(p)
d[p].implicitly_wait(5)
w[p] = WebDriverWait(d[p],5)
index_hks2[p] = ''
d[p].get('chrome://extensions')
# counts the <extensions-item> elements currently listed on chrome://extensions
code_0 = '''var tags = document.querySelector('body').querySelector('extensions-manager')['shadowRoot'].querySelector('cr-view-manager').querySelector('extensions-item-list')["shadowRoot"].querySelector('#container').querySelectorAll('extensions-item'); return tags.length;'''
index_hks[p]=ev[p](code_0)
print(index_hks[p])
#
thread_all(ps,thread_all_0)
#
def thread_all_1(p): #open website to download crx (multi-thread to expedite)
d[p].get(url_robwu)
#
if len(ps) >= 2: #prevent overloading website - only do 3 at a time if more than 2 browsers being used
groups = list(data_mygrouper(2,ps))
for x in groups: thread_all(x,thread_all_1,workers = 3,chunk = 3)
else:
thread_all(ps,thread_all_1)
# NOTE(review): the zip files are globbed in the home directory, while the folder
# cleared above and the unzip target below are ~/Downloads - confirm the path.
files_old = glob2.glob(os.path.join(os.path.expanduser("~"),'*.zip'))
def thread_all_2(p):
try:
w[p].until(lambda x: x.find_element_by_id("download-link")).click()
print('lambda wait passed !!')
except:
print('lambda wait failed :(')
return
start_temp = time.time()
# Retry the download click for up to 90 s or until enough new zip files exist.
# NOTE(review): `and` binds tighter than `or`, so this parses as
# (time_ok and not_enough_files) or escape_pressed.
while (time.time() - start_temp <= 90) and (len(glob2.glob(os.path.join(os.path.expanduser("~"),'*.zip'))) - len(files_old) < hks2 * (len(ps))) or (kb.is_pressed('escape') == True):
if kb.is_pressed('escape') == True: break
thread_all(ps,thread_all_2),ss(1)
files = glob2.glob(os.path.join(os.path.expanduser("~"),'*.zip'))
files_new = []
# keep only the zip files that were not present before the download step
for keep in files:
if str(keep) not in files_old: files_new = files_new + [str(keep),]
# thread_all(files_new,z_extension_unzip,alias)
path_dir_core = os.path.join(os.path.expanduser("~"),'Downloads')
index_latest_files = [''] * len(files_new)
#
def thread_all_3(j): # unzip
global index_latest_files
index_latest_files[j] = os.path.join(path_dir_core,alias + str(j))
with ZipFile(files_new[j],'r') as zipObj:
zipObj.extractall(index_latest_files[j])
# the downloaded archive is deleted once extracted
os.remove(files_new[j])
#
thread_all(list(range(len(files_new))),thread_all_3)
if is_load != 1: return
# code1 clicks the developer-mode toggle unless the toolbar already has exactly
# two attributes (treated here as "dev mode already on"); code2 clicks "Load unpacked".
code1 = '''if(document.querySelector('body').querySelector('extensions-manager')["shadowRoot"].querySelector('extensions-toolbar').attributes.length !=2){document.querySelector("body").querySelector("extensions-manager")["shadowRoot"].querySelector("extensions-toolbar")["shadowRoot"].querySelector("cr-toolbar").querySelector(".more-actions").querySelector("#devMode")["shadowRoot"].querySelector("#knob").click() } else {'dev mode already activated!'}'''
code2 = '''document.querySelector('body').querySelector('extensions-manager')["shadowRoot"].querySelector('extensions-toolbar')["shadowRoot"].querySelector("#loadUnpacked").click()'''
#
def thread_all_4(p):
d[p].get('chrome://extensions/'),ss(1)
ev[p](code1)
# one pass per requested repetition; the folder for browser p in pass y is
# index_latest_files[p + len(ps) * y]
for y in range(hks2):
thread_all(ps,thread_all_4)
windows_before = gwwt('Select the extension directory')
for p in ps:
try:
if path.exists(os.path.join(path_dir_core,index_latest_files[p + len(ps) * (y)])):
try:
ev[p](code2)
except:
continue
start_temp = time.time()
# busy-wait (max 30 s) for the native folder-picker window to appear
while (time.time() - start_temp <= 30) and (
len(gwwt('Select the extension directory')) - len(windows_before)) < 1 and (
'select the extension directory' not in gawt().lower()): pass # len(ps) - len(errors_chrome()): pass
tabs_before = len(d[p].window_handles)
windows_A = gwwt('')
try:
# NOTE(review): the action list on the next line is truncated (unbalanced brackets).
keys([(tw,index_latest_files[p + len(ps) * (y)]),'enter'),'space')],0.75)
if len(windows_A) < len(gwwt('')):
d[p].refresh()
print('would break1')
else:
window_target = gwwt('Select the extension directory')[0]
try:
window_target.activate()
except:
window_target.minimize()
window_target.restore()
finally:
pyperclip.copy(index_latest_files[p + len(ps) * (y)]),ss(0.5)
# NOTE(review): the action list on the next line is truncated as well.
keys([(hk,'ctrl','l'),(kd,'shift'),'tab',4),(ku,(hk,'v'),'space')])
except:
d[p].refresh()
pr('escape',2)
print('would break2')
tabs_close_extra(p)
else: continue
except: continue
# Compares the number of shortcut cards on chrome://extensions/shortcuts with the
# count stored earlier in index_hks[p]; browsers that gained fewer than hks2 are queued in qs.
# NOTE(review): the earlier count came from extension items on chrome://extensions,
# a different page - confirm the two counts are comparable.
def thread_all_5(p):
global qs
d[p].get('chrome://extensions/shortcuts')
code_0 = '''return document.querySelector('body').firstElementChild["shadowRoot"].querySelector("cr-view-manager").querySelector('extensions-keyboard-shortcuts')['shadowRoot'].querySelector('#container').querySelectorAll('.shortcut-card').length'''
index_hks[p] = int(ev[p](code_0)) - index_hks[p]
qs = qs + [p,] if index_hks[p] < hks2 else qs
print(f'index_hks[{p}] = {index_hks[p]}')
index4[p] = 0
qs = []
if clear_folder == 1:
thread_all(ps,thread_all_5)
if section == 1: return
# NOTE(review): the retry call on the next line is truncated (n_i_c list not closed).
if len(qs) > 0: ext_update_method2(qs,n_i_c = [hks2,section = 1)
qs = []
for p in range(len(ps)): qs = qs + [p,]
方法 3:以编程方式删除
def ext_remove(ps,ext):
    """Remove one or more extensions from every browser in *ps*.

    For each browser the chrome://extensions page is opened, the DevTools
    console is brought up with Ctrl+Shift+J, and a JavaScript snippet is pasted
    and executed that clicks the "remove" button of the first listed extension
    whose ``#a11yAssociation`` text contains the given name. A following
    ``space`` key press is sent after each snippet (presumably to confirm the
    removal dialog - verify against the Chrome version in use).

    Args:
        ps: iterable of browser ids; each id indexes the global driver list ``d``.
        ext: extension name (substring, matched case-insensitively) or a
            list/tuple/set of such names.

    Failures for one browser are reported on stdout and do not stop the others.
    """
    names = list(ext) if isinstance(ext,(list,tuple,set,frozenset)) else [ext]

    def _removal_script(name):
        # The page text is lower-cased in JS, so the needle is lower-cased too.
        # `>= 0` also matches a name at the very start of the text, and
        # `i < tags.length` avoids reading one element past the end.
        return '''var tags = document.querySelector('body').querySelector('extensions-manager')['shadowRoot'].querySelector('cr-view-manager').querySelector('extensions-item-list')["shadowRoot"].querySelector('#container').querySelectorAll('extensions-item'); for (i = 0;i < tags.length; i ++) {try {if(tags[i]['shadowRoot'].querySelector('#a11yAssociation').innerText.toLowerCase().indexOf("''' + str(name).lower() + '''") >= 0) {tags[i]['shadowRoot'].querySelector('#remove-button').click();break} } catch {}}'''

    # Iterate over the browser ids themselves (not range(len(ps))) so a subset
    # such as [2, 3] addresses the right drivers, as the other methods do.
    for p in ps:
        try:
            activ(p)
            d[p].get('chrome://extensions/')
            hk('ctrl','shift','j'),ss(1)  # open the DevTools console
            for name in names:
                pyperclip.copy(_removal_script(name)),ss(0.5)
                hk('ctrl','v'),ss(0.5)    # paste the snippet into the console
                pr('enter'),ss(0.5)       # run it
                pr('space'),ss(0.5)
            tabs_close_extra(p)
            pr('f12'),ss(0.5)             # toggle DevTools closed again
        except Exception as exc:
            # best effort: report and carry on with the next browser
            print(f'error in ext_remove {p}: {exc}')
辅助代码
- 激活窗口
- 速记:模拟键盘敲击
- 方法:细分列表、关闭选项卡、线程池(多线程)和平铺窗口*
*(包含 JavaScript 脚本——需保存在与此脚本相同的位置)
激活窗口
pyautogui.FAILSAFE = False
def activ(p,index_wd=''):
    """Bring a window to the foreground and record whether that worked.

    Args:
        p: index of the browser/profile; used to index ``wd`` (or *index_wd*)
            and the global result list ``index_activ``.
        index_wd: optional list of window objects to pick from instead of the
            global ``wd``. With the default ``''`` the window is ``wd[p][0]``.
            If the lookup fails, ``wd[p]`` itself is used.

    Returns:
        bool: ``True`` if ``activate()`` succeeded, or if the minimize/restore
        fallback succeeded; ``False`` otherwise. The same value is stored in
        ``index_activ[p]``.
    """
    try:
        if index_wd == '':
            index_wd = wd[p][0]
        else:
            index_wd = index_wd[p]
    except Exception:
        # e.g. wd[p] is a single window object rather than a list
        index_wd = wd[p]
    # NOTE(review): this prints the global `w`, not the window just resolved -
    # confirm whether `index_wd` was meant.
    print(f'active {w}')
    index_activ[p] = False
    try:
        index_wd.activate()
    except Exception:
        # activate() can fail for background windows; a minimize/restore
        # round-trip is used as the fallback way to raise the window.
        try:
            index_wd.minimize()
            index_wd.restore()
        except Exception:
            return index_activ[p]
    index_activ[p] = True
    return index_activ[p]
快捷键
def keys(actions,delay=0.5):
    """Run a sequence of GUI/keyboard actions, pausing after each one.

    Args:
        actions: iterable of tuples ``(callable, *args)``. Each callable is
            invoked with the remaining items as positional arguments. A
            two-item action whose argument is ``''`` is called with no
            arguments, as is a one-item action.
        delay: seconds passed to ``ss`` after every action.

    Returns:
        list: the return value of each callable, in order.
    """
    print('def keys()')
    outcome = []
    for action in actions:
        print(f'action {action}')
        fn,args = action[0],tuple(action[1:])
        if len(args) == 1 and args[0] == '':
            # an empty-string placeholder means "call without arguments"
            args = ()
        # *args forwards any number of arguments; previously an action with
        # more than three arguments was called with none of them.
        outcome.append(fn(*args))
        ss(delay) # await ass(delay)
    return outcome
细分列表
def data_mygrouper(n,iterable):
    """Split *iterable* into consecutive chunks of at most *n* items.

    Args:
        n: chunk size.
        iterable: any iterable; it is consumed lazily.

    Returns:
        A generator yielding lists. Every list has *n* items except possibly
        the last, which holds the remainder. An empty iterable yields nothing.
    """
    # A private sentinel is used as the fill value so that genuine None items
    # in the input are kept instead of being mistaken for padding.
    pad = object()
    args = [iter(iterable)] * n
    return ([e for e in t if e is not pad] for t in itertools.zip_longest(*args,fillvalue=pad))
Stack Overflow 参考(data_mygrouper)
线程池
def thread_all(ps,fn,parm='',actions=[],workers=6,chunk=1):
    """Run ``fn`` once per item of *ps* on a thread pool.

    Args:
        ps: iterable of items (browser ids in this script), one call per item.
        fn: callable taking the item, plus *parm* as a second argument when
            *parm* is given.
        parm: optional extra argument handed unchanged to every call; the
            default ``''`` means "no extra argument".
        actions: unused apart from the debug print; it is never mutated.
        workers: maximum number of threads (values below 1 are raised to 1).
        chunk: ``chunksize`` forwarded to ``Executor.map`` (minimum 1).

    Returns:
        The iterator returned by ``Executor.map``. All calls have finished by
        the time it is returned because the executor is shut down on leaving
        the ``with`` block; an exception raised inside ``fn`` only surfaces
        when the iterator is consumed.
    """
    # https://stackoverflow.com/questions/42056738/how-to-pass-a-function-with-more-than-one-argument-to-python-concurrent-futures/42056975
    print(f'thread_all({ps},{fn},{parm},{actions}')
    if isinstance(parm,str) and parm == '':
        extra = ()
    else:
        # An unbounded repeat lets map() stop at the end of `ps`; this removes
        # the dependency on the module-level browser count.
        extra = (itertools.repeat(parm),)
    with ThreadPoolExecutor(max_workers=max(1,workers)) as executor:
        return executor.map(fn,ps,*extra,timeout=90,chunksize=max(1,chunk))
线程池执行器(ThreadPoolExecutor)参考:见 thread_all 代码注释中的 Stack Overflow 链接
关闭标签
def tab_close(p):
    """Close every tab of browser *p* except the first and focus that one.

    The first window handle is stored in ``parent[p]`` and ``ev[p]`` is set to
    the driver's ``execute_script``. Every other handle is then switched to and
    closed, stopping early if only one tab is left.

    Args:
        p: index of the browser in the global driver list ``d``.
    """
    print(f'def tab_close {p}')
    # d[p].switch_to.window(parent[p])
    handles = d[p].window_handles
    if handles:
        first = handles[0]
        d[p].switch_to.window(first)
        parent[p] = first
        ev[p] = d[p].execute_script
    # parent[p] = d[p].current_window_handle
    for h in d[p].window_handles:
        if h == parent[p]:
            continue
        d[p].switch_to.window(h)
        if len(d[p].window_handles) <= 1:
            # never close the last remaining tab
            break
        d[p].close()
    d[p].switch_to.window(parent[p])
def tabs_close_extra(p):
    """Close all tabs of browser *p* except the first, using Ctrl+W.

    The first window handle is stored in ``parent[p]``. Each other tab is
    focused and closed with the Ctrl+W hotkey; if the number of open tabs did
    not change, the tab is closed through the driver instead. Focus then
    returns to the first tab.

    Args:
        p: index of the browser in the global driver list ``d``.

    Returns:
        ``None`` on success, or the error message (containing the word
        ``'error'``) when anything failed, so callers that test
        ``'error' in str(tabs_close_extra(p))`` can react to it.
    """
    # alternative method (much slower): https://stackoverflow.com/questions/12729265/switch-tabs-using-selenium-webdriver-with-java
    try:
        handles = d[p].window_handles
        if handles:
            d[p].switch_to.window(handles[0])
            parent[p] = handles[0]
        # parent[p] = d[p].current_window_handle
        tabs_original = len(d[p].window_handles)
        if tabs_original > 1:
            for h in d[p].window_handles:
                if h == parent[p]:
                    continue
                d[p].switch_to.window(h)
                # Count taken right before this attempt: comparing against the
                # initial total would disable the fallback after the first
                # successful close.
                open_before = len(d[p].window_handles)
                activ(p),hk('ctrl','w')
                if len(d[p].window_handles) == open_before:
                    d[p].close()
            # d[p].switch_to.window(parent[p])
            d[p].switch_to.window(parent[p])
    except Exception:
        message = f'error in tabs_close_extra {p}'
        print(message)
        return message
    return None
变量
# Initialises the module-level globals shared by the methods in this file:
# short aliases for pyautogui/time functions, per-browser lists sized by L,
# file-system paths and the table of extension names and ids.
# NOTE(review): the name shadows the builtin vars().
# NOTE(review): indentation was lost in extraction and several unpacking lines
# below have fewer targets than values, so they look truncated.
def vars():
print('def vars()')
global index,index2,index3,index4,index5,index6
global index_activ,index_activ2,hks,pycharm_win
global index_latest_files,index_hks,index_hks2
global parent,w2
global path_chrome,path_exec,path_core,path_temp,profile,done,caps,pycharm_win
global ps,p2s,qs
global L,tile_type,compact,outcome
global kd,ku,hk,pr,tw,wr,cl
global gats,gaws,gwwt,gawt,gaw
global cc,ss,start,start2,start_overall
global wd,wd2,errors,errors2,errors3
global ext_0,ext_1,ext_2,ext_3,ext_details
caps = DesiredCapabilities.CHROME.copy()
# NOTE(review): 3 targets for 9 values. Judging by the globals above, the targets
# were presumably cc,ss,kd,ku,hk,pr,tw,wr,cl - confirm.
[cc,kd,cl] = [pyautogui.countdown,time.sleep,pyautogui.keyDown,pyautogui.keyUp,pyautogui.hotkey,pyautogui.press,pyautogui.typewrite,pyautogui.write,pyautogui.click]
# NOTE(review): 2 targets for 3 values (presumably gats,gaws,gwwt - confirm).
[gats,gwwt] = [pyautogui.getAllTitles,pyautogui.getAllWindows,pyautogui.getWindowsWithTitle]
[gawt,gaw] = [pyautogui.getActiveWindowTitle,pyautogui.getActiveWindow]
# L is the size used for every per-browser list below
L = 4
# NOTE(review): 1 target for 3 values (presumably ps,p2s,qs - confirm).
qs,= list(range(L)),list(range(L)),list(range(L))
# text_temp is not declared global above, so this binding is local to vars()
text_temp,start = [''] * L,[0] * L
# NOTE(review): hks is declared global but never assigned in this function; it
# must already exist at module level when vars() runs.
index_hks,index_hks2 = [hks] * L,[0] * L
index_latest_files = ['']
# NOTE(review): 2 targets for 4 values; the missing target names cannot be
# determined from this file.
parent,wd = [''] * L,[''] * L,[''] * (L + 1),[['']]*L
start_overall,outcome = '',''
try: pycharm_win = [gaw()]
except: pass
# None of the backslash pairs in this literal is a recognised escape sequence, so
# the backslashes are kept; a raw string would make that explicit.
path_chrome = "C:\Program Files\Google\Chrome\Application\chrome.exe"
path_core = os.path.join(os.path.expanduser("~"),'PyCharmProjects','Parallel')
path_exec = os.path.join(path_core,"chromedriver.exe")
# [alias, Chrome Web Store id] pairs; ext_update_method2 picks one by index
ext_details = [ ['ublock','cjpalhdlnbpafiamejdnhcphjbkeiagm'],['keyboard_shortcuts','dkoadhojigekhckndaehenfbhcgfeepl'],['browsec','omghfjlpggmjjaagoclmmobgdodcjboh'],['itrace','njkmjblmcfiobddjgebnoeldkjcplfjb']]
ext_0= os.path.join(path_core,"Browsec")
ext_1 = os.path.join(path_core,"Keyboard")
ext_2 = os.path.join(path_core,"itrace")
ext_3 = os.path.join(path_core,"ublock")
主要代码
if __name__ == '__main__':
    # Script entry point: initialise the globals, start one browser per id in
    # `ps`, tile the windows, then run the three extension methods in turn.
    ps = ''
    vars()  # sets the module-level globals
    if len(ps) > 3:
        # Start the browsers in two batches with a pause between them.
        ps_new = list(data_mygrouper(int(round(len(ps) / 2,ndigits=0)),ps))
        # Every group is processed, so an odd number of browsers (which yields
        # a third, shorter group) no longer leaves the last one unstarted.
        for group in ps_new:
            thread_all(group,new_profile,workers=len(group) + 1,chunk=len(group) + 1),cc(5)
        # new_profile
    else:
        # the function argument was missing from this call
        thread_all(ps,new_profile,workers=L + 1,chunk=L+1)
    #thread_all(ps,new_profile)
    tile_windows('v')
    ext_update_method1(ps)
    ext_remove(ps,'browsec')
    ext_update_method2(ps)