Files
tv/getalist.py

174 lines
5.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import os
import time
import re
import requests
import traceback
import json
import sys
# Crawl state shared by every walk() call.
# Number of directories seen so far; walk() uses it to throttle requests.
count = 0
# Consecutive listing responses that lacked data/content; walk() exits past 10.
failcount = 0
# False while fast-forwarding to a resume point, True once entries are emitted.
fullscan = True
# Extensions of files that are left out of the listing (artwork, audio tracks,
# text side-cars). Compiled once instead of on every recursive call.
_SKIPPED_FILE_RE = re.compile(r'\.(png|jpg|jpeg|bmp|gif|doc|nfo|flac|mp3|wma|ape|cue|wav|dst|dff|dts|ac3|eac3|txt)$')


def _on_resume_path(full_path, lastpath):
    """Return True if *full_path* is *lastpath* itself or one of its parent directories."""
    if not lastpath:
        return False
    # Compare on a path-component boundary so "/a/b" does not match "/a/bc".
    return lastpath == full_path or lastpath.startswith(full_path.rstrip("/") + "/")


def walk(headers:dict, api_url:str, current_path="/", output_file=None, replaceroot=None, lastpath=None):
    """Recursively list ``current_path`` via ``POST {api_url}/api/fs/list``.

    Every file that is not filtered out is printed as ``path<TAB>size`` and,
    when ``output_file`` is given, appended to it (flushed after each line).

    Args:
        headers: HTTP headers sent with every request.
        api_url: Base URL of the server, without the ``/api/fs/list`` suffix.
        current_path: Directory to list.
        output_file: Optional writable text file receiving the listing.
        replaceroot: When not None, the first component of each emitted path
            is dropped (``""``) or replaced by this value.
        lastpath: Directory to resume from. While the module-level
            ``fullscan`` flag is False, only directories leading to
            ``lastpath`` are entered and no files are emitted; reaching
            ``lastpath`` switches ``fullscan`` back on.

    Raises:
        SystemExit: On Ctrl+C, or after more than 10 consecutive responses
            without a ``data.content`` field.
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the response body is not valid JSON.
    """
    global count, failcount, fullscan

    params = {"path": current_path}
    try:
        resp = requests.post(
            api_url+"/api/fs/list",
            headers=headers,
            json=params,
            stream=True,
            timeout=(5,15),
        )
    except KeyboardInterrupt:
        print("get ctrl+c, exit")
        sys.exit(1)
    except Exception:
        # Best effort: report the failure, back off briefly, skip this directory.
        traceback.print_exc()
        print(f"request fail to server:{api_url}")
        time.sleep(1)
        return

    # stream=True keeps the connection open until closed, so release it even
    # when the status check or the body read fails.
    try:
        resp.raise_for_status()
        body = resp.content.decode('utf-8')
    finally:
        resp.close()

    data_dict = json.loads(body)
    try:
        items = data_dict['data']['content']
    except (KeyError, TypeError):
        traceback.print_exc()
        time.sleep(1)
        failcount += 1
        if failcount>10:
            sys.exit(1)
        return
    failcount = 0
    if items is None:
        # A null "content" (presumably an empty directory - TODO confirm against
        # the server API) would break len() and the loop below.
        items = []
    print(f"contentlist len:{len(items)}")

    for item in items:
        full_path = f"{current_path}/{item['name']}".replace("//", "/")
        try:
            is_dir = item["is_dir"]
        except (KeyError, TypeError):
            continue

        if is_dir:
            count += 1
            # Throttle: pause after every second directory during a full scan.
            if fullscan and count % 2 == 0:
                time.sleep(1)
            if not fullscan and not _on_resume_path(full_path, lastpath):
                print(f"not fullscan and full_path:{full_path} not in lastpath:{lastpath}, skip")
                continue
            if full_path == lastpath:
                print(f"found full_path:{full_path} equal lastpath:{lastpath}, enter fullscan mode")
                time.sleep(3)
                fullscan = True
            try:
                walk(headers, api_url, full_path, output_file, replaceroot, lastpath)
            except KeyboardInterrupt:
                print("get ctrl+c, exit")
                sys.exit(1)
            except Exception:
                # Keep crawling siblings after a failure in one subtree.
                # SystemExit is not an Exception subclass, so exit requests
                # raised deeper in the recursion still propagate.
                traceback.print_exc()
            continue

        # Files are only emitted once the resume point has been reached.
        if not fullscan:
            continue
        if _SKIPPED_FILE_RE.search(full_path) is not None or "BDMV" in full_path:
            continue
        size = item["size"]
        if replaceroot is not None:
            # split("/")[2:] drops the leading empty string and the first component.
            if replaceroot == "":
                full_path = os.path.join("/", *full_path.split("/")[2:])
            else:
                full_path = os.path.join("/", replaceroot, *full_path.split("/")[2:])
        print(f"{full_path}\t{size}")
        if output_file is not None:
            output_file.write(f"{full_path}\t{size}\n")
            output_file.flush()
from urllib.parse import urlparse
def extract_url_components(url):
    """Split *url* into ``(schema, hostname, path)``.

    Args:
        url: Server URL. A missing scheme defaults to ``http``.

    Returns:
        A tuple ``(schema, hostname, path)`` where ``hostname`` is
        ``host[:port]`` without credentials (the port is only included when
        the URL specifies one, IPv6 literals keep their brackets) and ``path``
        has no trailing slash (``"/"`` when empty).

    Raises:
        ValueError: If the URL has no hostname or an invalid port.
    """
    # Without "://", urlparse would read "host:port/path" as scheme "host".
    if "://" not in url:
        url = "http://" + url.lstrip("/")
    parsed = urlparse(url)
    schema = parsed.scheme or "http"
    host = parsed.hostname
    if not host:
        raise ValueError(f"URL has no hostname: {url!r}")
    if ":" in host:
        # parsed.hostname strips the brackets of an IPv6 literal; restore them.
        host = f"[{host}]"
    # Only append the port when one was given, otherwise we would emit "host:None".
    hostname = host if parsed.port is None else f"{host}:{parsed.port}"
    path = parsed.path.rstrip('/') or '/'
    return schema, hostname, path
def _last_scanned_dir(output_path):
    """Return the directory of the last non-blank entry in *output_path*, or None."""
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
    except FileNotFoundError:
        # First run: there is no previous listing to resume from.
        return None
    except Exception:
        traceback.print_exc()
        return None
    for raw in reversed(lines):
        line = raw.strip()
        if line:
            # Each entry is "path<TAB>size"; resume from the file's directory.
            return os.path.dirname(line.split("\t")[0])
    return None


def main():
    """Parse the command line and crawl the server into the output file.

    When ``--lastpath`` is not given, the resume point is taken from the last
    entry already present in the output file. A non-empty resume point turns
    the module-level ``fullscan`` flag off so that walk() fast-forwards to it.
    New entries are appended to the output file.
    """
    global fullscan

    parser = argparse.ArgumentParser(description='快速遍历AList目录')
    parser.add_argument('--url', type=str, required=True, help='AList URL')
    parser.add_argument('--token', type=str, default=None, required=False, help='AList token')
    parser.add_argument('--output', type=str, required=True, help='输出文件')
    parser.add_argument('--lastpath', type=str, default=None, required=False, help='最后扫到的路径(从此开始继续)')
    parser.add_argument('--replaceroot', type=str, default=None, required=False, help='替换根目录名称')
    args = parser.parse_args()

    schema, hostname, path = extract_url_components(args.url)
    print(schema,hostname,path)
    print("AList URL:", args.url)

    if args.lastpath is None:
        args.lastpath = _last_scanned_dir(args.output)
    # Normalize before deciding on resume mode: a resume point of "/" becomes
    # "" here, and resuming towards an empty path would skip every directory.
    if args.lastpath is not None:
        args.lastpath = args.lastpath.rstrip('/')
    if args.lastpath:
        fullscan = False
    print(f"lastpath:{args.lastpath}")

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
        "Content-Type": "application/json",
    }
    if args.token:
        headers["Authorization"] = args.token

    # The context manager closes the listing even if the crawl raises or exits.
    with open(args.output, mode="a", encoding="utf-8") as output_file:
        walk(headers, schema+"://"+hostname, path, output_file, args.replaceroot, args.lastpath)
# Start the crawl only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()