1093 lines
35 KiB
Python
1093 lines
35 KiB
Python
#!/usr/bin/env python3
|
||
# encoding: utf-8
|
||
|
||
from __future__ import annotations
|
||
|
||
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
|
||
__all__ = ["P115SharePath", "P115ShareFileSystem"]
|
||
|
||
import errno
|
||
|
||
from collections import deque
|
||
from collections.abc import (
|
||
AsyncIterator, Callable, Coroutine, Iterable, Iterator, Mapping,
|
||
MutableMapping, Sequence,
|
||
)
|
||
from copy import deepcopy
|
||
from datetime import datetime
|
||
from functools import cached_property, partial
|
||
from os import fspath, stat_result, PathLike
|
||
from posixpath import join as joinpath
|
||
from re import compile as re_compile
|
||
from stat import S_IFDIR, S_IFREG
|
||
from time import time
|
||
from typing import cast, overload, Any, Literal, Never, Self
|
||
|
||
from dictattr import AttrDict
|
||
from iterutils import run_gen_step, run_gen_step_iter, Yield, YieldFrom
|
||
from p115client import check_response, normalize_attr, P115URL
|
||
from posixpatht import escape, joins, splits, path_is_dir_form
|
||
|
||
from .client import P115Client
|
||
from .fs_base import IDOrPathType, P115PathBase, P115FileSystemBase
|
||
|
||
|
||
CRE_SHARE_LINK_search1 = re_compile(r"(?:/s/|share\.115\.com/)(?P<share_code>[a-z0-9]+)\?password=(?P<receive_code>[a-z0-9]{4})").search
|
||
CRE_SHARE_LINK_search2 = re_compile(r"(?P<share_code>[a-z0-9]+)-(?P<receive_code>[a-z0-9]{4})").search
|
||
|
||
|
||
class P115SharePath(P115PathBase):
|
||
fs: P115ShareFileSystem
|
||
|
||
@property
|
||
def ancestors(self, /) -> list[dict]:
|
||
try:
|
||
return self["ancestors"]
|
||
except KeyError:
|
||
ancestors = self.fs.get_ancestors(self.id)
|
||
self.attr["path"] = joins([a["name"] for a in ancestors])
|
||
return ancestors
|
||
|
||
@property
|
||
def path(self, /) -> str:
|
||
try:
|
||
return self["path"]
|
||
except KeyError:
|
||
self.ancestors
|
||
return self.attr["path"]
|
||
|
||
@overload
|
||
def search(
|
||
self,
|
||
/,
|
||
async_: Literal[False] = False,
|
||
**payload,
|
||
) -> Iterator[P115SharePath]:
|
||
...
|
||
@overload
|
||
def search(
|
||
self,
|
||
/,
|
||
async_: Literal[True],
|
||
**payload,
|
||
) -> AsyncIterator[P115SharePath]:
|
||
...
|
||
def search(
|
||
self,
|
||
/,
|
||
async_: Literal[False, True] = False,
|
||
**payload,
|
||
) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]:
|
||
return self.fs.search(self, async_=async_, **payload)
|
||
|
||
|
||
class P115ShareFileSystem(P115FileSystemBase[P115SharePath]):
|
||
share_code: str
|
||
receive_code: str
|
||
path_to_id: MutableMapping[str, int]
|
||
id_to_attr: MutableMapping[int, AttrDict]
|
||
pid_to_children: MutableMapping[int, tuple[AttrDict, ...]]
|
||
full_loaded: bool
|
||
path_class = P115SharePath
|
||
|
||
def __init__(
|
||
self,
|
||
/,
|
||
client: str | P115Client,
|
||
share_code: str,
|
||
receive_code: str = "",
|
||
request: None | Callable = None,
|
||
async_request: None | Callable = None,
|
||
):
|
||
"""115 分享链接的文件系统封装
|
||
|
||
支持以下几种格式的链接(括号内的字符表示可有可无):
|
||
- http(s)://115.com/s/{share_code}?password={receive_code}(#)
|
||
- http(s)://share.115.com/{share_code}?password={receive_code}(#)
|
||
- (/){share_code}-{receive_code}(/)
|
||
"""
|
||
super().__init__(client, request, async_request)
|
||
self.__dict__.update(
|
||
id=0,
|
||
path="/",
|
||
share_code=share_code,
|
||
receive_code=receive_code,
|
||
path_to_id={"/": 0},
|
||
id_to_attr={},
|
||
pid_to_children={},
|
||
full_loaded=False,
|
||
)
|
||
|
||
@classmethod
|
||
def from_url(
|
||
cls,
|
||
/,
|
||
client: str | P115Client,
|
||
url: str,
|
||
request: None | Callable = None,
|
||
async_request: None | Callable = None,
|
||
) -> Self:
|
||
m = CRE_SHARE_LINK_search1(url)
|
||
if m is None:
|
||
m = CRE_SHARE_LINK_search2(url)
|
||
if m is None:
|
||
raise ValueError("not a valid 115 share link")
|
||
return cls(
|
||
client,
|
||
share_code=m["share_code"],
|
||
receive_code=m["receive_code"] or "",
|
||
request=request,
|
||
async_request=async_request,
|
||
)
|
||
|
||
def __repr__(self, /) -> str:
|
||
cls = type(self)
|
||
module = cls.__module__
|
||
name = cls.__qualname__
|
||
if module != "__main__":
|
||
name = module + "." + name
|
||
return f"<{name}(client={self.client!r}, share_code={self.share_code!r}, receive_code={self.receive_code!r}, id={self.id!r}, path={self.path!r}) at {hex(id(self))}>"
|
||
|
||
def __setattr__(self, attr, val, /) -> Never:
|
||
raise TypeError("can't set attributes")
|
||
|
||
@overload
|
||
def downlist(
|
||
self,
|
||
/,
|
||
id: int,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> dict:
|
||
...
|
||
@overload
|
||
def downlist(
|
||
self,
|
||
/,
|
||
id: int,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, dict]:
|
||
...
|
||
def downlist(
|
||
self,
|
||
/,
|
||
id: int,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> dict | Coroutine[Any, Any, dict]:
|
||
"""获取分享链接的某个文件夹中可下载的文件的列表(只含文件,不含文件夹,任意深度,简略信息)
|
||
"""
|
||
return check_response(self.client.share_downlist( # type: ignore
|
||
{
|
||
"share_code": self.share_code,
|
||
"receive_code": self.receive_code,
|
||
"cid": id,
|
||
},
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
))
|
||
|
||
@overload
|
||
def fs_files(
|
||
self,
|
||
/,
|
||
payload: dict,
|
||
async_: Literal[False] = False,
|
||
) -> dict:
|
||
...
|
||
@overload
|
||
def fs_files(
|
||
self,
|
||
/,
|
||
payload: dict,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, dict]:
|
||
...
|
||
def fs_files(
|
||
self,
|
||
/,
|
||
payload: dict,
|
||
async_: Literal[False, True] = False,
|
||
) -> dict | Coroutine[Any, Any, dict]:
|
||
"""获取分享链接的某个文件夹中的文件和子文件夹的列表(包含详细信息)
|
||
:param payload:
|
||
- id: int | str = 0
|
||
- limit: int = 32
|
||
- offset: int = 0
|
||
- asc: 0 | 1 = <default> # 是否升序排列
|
||
- o: str = <default>
|
||
# 用某字段排序:
|
||
# - "file_name": 文件名
|
||
# - "file_size": 文件大小
|
||
# - "user_ptime": 创建时间/修改时间
|
||
"""
|
||
return check_response(self.client.share_snap( # type: ignore
|
||
{
|
||
**payload,
|
||
"share_code": self.share_code,
|
||
"receive_code": self.receive_code,
|
||
},
|
||
base_url=True,
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
))
|
||
|
||
@overload
|
||
def fs_search(
|
||
self,
|
||
payload: str | dict,
|
||
/,
|
||
async_: Literal[False] = False,
|
||
) -> dict:
|
||
...
|
||
@overload
|
||
def fs_search(
|
||
self,
|
||
payload: str | dict,
|
||
/,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, dict]:
|
||
...
|
||
def fs_search(
|
||
self,
|
||
payload: str | dict,
|
||
/,
|
||
async_: Literal[False, True] = False,
|
||
) -> dict | Coroutine[Any, Any, dict]:
|
||
if isinstance(payload, str):
|
||
payload = {"share_code": self.share_code, "receive_code": self.receive_code, "cid": self.id, "search_value": payload}
|
||
else:
|
||
payload = {"share_code": self.share_code, "receive_code": self.receive_code, "cid": self.id, **payload}
|
||
return check_response(self.client.share_search( # type: ignore
|
||
payload,
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
))
|
||
|
||
@cached_property
|
||
def create_time(self, /) -> datetime:
|
||
"分享的创建时间"
|
||
return datetime.fromtimestamp(self.create_timestamp)
|
||
|
||
@cached_property
|
||
def create_timestamp(self, /) -> int:
|
||
"分享的创建时间"
|
||
return int(self.shareinfo["create_time"])
|
||
|
||
@cached_property
|
||
def snap_id(self, /) -> int:
|
||
"获取这个分享的 id"
|
||
return int(self.shareinfo["snap_id"])
|
||
|
||
@cached_property
|
||
def user_id(self, /) -> int:
|
||
"获取分享者的用户 id"
|
||
return int(self.sharedata["userinfo"]["user_id"])
|
||
|
||
@property
|
||
def sharedata(self, /) -> dict:
|
||
"获取分享的首页数据"
|
||
return self.fs_files({"limit": 1})["data"]
|
||
|
||
@property
|
||
def shareinfo(self, /) -> dict:
|
||
"获取分享信息"
|
||
return self.sharedata["shareinfo"]
|
||
|
||
@overload
|
||
def _search_item(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[False] = False,
|
||
) -> AttrDict:
|
||
...
|
||
@overload
|
||
def _search_item(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, AttrDict]:
|
||
...
|
||
def _search_item(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[False, True] = False,
|
||
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
|
||
dq = deque((self.attr(0),))
|
||
get, put = dq.popleft, dq.append
|
||
if async_:
|
||
async def request():
|
||
while dq:
|
||
async for attr in self.iterdir(get(), async_=True):
|
||
if attr["id"] == id:
|
||
return attr
|
||
if attr["is_directory"]:
|
||
put(attr)
|
||
self.__dict__["full_loaded"] = True
|
||
raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
|
||
return request()
|
||
else:
|
||
while dq:
|
||
for attr in self.iterdir(get()):
|
||
if attr["id"] == id:
|
||
return attr
|
||
if attr["is_directory"]:
|
||
put(attr)
|
||
self.__dict__["full_loaded"] = True
|
||
raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
|
||
|
||
@overload
|
||
def _attr(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[False] = False,
|
||
) -> AttrDict:
|
||
...
|
||
@overload
|
||
def _attr(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, AttrDict]:
|
||
...
|
||
def _attr(
|
||
self,
|
||
id: int,
|
||
/,
|
||
async_: Literal[False, True] = False,
|
||
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
|
||
share_code = self.share_code
|
||
receive_code = self.receive_code
|
||
def gen_step():
|
||
try:
|
||
return self.id_to_attr[id]
|
||
except KeyError:
|
||
pass
|
||
if self.full_loaded:
|
||
raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}")
|
||
if id == 0:
|
||
attr = self.id_to_attr[0] = AttrDict({
|
||
"id": 0,
|
||
"parent_id": 0,
|
||
"name": "",
|
||
"path": "/",
|
||
"is_directory": True,
|
||
"size": None,
|
||
"time": self.create_timestamp,
|
||
"ico": "folder",
|
||
"ancestors": [{"id": 0, "name": ""}],
|
||
"share_code": share_code,
|
||
"receive_code": receive_code,
|
||
})
|
||
return attr
|
||
# NOTE: quick detection of id existence
|
||
yield partial(
|
||
self.client.share_download_url,
|
||
{
|
||
"share_code": share_code,
|
||
"receive_code": receive_code,
|
||
"file_id": id,
|
||
},
|
||
strict=False,
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
)
|
||
return (yield partial(self._search_item, id, async_=async_))
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def _attr_path(
|
||
self,
|
||
path: str | PathLike[str] | Sequence[str],
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> AttrDict:
|
||
...
|
||
@overload
|
||
def _attr_path(
|
||
self,
|
||
path: str | PathLike[str] | Sequence[str],
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, AttrDict]:
|
||
...
|
||
def _attr_path(
|
||
self,
|
||
path: str | PathLike[str] | Sequence[str],
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
|
||
def gen_step():
|
||
nonlocal path, pid, ensure_dir
|
||
|
||
if isinstance(path, PathLike):
|
||
path = fspath(path)
|
||
if pid is None:
|
||
pid = self.id
|
||
if not path or path == ".":
|
||
return (yield partial(self._attr, pid, async_=async_))
|
||
|
||
parents = 0
|
||
if isinstance(path, str):
|
||
if not ensure_dir:
|
||
ensure_dir = path_is_dir_form(path)
|
||
patht, parents = splits(path)
|
||
if not (patht or parents):
|
||
return (yield partial(self._attr, pid, async_=async_))
|
||
else:
|
||
if not ensure_dir:
|
||
ensure_dir = path[-1] == ""
|
||
patht = [path[0], *(p for p in path[1:] if p)]
|
||
if patht == [""]:
|
||
return self._attr(0)
|
||
elif patht and patht[0] == "":
|
||
pid = 0
|
||
|
||
ancestor_patht: list[str] = []
|
||
if pid == 0:
|
||
if patht[0] != "":
|
||
patht.insert(0, "")
|
||
else:
|
||
ancestors = yield partial(self.get_ancestors, pid, async_=async_)
|
||
if parents:
|
||
if parents >= len(ancestors):
|
||
pid = 0
|
||
else:
|
||
pid = cast(int, ancestors[-parents-1]["id"])
|
||
ancestor_patht = ["", *(a["name"] for a in ancestors[1:-parents])]
|
||
else:
|
||
ancestor_patht = ["", *(a["name"] for a in ancestors[1:])]
|
||
if not patht:
|
||
return (yield partial(self._attr, pid, async_=async_))
|
||
|
||
if pid == 0:
|
||
dirname = ""
|
||
ancestors_paths: list[str] = [(dirname := f"{dirname}/{escape(name)}") for name in patht[1:]]
|
||
else:
|
||
dirname = joins(ancestor_patht)
|
||
ancestors_paths = [(dirname := f"{dirname}/{escape(name)}") for name in patht]
|
||
|
||
fullpath = ancestors_paths[-1]
|
||
path_to_id = self.path_to_id
|
||
if path_to_id:
|
||
if not ensure_dir and (id := path_to_id.get(fullpath)):
|
||
return (yield partial(self._attr, id, async_=async_))
|
||
if (id := path_to_id.get(fullpath + "/")):
|
||
return (yield partial(self._attr, id, async_=async_))
|
||
if self.full_loaded:
|
||
raise FileNotFoundError(
|
||
errno.ENOENT,
|
||
f"no such path {fullpath!r} (in {pid!r})",
|
||
)
|
||
|
||
parent: int | AttrDict
|
||
for i in reversed(range(len(ancestors_paths)-1)):
|
||
if path_to_id and (id := path_to_id.get((dirname := ancestors_paths[i]) + "/")):
|
||
parent = yield partial(self._attr, id, async_=async_)
|
||
i += 1
|
||
break
|
||
else:
|
||
i = 0
|
||
parent = pid
|
||
|
||
if pid == 0:
|
||
i += 1
|
||
|
||
attr: AttrDict
|
||
last_idx = len(patht) - 1
|
||
if async_:
|
||
for i, name in enumerate(patht[i:], i):
|
||
async def step():
|
||
nonlocal attr, parent
|
||
async for attr in self.iterdir(parent, async_=True):
|
||
if attr["name"] == name:
|
||
if ensure_dir or i < last_idx:
|
||
if attr["is_directory"]:
|
||
parent = attr
|
||
break
|
||
else:
|
||
break
|
||
else:
|
||
if isinstance(parent, AttrDict):
|
||
parent = parent["id"]
|
||
raise FileNotFoundError(
|
||
errno.ENOENT,
|
||
f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})",
|
||
)
|
||
yield step
|
||
else:
|
||
for i, name in enumerate(patht[i:], i):
|
||
for attr in self.iterdir(parent):
|
||
if attr["name"] == name:
|
||
if ensure_dir or i < last_idx:
|
||
if attr["is_directory"]:
|
||
parent = attr
|
||
break
|
||
else:
|
||
break
|
||
else:
|
||
if isinstance(parent, AttrDict):
|
||
parent = parent["id"]
|
||
raise FileNotFoundError(
|
||
errno.ENOENT,
|
||
f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})",
|
||
)
|
||
return attr
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def attr(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> AttrDict:
|
||
...
|
||
@overload
|
||
def attr(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, AttrDict]:
|
||
...
|
||
def attr(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
ensure_dir: bool = False,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> AttrDict | Coroutine[Any, Any, AttrDict]:
|
||
"获取属性"
|
||
def gen_step():
|
||
path_class = type(self).path_class
|
||
if isinstance(id_or_path, path_class):
|
||
attr = id_or_path.attr
|
||
elif isinstance(id_or_path, AttrDict):
|
||
attr = id_or_path
|
||
elif isinstance(id_or_path, int):
|
||
attr = yield partial(self._attr, id_or_path, async_=async_)
|
||
else:
|
||
return (yield partial(
|
||
self._attr_path,
|
||
id_or_path,
|
||
pid=pid,
|
||
ensure_dir=ensure_dir,
|
||
async_=async_,
|
||
))
|
||
if ensure_dir and not attr["is_directory"]:
|
||
raise NotADirectoryError(
|
||
errno.ENOTDIR,
|
||
f"{attr['id']} (id={attr['id']}) is not directory"
|
||
)
|
||
return attr
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def dirlen(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> int:
|
||
...
|
||
@overload
|
||
def dirlen(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, int]:
|
||
...
|
||
def dirlen(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> int | Coroutine[Any, Any, int]:
|
||
"文件夹中的项目数(直属的文件和目录计数)"
|
||
def gen_step():
|
||
id = yield partial(self.get_id, id_or_path, pid=pid, async_=async_)
|
||
if (children := self.pid_to_children.get(id)) is not None:
|
||
return len(children)
|
||
resp = yield partial(
|
||
self.fs_files,
|
||
{"cid": id, "limit": 1},
|
||
async_=async_,
|
||
)
|
||
return resp["data"]["count"]
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def get_ancestors(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> list[dict]:
|
||
...
|
||
@overload
|
||
def get_ancestors(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, list[dict]]:
|
||
...
|
||
def get_ancestors(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> list[dict] | Coroutine[Any, Any, list[dict]]:
|
||
"获取各个上级目录的少量信息(从根目录到当前目录)"
|
||
def gen_step():
|
||
attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
|
||
return deepcopy(attr["ancestors"])
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def get_url(
|
||
self,
|
||
id_or_path: IDOrPathType,
|
||
/,
|
||
pid: None | int = None,
|
||
headers: None | Mapping = None,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> P115URL:
|
||
...
|
||
@overload
|
||
def get_url(
|
||
self,
|
||
id_or_path: IDOrPathType,
|
||
/,
|
||
pid: None | int = None,
|
||
headers: None | Mapping = None,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, P115URL]:
|
||
...
|
||
def get_url(
|
||
self,
|
||
id_or_path: IDOrPathType,
|
||
/,
|
||
pid: None | int = None,
|
||
headers: None | Mapping = None,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> P115URL | Coroutine[Any, Any, P115URL]:
|
||
"获取下载链接"
|
||
def gen_step():
|
||
if isinstance(id_or_path, int):
|
||
id = id_or_path
|
||
else:
|
||
attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
|
||
if attr["is_directory"]:
|
||
raise IsADirectoryError(errno.EISDIR, f"{attr['path']!r} (id={attr['id']!r}) is a directory")
|
||
id = attr["id"]
|
||
return (yield partial(
|
||
self.client.share_download_url,
|
||
{
|
||
"share_code": self.share_code,
|
||
"receive_code": self.receive_code,
|
||
"file_id": id,
|
||
},
|
||
headers=headers,
|
||
use_web_api=attr.get("violated", False) and attr["size"] < 1024 * 1024 * 115,
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
))
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def iterdir(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
start: int = 0,
|
||
stop: None | int = None,
|
||
page_size: int = 1_000,
|
||
refresh: bool = False,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
**payload,
|
||
) -> Iterator[AttrDict]:
|
||
...
|
||
@overload
|
||
def iterdir(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
start: int = 0,
|
||
stop: None | int = None,
|
||
page_size: int = 1_000,
|
||
refresh: bool = False,
|
||
*,
|
||
async_: Literal[True],
|
||
**payload,
|
||
) -> AsyncIterator[AttrDict]:
|
||
...
|
||
def iterdir(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
start: int = 0,
|
||
stop: None | int = None,
|
||
page_size: int = 1_000,
|
||
refresh: bool = False,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
**payload,
|
||
) -> Iterator[AttrDict] | AsyncIterator[AttrDict]:
|
||
"""迭代获取目录内直属的文件或目录的信息
|
||
:param payload:
|
||
- limit: int = 32
|
||
- offset: int = 0
|
||
- asc: 0 | 1 = <default> # 是否升序排列
|
||
- o: str = <default>
|
||
# 用某字段排序:
|
||
# - 文件名:"file_name"
|
||
# - 文件大小:"file_size"
|
||
# - 文件种类:"file_type"
|
||
# - 修改时间:"user_utime"
|
||
# - 创建时间:"user_ptime"
|
||
# - 上次打开时间:"user_otime"
|
||
"""
|
||
path_class = type(self).path_class
|
||
if page_size <= 0:
|
||
page_size = 1_000
|
||
share_code = self.share_code
|
||
receive_code = self.receive_code
|
||
def gen_step():
|
||
nonlocal start, stop
|
||
if stop is not None and (start >= 0 and stop >= 0 or start < 0 and stop < 0) and start >= stop:
|
||
return ()
|
||
if isinstance(id_or_path, int):
|
||
attr = yield partial(self._attr, id_or_path, async_=async_)
|
||
elif isinstance(id_or_path, AttrDict):
|
||
attr = id_or_path
|
||
elif isinstance(id_or_path, path_class):
|
||
attr = id_or_path.attr
|
||
else:
|
||
attr = yield partial(
|
||
self._attr_path,
|
||
id_or_path,
|
||
pid=pid,
|
||
ensure_dir=True,
|
||
async_=async_,
|
||
)
|
||
#if not attr["is_directory"]:
|
||
try:
|
||
isDir = attr["is_directory"]
|
||
except:
|
||
attr=attr()
|
||
isDir = attr["is_directory"]
|
||
if not isDir:
|
||
raise NotADirectoryError(
|
||
errno.ENOTDIR,
|
||
f"{attr['path']!r} (id={attr['id']!r}) is not a directory",
|
||
)
|
||
id = attr["id"]
|
||
ancestors = attr["ancestors"]
|
||
children: Sequence[AttrDict]
|
||
try:
|
||
if refresh:
|
||
raise KeyError
|
||
children = self.pid_to_children[id]
|
||
except KeyError:
|
||
payload["cid"] = id
|
||
payload["limit"] = page_size
|
||
offset = int(payload.setdefault("offset", 0))
|
||
if offset < 0:
|
||
offset = payload["offset"] = 0
|
||
else:
|
||
payload["offset"] = 0
|
||
dirname = attr["path"]
|
||
get_files = self.fs_files
|
||
path_to_id = self.path_to_id
|
||
ls: list[AttrDict] = []
|
||
add = ls.append
|
||
resp = yield partial(get_files, payload, async_=async_)
|
||
|
||
try:
|
||
data = resp["data"]
|
||
except:
|
||
resp=resp()
|
||
data = resp["data"]
|
||
for attr in map(normalize_attr, data["list"]):
|
||
attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}]
|
||
path = attr["path"] = joinpath(dirname, escape(attr["name"]))
|
||
attr["share_code"] = share_code
|
||
attr["receive_code"] = receive_code
|
||
path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"]
|
||
add(attr)
|
||
for _ in range((data["count"] - 1) // page_size):
|
||
payload["offset"] += page_size
|
||
resp = yield partial(get_files, payload, async_=async_)
|
||
try:
|
||
data = resp["data"]
|
||
except:
|
||
resp=resp()
|
||
data = resp["data"]
|
||
for attr in map(normalize_attr, data["list"]):
|
||
attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}]
|
||
path = attr["path"] = joinpath(dirname, escape(attr["name"]))
|
||
attr["share_code"] = share_code
|
||
attr["receive_code"] = receive_code
|
||
path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"]
|
||
add(attr)
|
||
children = self.pid_to_children[id] = tuple(ls)
|
||
self.id_to_attr.update((attr["id"], attr) for attr in children)
|
||
else:
|
||
count = len(children)
|
||
if start < 0:
|
||
start += count
|
||
if start < 0:
|
||
start = 0
|
||
if stop is None:
|
||
stop = count
|
||
elif stop < 0:
|
||
stop += count
|
||
if start >= stop or stop <= 0 or start >= count:
|
||
return ()
|
||
key: None | Callable
|
||
match payload.get("o"):
|
||
case "file_name":
|
||
key = lambda attr: attr["name"]
|
||
case "file_size":
|
||
key = lambda attr: attr.get("size") or 0
|
||
case "file_type":
|
||
key = lambda attr: attr.get("ico", "folder" if attr["is_directory"] else "")
|
||
case "user_utime" | "user_ptime" | "user_otime":
|
||
key = lambda attr: attr["time"]
|
||
case _:
|
||
key = None
|
||
if key:
|
||
children = sorted(children, key=key, reverse=payload.get("asc", True))
|
||
return YieldFrom(children[start:stop])
|
||
return run_gen_step_iter(gen_step, may_call=False, async_=async_)
|
||
|
||
@overload
|
||
def receive(
|
||
self,
|
||
ids: int | str | Iterable[int | str],
|
||
/,
|
||
to_pid: int = 0,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> dict:
|
||
...
|
||
@overload
|
||
def receive(
|
||
self,
|
||
ids: int | str | Iterable[int | str],
|
||
/,
|
||
to_pid: int = 0,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, dict]:
|
||
...
|
||
def receive(
|
||
self,
|
||
ids: int | str | Iterable[int | str],
|
||
/,
|
||
to_pid: int = 0,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> dict | Coroutine[Any, Any, dict]:
|
||
"""接收分享文件到网盘
|
||
:param ids: 要转存到文件 id(这些 id 归属分享链接)
|
||
:param to_pid: 你的网盘的一个目录 id(这个 id 归属你的网盘)
|
||
"""
|
||
def gen_step():
|
||
nonlocal ids
|
||
if isinstance(ids, int):
|
||
ids = str(ids)
|
||
elif isinstance(ids, Iterable):
|
||
ids = ",".join(map(str, ids))
|
||
if not ids:
|
||
raise ValueError("no id (to file) to receive")
|
||
payload = {
|
||
"share_code": self.share_code,
|
||
"receive_code": self.receive_code,
|
||
"file_id": ids,
|
||
"cid": to_pid,
|
||
}
|
||
return (yield partial(
|
||
self.client.share_receive,
|
||
payload,
|
||
request=self.async_request if async_ else self.request,
|
||
async_=async_,
|
||
))
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|
||
@overload
|
||
def search(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
page_size: int = 1_000,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
**payload,
|
||
) -> Iterator[P115SharePath]:
|
||
...
|
||
@overload
|
||
def search(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
page_size: int = 1_000,
|
||
*,
|
||
async_: Literal[True],
|
||
**payload,
|
||
) -> AsyncIterator[P115SharePath]:
|
||
...
|
||
def search(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
page_size: int = 1_000,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
**payload,
|
||
) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]:
|
||
"""搜索目录
|
||
|
||
:param payload:
|
||
- share_code: str = <default> 💡 分享码
|
||
- receive_code: str = <default> 💡 接收码(即密码)
|
||
- limit: int = 32 💡 一页大小,意思就是 page_size
|
||
- offset: int = 0 💡 索引偏移,索引从 0 开始计算
|
||
- search_value: str = "." 💡 搜索文本,仅支持搜索文件名
|
||
- suffix: str = <default> 💡 文件后缀(扩展名),优先级高于 `type`
|
||
- type: int = <default> 💡 文件类型
|
||
|
||
- 0: 全部
|
||
- 1: 文档
|
||
- 2: 图片
|
||
- 3: 音频
|
||
- 4: 视频
|
||
- 5: 压缩包
|
||
- 6: 应用
|
||
- 7: 书籍
|
||
- 99: 仅文件
|
||
"""
|
||
if page_size <= 0:
|
||
page_size = 1_000
|
||
def gen_step():
|
||
attr = yield self.attr(id_or_path, pid=pid, async_=async_)
|
||
if attr["is_directory"]:
|
||
payload["cid"] = attr["id"]
|
||
else:
|
||
payload["cid"] = attr["parent_id"]
|
||
payload["limit"] = page_size
|
||
offset = int(payload.setdefault("offset", 0))
|
||
if offset < 0:
|
||
payload["offset"] = 0
|
||
search = self.fs_search
|
||
while True:
|
||
resp = yield search(payload, async_=async_)
|
||
ls = resp["data"]["list"]
|
||
if not ls:
|
||
return
|
||
for attr in ls:
|
||
attr = normalize_attr(attr)
|
||
yield Yield(P115SharePath(self, attr))
|
||
offset = payload["offset"] = offset + resp["page_size"]
|
||
if offset >= resp["count"] or offset >= 10_000:
|
||
break
|
||
if offset + page_size > 10_000:
|
||
payload["page_size"] = 10_000 - offset
|
||
return run_gen_step_iter(gen_step, may_call=False, async_=async_)
|
||
|
||
@overload
|
||
def stat(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False] = False,
|
||
) -> stat_result:
|
||
...
|
||
@overload
|
||
def stat(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[True],
|
||
) -> Coroutine[Any, Any, stat_result]:
|
||
...
|
||
def stat(
|
||
self,
|
||
id_or_path: IDOrPathType = "",
|
||
/,
|
||
pid: None | int = None,
|
||
*,
|
||
async_: Literal[False, True] = False,
|
||
) -> stat_result | Coroutine[Any, Any, stat_result]:
|
||
"检查路径的属性,就像 `os.stat`"
|
||
def gen_step():
|
||
attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_)
|
||
is_dir = attr["is_directory"]
|
||
timestamp: float = attr["timestamp"]
|
||
return stat_result((
|
||
(S_IFDIR if is_dir else S_IFREG) | 0o444, # mode
|
||
cast(int, attr["id"]), # ino
|
||
cast(int, attr["parent_id"]), # dev
|
||
1, # nlink
|
||
self.user_id, # uid
|
||
1, # gid
|
||
cast(int, 0 if is_dir else attr["size"]), # size
|
||
timestamp, # atime
|
||
timestamp, # mtime
|
||
timestamp, # ctime
|
||
))
|
||
return run_gen_step(gen_step, async_=async_)
|
||
|