Files
tv/fs_share.py

1093 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# encoding: utf-8
from __future__ import annotations
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["P115SharePath", "P115ShareFileSystem"]
import errno
from collections import deque
from collections.abc import (
AsyncIterator, Callable, Coroutine, Iterable, Iterator, Mapping,
MutableMapping, Sequence,
)
from copy import deepcopy
from datetime import datetime
from functools import cached_property, partial
from os import fspath, stat_result, PathLike
from posixpath import join as joinpath
from re import compile as re_compile
from stat import S_IFDIR, S_IFREG
from time import time
from typing import cast, overload, Any, Literal, Never, Self
from dictattr import AttrDict
from iterutils import run_gen_step, run_gen_step_iter, Yield, YieldFrom
from p115client import check_response, normalize_attr, P115URL
from posixpatht import escape, joins, splits, path_is_dir_form
from .client import P115Client
from .fs_base import IDOrPathType, P115PathBase, P115FileSystemBase
# Matches full share URLs of the form ``.../s/<share_code>?password=<receive_code>``
# or ``share.115.com/<share_code>?password=<receive_code>``.
CRE_SHARE_LINK_search1 = re_compile(
    r"(?:/s/|share\.115\.com/)"
    r"(?P<share_code>[a-z0-9]+)"
    r"\?password=(?P<receive_code>[a-z0-9]{4})"
).search
# Matches the compact ``<share_code>-<receive_code>`` form.
CRE_SHARE_LINK_search2 = re_compile(
    r"(?P<share_code>[a-z0-9]+)"
    r"-(?P<receive_code>[a-z0-9]{4})"
).search
class P115SharePath(P115PathBase):
    """Path object for an entry that lives in a :class:`P115ShareFileSystem`."""
    fs: P115ShareFileSystem

    @property
    def ancestors(self, /) -> list[dict]:
        """Ancestor chain of this entry.

        The cached ``"ancestors"`` attribute is returned when present. Otherwise
        the chain is requested from the file system, and the ``"path"``
        attribute is filled in from the ancestor names along the way.
        """
        try:
            return self["ancestors"]
        except KeyError:
            pass
        chain = self.fs.get_ancestors(self.id)
        self.attr["path"] = joins([node["name"] for node in chain])
        return chain

    @property
    def path(self, /) -> str:
        """Path of this entry, computed from the ancestors on first use."""
        try:
            return self["path"]
        except KeyError:
            pass
        # Evaluating ``ancestors`` stores ``attr["path"]`` as a side effect.
        self.ancestors
        return self.attr["path"]

    @overload
    def search(self, /, async_: Literal[False] = False, **payload) -> Iterator[P115SharePath]: ...
    @overload
    def search(self, /, async_: Literal[True], **payload) -> AsyncIterator[P115SharePath]: ...
    def search(
        self,
        /,
        async_: Literal[False, True] = False,
        **payload,
    ) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]:
        """Run a search starting at this path by delegating to ``self.fs.search``.

        :param async_: whether to return an asynchronous iterator
        :param payload: extra search parameters, passed through unchanged
        """
        file_system = self.fs
        return file_system.search(self, async_=async_, **payload)
# File system view over a single 115 share link, with in-memory caches for
# paths, attributes and directory listings.
class P115ShareFileSystem(P115FileSystemBase[P115SharePath]):
# Share code of the link this file system wraps.
share_code: str
# Receive code (password), sent together with the share code in API requests.
receive_code: str
# Cache: escaped path -> entry id; iterdir stores directory paths with a trailing "/".
path_to_id: MutableMapping[str, int]
# Cache: entry id -> attribute dict.
id_to_attr: MutableMapping[int, AttrDict]
# Cache: directory id -> tuple of the attribute dicts of its direct children.
pid_to_children: MutableMapping[int, tuple[AttrDict, ...]]
# Set to True by _search_item once a traversal of the whole share has finished without a hit.
full_loaded: bool
# Class used to wrap entries as path objects.
path_class = P115SharePath
def __init__(
    self,
    /,
    client: str | P115Client,
    share_code: str,
    receive_code: str = "",
    request: None | Callable = None,
    async_request: None | Callable = None,
):
    """Wrap a 115 share link as a file system.

    Link formats recognised by the module-level patterns (characters in
    parentheses are optional):

    - http(s)://115.com/s/{share_code}?password={receive_code}(#)
    - http(s)://share.115.com/{share_code}?password={receive_code}(#)
    - (/){share_code}-{receive_code}(/)

    :param client: client object or a string, forwarded to the base class
    :param share_code: share code of the link
    :param receive_code: receive code (password) of the link
    :param request: request callable for synchronous calls
    :param async_request: request callable for asynchronous calls
    """
    super().__init__(client, request, async_request)
    # ``__setattr__`` always raises on this class, so the initial state is
    # written straight into the instance dictionary.
    initial_state = {
        "id": 0,
        "path": "/",
        "share_code": share_code,
        "receive_code": receive_code,
        "path_to_id": {"/": 0},
        "id_to_attr": {},
        "pid_to_children": {},
        "full_loaded": False,
    }
    self.__dict__.update(initial_state)
@classmethod
def from_url(
    cls,
    /,
    client: str | P115Client,
    url: str,
    request: None | Callable = None,
    async_request: None | Callable = None,
) -> Self:
    """Build a file system from a share link.

    The link is first matched as a full URL carrying ``?password=`` and then,
    failing that, as the compact ``{share_code}-{receive_code}`` form.

    :param client: client object or a string, forwarded to the constructor
    :param url: the share link
    :param request: request callable for synchronous calls
    :param async_request: request callable for asynchronous calls

    :raises ValueError: if neither pattern is found in ``url``
    """
    # Match objects are always truthy, so ``or`` only falls through on ``None``.
    found = CRE_SHARE_LINK_search1(url) or CRE_SHARE_LINK_search2(url)
    if found is None:
        raise ValueError("not a valid 115 share link")
    codes = found.groupdict()
    return cls(
        client,
        share_code=codes["share_code"],
        receive_code=codes["receive_code"] or "",
        request=request,
        async_request=async_request,
    )
def __repr__(self, /) -> str:
    """Return a debug representation with the client, both codes, id, path and object address."""
    klass = type(self)
    if klass.__module__ == "__main__":
        qualified = klass.__qualname__
    else:
        qualified = klass.__module__ + "." + klass.__qualname__
    return (
        f"<{qualified}(client={self.client!r}, share_code={self.share_code!r}, "
        f"receive_code={self.receive_code!r}, id={self.id!r}, path={self.path!r}) "
        f"at {hex(id(self))}>"
    )
def __setattr__(self, attr, val, /) -> Never:
    """Reject every attribute assignment; state is only changed through ``__dict__``.

    :raises TypeError: always
    """
    message = "can't set attributes"
    raise TypeError(message)
@overload
def downlist(self, /, id: int, *, async_: Literal[False] = False) -> dict: ...
@overload
def downlist(self, /, id: int, *, async_: Literal[True]) -> Coroutine[Any, Any, dict]: ...
def downlist(
    self,
    /,
    id: int,
    *,
    async_: Literal[False, True] = False,
) -> dict | Coroutine[Any, Any, dict]:
    """List the downloadable files below a directory of the share.

    Per the original documentation: files only (no directories), at any depth,
    with brief information.

    :param id: id of the directory, sent as ``cid``
    :param async_: whether to run asynchronously

    :return: the response, passed through ``check_response``
    """
    query = {
        "share_code": self.share_code,
        "receive_code": self.receive_code,
        "cid": id,
    }
    sender = self.async_request if async_ else self.request
    return check_response(self.client.share_downlist(  # type: ignore
        query,
        request=sender,
        async_=async_,
    ))
@overload
def fs_files(self, /, payload: dict, async_: Literal[False] = False) -> dict: ...
@overload
def fs_files(self, /, payload: dict, async_: Literal[True]) -> Coroutine[Any, Any, dict]: ...
def fs_files(
    self,
    /,
    payload: dict,
    async_: Literal[False, True] = False,
) -> dict | Coroutine[Any, Any, dict]:
    """List the files and sub-directories of a directory of the share (detailed information).

    The share code and receive code of this file system are always written
    into the request, overriding same-named keys in ``payload``.

    :param payload: query parameters
        - cid: int | str = 0  # directory id (the callers in this class send it as ``cid``)
        - limit: int = 32
        - offset: int = 0
        - asc: 0 | 1 = <default>  # whether to sort ascending
        - o: str = <default>
            # sort by this field:
            # - "file_name": file name
            # - "file_size": file size
            # - "user_ptime": creation time / modification time
    :param async_: whether to run asynchronously

    :return: the response, passed through ``check_response``
    """
    query = dict(payload)
    query["share_code"] = self.share_code
    query["receive_code"] = self.receive_code
    sender = self.async_request if async_ else self.request
    return check_response(self.client.share_snap(  # type: ignore
        query,
        base_url=True,
        request=sender,
        async_=async_,
    ))
@overload
def fs_search(self, payload: str | dict, /, async_: Literal[False] = False) -> dict: ...
@overload
def fs_search(self, payload: str | dict, /, async_: Literal[True]) -> Coroutine[Any, Any, dict]: ...
def fs_search(
    self,
    payload: str | dict,
    /,
    async_: Literal[False, True] = False,
) -> dict | Coroutine[Any, Any, dict]:
    """Send one search request for this share.

    :param payload: a search text (sent as ``search_value``), or a dict of
        query parameters that are laid over the defaults ``share_code``,
        ``receive_code`` and ``cid`` (this file system's ``id``)
    :param async_: whether to run asynchronously

    :return: the response, passed through ``check_response``
    """
    query = {
        "share_code": self.share_code,
        "receive_code": self.receive_code,
        "cid": self.id,
    }
    if isinstance(payload, str):
        query["search_value"] = payload
    else:
        query.update(payload)
    sender = self.async_request if async_ else self.request
    return check_response(self.client.share_search(  # type: ignore
        query,
        request=sender,
        async_=async_,
    ))
@cached_property
def create_time(self, /) -> datetime:
    """Creation time of the share as a naive local ``datetime`` (computed once)."""
    seconds = self.create_timestamp
    return datetime.fromtimestamp(seconds)
@cached_property
def create_timestamp(self, /) -> int:
    """Creation time of the share: the ``create_time`` field of the share info as an int (computed once)."""
    info = self.shareinfo
    return int(info["create_time"])
@cached_property
def snap_id(self, /) -> int:
    """Id of this share: the ``snap_id`` field of the share info as an int (computed once)."""
    info = self.shareinfo
    return int(info["snap_id"])
@cached_property
def user_id(self, /) -> int:
    """User id of the sharer, read from ``userinfo`` in the share data (computed once)."""
    owner = self.sharedata["userinfo"]
    return int(owner["user_id"])
@property
def sharedata(self, /) -> dict:
    """Front-page data of the share: the ``data`` field of a one-entry listing.

    Not cached; every access sends a listing request.
    """
    resp = self.fs_files({"limit": 1})
    return resp["data"]
@property
def shareinfo(self, /) -> dict:
    """Share information: the ``shareinfo`` field of :attr:`sharedata`."""
    data = self.sharedata
    return data["shareinfo"]
@overload
def _search_item(self, id: int, /, async_: Literal[False] = False) -> AttrDict: ...
@overload
def _search_item(self, id: int, /, async_: Literal[True]) -> Coroutine[Any, Any, AttrDict]: ...
def _search_item(
    self,
    id: int,
    /,
    async_: Literal[False, True] = False,
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
    """Find an entry by id with a breadth-first walk that starts at the root.

    Directories are listed through ``iterdir`` one after another. When the
    queue runs dry without a hit, ``full_loaded`` is set, recording that the
    whole share has been walked.

    :param id: id of the entry to find
    :param async_: whether to return a coroutine

    :raises FileNotFoundError: if no entry with this id was seen
    """
    # The root attributes are obtained synchronously in both modes.
    pending = deque((self.attr(0),))
    if async_:
        async def scan():
            while pending:
                folder = pending.popleft()
                async for entry in self.iterdir(folder, async_=True):
                    if entry["id"] == id:
                        return entry
                    if entry["is_directory"]:
                        pending.append(entry)
            self.__dict__["full_loaded"] = True
            raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
        return scan()
    while pending:
        folder = pending.popleft()
        for entry in self.iterdir(folder):
            if entry["id"] == id:
                return entry
            if entry["is_directory"]:
                pending.append(entry)
    self.__dict__["full_loaded"] = True
    raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
@overload
def _attr(self, id: int, /, async_: Literal[False] = False) -> AttrDict: ...
@overload
def _attr(self, id: int, /, async_: Literal[True]) -> Coroutine[Any, Any, AttrDict]: ...
def _attr(
    self,
    id: int,
    /,
    async_: Literal[False, True] = False,
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
    """Get the attribute dict of an entry by id.

    Lookup order: the ``id_to_attr`` cache; a synthesized record for the root
    (id 0); otherwise a download-URL request used as an existence probe,
    followed by a walk of the share via ``_search_item``.

    :param id: id of the entry
    :param async_: whether to return a coroutine

    :raises FileNotFoundError: if the id is not cached and the share has
        already been fully walked, or the walk does not find it
    """
    code = self.share_code
    password = self.receive_code

    def gen_step():
        known = self.id_to_attr.get(id)
        if known is not None:
            return known
        if self.full_loaded:
            raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
        if id == 0:
            root = AttrDict({
                "id": 0,
                "parent_id": 0,
                "name": "",
                "path": "/",
                "is_directory": True,
                "size": None,
                "time": self.create_timestamp,
                "ico": "folder",
                "ancestors": [{"id": 0, "name": ""}],
                "share_code": code,
                "receive_code": password,
            })
            self.id_to_attr[0] = root
            return root
        # NOTE: quick detection of id existence; the result itself is discarded.
        probe = partial(
            self.client.share_download_url,
            {
                "share_code": code,
                "receive_code": password,
                "file_id": id,
            },
            strict=False,
            request=self.async_request if async_ else self.request,
            async_=async_,
        )
        yield probe
        return (yield partial(self._search_item, id, async_=async_))

    return run_gen_step(gen_step, async_=async_)
@overload
def _attr_path(
self,
path: str | PathLike[str] | Sequence[str],
/,
pid: None | int = None,
ensure_dir: bool = False,
*,
async_: Literal[False] = False,
) -> AttrDict:
...
@overload
def _attr_path(
self,
path: str | PathLike[str] | Sequence[str],
/,
pid: None | int = None,
ensure_dir: bool = False,
*,
async_: Literal[True],
) -> Coroutine[Any, Any, AttrDict]:
...
# Resolve a path to the attribute dict of the entry it names.
#
# path:       a path string, a PathLike, or a sequence of path components
# pid:        id of the directory the path is resolved against; defaults to self.id
# ensure_dir: require the final component to be a directory; when false it is
#             derived from the path itself (directory form of a string, or an
#             empty last component of a sequence)
# async_:     whether to return a coroutine
#
# Raises FileNotFoundError when a component cannot be found.
def _attr_path(
self,
path: str | PathLike[str] | Sequence[str],
/,
pid: None | int = None,
ensure_dir: bool = False,
*,
async_: Literal[False, True] = False,
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
def gen_step():
nonlocal path, pid, ensure_dir
if isinstance(path, PathLike):
path = fspath(path)
if pid is None:
pid = self.id
# An empty path or "." names the starting directory itself.
if not path or path == ".":
return (yield partial(self._attr, pid, async_=async_))
# Second value returned by splits(); used below as the number of ancestor levels
# to climb (presumably the count of leading ".." components - confirm against posixpatht).
parents = 0
if isinstance(path, str):
if not ensure_dir:
ensure_dir = path_is_dir_form(path)
patht, parents = splits(path)
if not (patht or parents):
return (yield partial(self._attr, pid, async_=async_))
else:
if not ensure_dir:
ensure_dir = path[-1] == ""
# Keep the first component as is and drop empty components after it.
patht = [path[0], *(p for p in path[1:] if p)]
# A lone empty component is the root; a leading empty component makes the path
# absolute, so it is resolved from the root (id 0).
if patht == [""]:
return self._attr(0)
elif patht and patht[0] == "":
pid = 0
ancestor_patht: list[str] = []
if pid == 0:
if patht[0] != "":
patht.insert(0, "")
else:
# Component names of the starting directory, shortened by `parents` levels.
ancestors = yield partial(self.get_ancestors, pid, async_=async_)
if parents:
if parents >= len(ancestors):
pid = 0
else:
pid = cast(int, ancestors[-parents-1]["id"])
ancestor_patht = ["", *(a["name"] for a in ancestors[1:-parents])]
else:
ancestor_patht = ["", *(a["name"] for a in ancestors[1:])]
if not patht:
return (yield partial(self._attr, pid, async_=async_))
# Cumulative escaped paths, one per component; the last one is the full target path.
if pid == 0:
dirname = ""
ancestors_paths: list[str] = [(dirname := f"{dirname}/{escape(name)}") for name in patht[1:]]
else:
dirname = joins(ancestor_patht)
ancestors_paths = [(dirname := f"{dirname}/{escape(name)}") for name in patht]
fullpath = ancestors_paths[-1]
path_to_id = self.path_to_id
# Cache lookup: the bare path is tried unless a directory is required, then the
# path with a trailing "/" (the form iterdir stores for directories).
if path_to_id:
if not ensure_dir and (id := path_to_id.get(fullpath)):
return (yield partial(self._attr, id, async_=async_))
if (id := path_to_id.get(fullpath + "/")):
return (yield partial(self._attr, id, async_=async_))
if self.full_loaded:
raise FileNotFoundError(
errno.ENOENT,
f"no such path {fullpath!r} (in {pid!r})",
)
# Look for the deepest ancestor directory that is already in the path cache, so
# that the walk below can start after it instead of at `pid`.
parent: int | AttrDict
for i in reversed(range(len(ancestors_paths)-1)):
if path_to_id and (id := path_to_id.get((dirname := ancestors_paths[i]) + "/")):
parent = yield partial(self._attr, id, async_=async_)
i += 1
break
else:
i = 0
parent = pid
if pid == 0:
i += 1
attr: AttrDict
last_idx = len(patht) - 1
# Walk the remaining components: list the current directory and match by name.
# A matching entry must be a directory unless it is the last component and
# ensure_dir is false.
if async_:
for i, name in enumerate(patht[i:], i):
async def step():
nonlocal attr, parent
async for attr in self.iterdir(parent, async_=True):
if attr["name"] == name:
if ensure_dir or i < last_idx:
if attr["is_directory"]:
parent = attr
break
else:
break
else:
if isinstance(parent, AttrDict):
parent = parent["id"]
raise FileNotFoundError(
errno.ENOENT,
f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})",
)
yield step
else:
for i, name in enumerate(patht[i:], i):
for attr in self.iterdir(parent):
if attr["name"] == name:
if ensure_dir or i < last_idx:
if attr["is_directory"]:
parent = attr
break
else:
break
else:
if isinstance(parent, AttrDict):
parent = parent["id"]
raise FileNotFoundError(
errno.ENOENT,
f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})",
)
return attr
return run_gen_step(gen_step, async_=async_)
@overload
def attr(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    ensure_dir: bool = False,
    *,
    async_: Literal[False] = False,
) -> AttrDict:
    ...
@overload
def attr(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    ensure_dir: bool = False,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, AttrDict]:
    ...
def attr(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    ensure_dir: bool = False,
    *,
    async_: Literal[False, True] = False,
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
    """Get the attribute dict of an entry.

    :param id_or_path: a path object, an attribute dict, an id, or a path
    :param pid: id of the directory a path is resolved against (paths only)
    :param ensure_dir: require the entry to be a directory
    :param async_: whether to return a coroutine

    :raises NotADirectoryError: if ``ensure_dir`` is true and the entry (given
        as path object, attribute dict or id) is not a directory
    """
    def gen_step():
        path_class = type(self).path_class
        if isinstance(id_or_path, path_class):
            attr = id_or_path.attr
        elif isinstance(id_or_path, AttrDict):
            attr = id_or_path
        elif isinstance(id_or_path, int):
            attr = yield partial(self._attr, id_or_path, async_=async_)
        else:
            # Path resolution receives ``ensure_dir`` itself, so its result is
            # returned without the check below.
            return (yield partial(
                self._attr_path,
                id_or_path,
                pid=pid,
                ensure_dir=ensure_dir,
                async_=async_,
            ))
        if ensure_dir and not attr["is_directory"]:
            # Identify the entry by path when known, else by name, else by id
            # (the message used to print the id twice).
            label = attr.get("path") or attr.get("name") or attr["id"]
            raise NotADirectoryError(
                errno.ENOTDIR,
                f"{label!r} (id={attr['id']!r}) is not a directory",
            )
        return attr
    return run_gen_step(gen_step, async_=async_)
@overload
def dirlen(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False] = False,
) -> int: ...
@overload
def dirlen(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, int]: ...
def dirlen(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False, True] = False,
) -> int | Coroutine[Any, Any, int]:
    """Number of items in a directory (direct files and directories only).

    The cached listing is counted when there is one; otherwise a one-entry
    listing is requested and its ``count`` field is returned.

    :param id_or_path: id or path of the directory
    :param pid: id of the directory a path is resolved against
    :param async_: whether to return a coroutine
    """
    def gen_step():
        id = yield partial(self.get_id, id_or_path, pid=pid, async_=async_)
        listed = self.pid_to_children.get(id)
        if listed is None:
            resp = yield partial(
                self.fs_files,
                {"cid": id, "limit": 1},
                async_=async_,
            )
            return resp["data"]["count"]
        return len(listed)
    return run_gen_step(gen_step, async_=async_)
@overload
def get_ancestors(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False] = False,
) -> list[dict]: ...
@overload
def get_ancestors(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, list[dict]]: ...
def get_ancestors(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False, True] = False,
) -> list[dict] | Coroutine[Any, Any, list[dict]]:
    """Brief information about each ancestor, from the root down to the entry itself.

    :param id_or_path: id or path of the entry
    :param pid: id of the directory a path is resolved against
    :param async_: whether to return a coroutine

    :return: a deep copy of the entry's ``ancestors`` list, so callers can
        modify it without touching the cached attributes
    """
    def gen_step():
        entry = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
        chain = entry["ancestors"]
        return deepcopy(chain)
    return run_gen_step(gen_step, async_=async_)
@overload
def get_url(
    self,
    id_or_path: IDOrPathType,
    /,
    pid: None | int = None,
    headers: None | Mapping = None,
    *,
    async_: Literal[False] = False,
) -> P115URL:
    ...
@overload
def get_url(
    self,
    id_or_path: IDOrPathType,
    /,
    pid: None | int = None,
    headers: None | Mapping = None,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, P115URL]:
    ...
def get_url(
    self,
    id_or_path: IDOrPathType,
    /,
    pid: None | int = None,
    headers: None | Mapping = None,
    *,
    async_: Literal[False, True] = False,
) -> P115URL | Coroutine[Any, Any, P115URL]:
    """Get the download URL of a file of the share.

    :param id_or_path: id or path of the file. An id is used directly without
        resolving its attributes; only attributes that are already cached are
        taken into account.
    :param pid: id of the directory a path is resolved against
    :param headers: request headers forwarded to the client
    :param async_: whether to return a coroutine

    :raises IsADirectoryError: if the entry is known to be a directory
    """
    def gen_step():
        if isinstance(id_or_path, int):
            id = id_or_path
            # Keep the id fast path free of extra requests: look at the cache
            # only. ``attr`` used to be left unbound on this branch, which made
            # every call with an id fail with UnboundLocalError.
            attr = self.id_to_attr.get(id)
        else:
            attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
            id = attr["id"]
        if attr is not None and attr["is_directory"]:
            raise IsADirectoryError(errno.EISDIR, f"{attr['path']!r} (id={attr['id']!r}) is a directory")
        # The web API is selected only for entries flagged "violated" that are
        # smaller than 115 MiB; with unknown attributes the flag stays off.
        use_web_api = bool(
            attr is not None
            and attr.get("violated", False)
            and attr["size"] < 1024 * 1024 * 115
        )
        return (yield partial(
            self.client.share_download_url,
            {
                "share_code": self.share_code,
                "receive_code": self.receive_code,
                "file_id": id,
            },
            headers=headers,
            use_web_api=use_web_api,
            request=self.async_request if async_ else self.request,
            async_=async_,
        ))
    return run_gen_step(gen_step, async_=async_)
@overload
def iterdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
start: int = 0,
stop: None | int = None,
page_size: int = 1_000,
refresh: bool = False,
*,
async_: Literal[False] = False,
**payload,
) -> Iterator[AttrDict]:
...
@overload
def iterdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
start: int = 0,
stop: None | int = None,
page_size: int = 1_000,
refresh: bool = False,
*,
async_: Literal[True],
**payload,
) -> AsyncIterator[AttrDict]:
...
def iterdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
start: int = 0,
stop: None | int = None,
page_size: int = 1_000,
refresh: bool = False,
*,
async_: Literal[False, True] = False,
**payload,
) -> Iterator[AttrDict] | AsyncIterator[AttrDict]:
"""迭代获取目录内直属的文件或目录的信息
:param payload:
- limit: int = 32
- offset: int = 0
- asc: 0 | 1 = <default> # 是否升序排列
- o: str = <default>
# 用某字段排序:
# - 文件名:"file_name"
# - 文件大小:"file_size"
# - 文件种类:"file_type"
# - 修改时间:"user_utime"
# - 创建时间:"user_ptime"
# - 上次打开时间:"user_otime"
"""
path_class = type(self).path_class
if page_size <= 0:
page_size = 1_000
share_code = self.share_code
receive_code = self.receive_code
def gen_step():
nonlocal start, stop
if stop is not None and (start >= 0 and stop >= 0 or start < 0 and stop < 0) and start >= stop:
return ()
if isinstance(id_or_path, int):
attr = yield partial(self._attr, id_or_path, async_=async_)
elif isinstance(id_or_path, AttrDict):
attr = id_or_path
elif isinstance(id_or_path, path_class):
attr = id_or_path.attr
else:
attr = yield partial(
self._attr_path,
id_or_path,
pid=pid,
ensure_dir=True,
async_=async_,
)
#if not attr["is_directory"]:
try:
isDir = attr["is_directory"]
except:
attr=attr()
isDir = attr["is_directory"]
if not isDir:
raise NotADirectoryError(
errno.ENOTDIR,
f"{attr['path']!r} (id={attr['id']!r}) is not a directory",
)
id = attr["id"]
ancestors = attr["ancestors"]
children: Sequence[AttrDict]
try:
if refresh:
raise KeyError
children = self.pid_to_children[id]
except KeyError:
payload["cid"] = id
payload["limit"] = page_size
offset = int(payload.setdefault("offset", 0))
if offset < 0:
offset = payload["offset"] = 0
else:
payload["offset"] = 0
dirname = attr["path"]
get_files = self.fs_files
path_to_id = self.path_to_id
ls: list[AttrDict] = []
add = ls.append
resp = yield partial(get_files, payload, async_=async_)
try:
data = resp["data"]
except:
resp=resp()
data = resp["data"]
for attr in map(normalize_attr, data["list"]):
attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}]
path = attr["path"] = joinpath(dirname, escape(attr["name"]))
attr["share_code"] = share_code
attr["receive_code"] = receive_code
path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"]
add(attr)
for _ in range((data["count"] - 1) // page_size):
payload["offset"] += page_size
resp = yield partial(get_files, payload, async_=async_)
try:
data = resp["data"]
except:
resp=resp()
data = resp["data"]
for attr in map(normalize_attr, data["list"]):
attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}]
path = attr["path"] = joinpath(dirname, escape(attr["name"]))
attr["share_code"] = share_code
attr["receive_code"] = receive_code
path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"]
add(attr)
children = self.pid_to_children[id] = tuple(ls)
self.id_to_attr.update((attr["id"], attr) for attr in children)
else:
count = len(children)
if start < 0:
start += count
if start < 0:
start = 0
if stop is None:
stop = count
elif stop < 0:
stop += count
if start >= stop or stop <= 0 or start >= count:
return ()
key: None | Callable
match payload.get("o"):
case "file_name":
key = lambda attr: attr["name"]
case "file_size":
key = lambda attr: attr.get("size") or 0
case "file_type":
key = lambda attr: attr.get("ico", "folder" if attr["is_directory"] else "")
case "user_utime" | "user_ptime" | "user_otime":
key = lambda attr: attr["time"]
case _:
key = None
if key:
children = sorted(children, key=key, reverse=payload.get("asc", True))
return YieldFrom(children[start:stop])
return run_gen_step_iter(gen_step, may_call=False, async_=async_)
@overload
def receive(
    self,
    ids: int | str | Iterable[int | str],
    /,
    to_pid: int = 0,
    *,
    async_: Literal[False] = False,
) -> dict:
    ...
@overload
def receive(
    self,
    ids: int | str | Iterable[int | str],
    /,
    to_pid: int = 0,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, dict]:
    ...
def receive(
    self,
    ids: int | str | Iterable[int | str],
    /,
    to_pid: int = 0,
    *,
    async_: Literal[False, True] = False,
) -> dict | Coroutine[Any, Any, dict]:
    """Save entries of the share to your own drive.

    :param ids: id(s) of the entries to save (these ids belong to the share
        link): a single id, a ready-made string that is sent unchanged, or an
        iterable of ids that is joined with commas
    :param to_pid: id of the target directory (this id belongs to your drive)
    :param async_: whether to return a coroutine

    :raises ValueError: if ``ids`` is empty
    """
    def gen_step():
        if isinstance(ids, int):
            file_ids = str(ids)
        elif isinstance(ids, str):
            # A string is itself iterable; joining it would put a comma
            # between every character, so it is passed through unchanged.
            file_ids = ids
        elif isinstance(ids, Iterable):
            file_ids = ",".join(map(str, ids))
        else:
            file_ids = ids
        if not file_ids:
            raise ValueError("no id (to file) to receive")
        payload = {
            "share_code": self.share_code,
            "receive_code": self.receive_code,
            "file_id": file_ids,
            "cid": to_pid,
        }
        return (yield partial(
            self.client.share_receive,
            payload,
            request=self.async_request if async_ else self.request,
            async_=async_,
        ))
    return run_gen_step(gen_step, async_=async_)
@overload
def search(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    page_size: int = 1_000,
    *,
    async_: Literal[False] = False,
    **payload,
) -> Iterator[P115SharePath]:
    ...
@overload
def search(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    page_size: int = 1_000,
    *,
    async_: Literal[True],
    **payload,
) -> AsyncIterator[P115SharePath]:
    ...
def search(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    page_size: int = 1_000,
    *,
    async_: Literal[False, True] = False,
    **payload,
) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]:
    """Search below a directory of the share, yielding the matches as path objects.

    When ``id_or_path`` names a file, the search runs in its parent directory.

    :param id_or_path: id or path of the directory to search in
    :param pid: id of the directory a path is resolved against
    :param page_size: entries requested per call; non-positive values fall back to 1000
    :param async_: whether to return an asynchronous iterator
    :param payload: search parameters (``cid`` and ``limit`` are overwritten)
        - share_code: str = <default>  # share code
        - receive_code: str = <default>  # receive code (i.e. the password)
        - limit: int = 32  # page size, i.e. ``page_size``
        - offset: int = 0  # index offset, counted from 0
        - search_value: str = "."  # search text; only file names are searched
        - suffix: str = <default>  # file suffix (extension); takes priority over ``type``
        - type: int = <default>  # file type
            - 0: all
            - 1: documents
            - 2: images
            - 3: audio
            - 4: video
            - 5: archives
            - 6: applications
            - 7: books
            - 99: files only
    """
    if page_size <= 0:
        page_size = 1_000
    def gen_step():
        attr = yield self.attr(id_or_path, pid=pid, async_=async_)
        payload["cid"] = attr["id"] if attr["is_directory"] else attr["parent_id"]
        payload["limit"] = page_size
        offset = int(payload.setdefault("offset", 0))
        if offset < 0:
            # Reset the local counter as well; it used to stay negative and
            # skew every following page offset.
            offset = payload["offset"] = 0
        search = self.fs_search
        while True:
            resp = yield search(payload, async_=async_)
            ls = resp["data"]["list"]
            if not ls:
                return
            for item in ls:
                yield Yield(P115SharePath(self, normalize_attr(item)))
            # NOTE(review): ``page_size`` and ``count`` are read from the top
            # level of the response while the list comes from ``data`` -
            # confirm against the share_search response format.
            offset = payload["offset"] = offset + resp["page_size"]
            # Paging stops at an offset of 10,000 (presumably a cap on search
            # results - confirm).
            if offset >= resp["count"] or offset >= 10_000:
                break
            if offset + page_size > 10_000:
                # Shrink the last page through ``limit``, the key the page size
                # is actually sent under (``page_size`` is not a request key here).
                payload["limit"] = 10_000 - offset
    return run_gen_step_iter(gen_step, may_call=False, async_=async_)
@overload
def stat(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False] = False,
) -> stat_result:
    ...
@overload
def stat(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[True],
) -> Coroutine[Any, Any, stat_result]:
    ...
def stat(
    self,
    id_or_path: IDOrPathType = "",
    /,
    pid: None | int = None,
    *,
    async_: Literal[False, True] = False,
) -> stat_result | Coroutine[Any, Any, stat_result]:
    """Inspect the attributes of a path, like ``os.stat``.

    The result is read-only (mode ``0o444``): ``st_ino`` is the entry id,
    ``st_dev`` the parent id, ``st_uid`` the sharer's user id, and all three
    time fields carry the same timestamp.

    :param id_or_path: id or path of the entry
    :param pid: id of the directory a path is resolved against
    :param async_: whether to return a coroutine
    """
    def gen_step():
        attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
        is_dir = attr["is_directory"]
        # The root record built by ``_attr`` only carries "time", so fall back
        # to it when there is no "timestamp" key (this used to raise KeyError).
        timestamp: float = attr["timestamp"] if "timestamp" in attr else attr["time"]
        return stat_result((
            (S_IFDIR if is_dir else S_IFREG) | 0o444, # mode
            cast(int, attr["id"]), # ino
            cast(int, attr["parent_id"]), # dev
            1, # nlink
            self.user_id, # uid
            1, # gid
            cast(int, 0 if is_dir else attr["size"]), # size
            timestamp, # atime
            timestamp, # mtime
            timestamp, # ctime
        ))
    return run_gen_step(gen_step, async_=async_)