#!/usr/bin/env python3 # encoding: utf-8 from __future__ import annotations __author__ = "ChenyangGao " __all__ = ["P115SharePath", "P115ShareFileSystem"] import errno from collections import deque from collections.abc import ( AsyncIterator, Callable, Coroutine, Iterable, Iterator, Mapping, MutableMapping, Sequence, ) from copy import deepcopy from datetime import datetime from functools import cached_property, partial from os import fspath, stat_result, PathLike from posixpath import join as joinpath from re import compile as re_compile from stat import S_IFDIR, S_IFREG from time import time from typing import cast, overload, Any, Literal, Never, Self from dictattr import AttrDict from iterutils import run_gen_step, run_gen_step_iter, Yield, YieldFrom from p115client import check_response, normalize_attr, P115URL from posixpatht import escape, joins, splits, path_is_dir_form from .client import P115Client from .fs_base import IDOrPathType, P115PathBase, P115FileSystemBase CRE_SHARE_LINK_search1 = re_compile(r"(?:/s/|share\.115\.com/)(?P[a-z0-9]+)\?password=(?P[a-z0-9]{4})").search CRE_SHARE_LINK_search2 = re_compile(r"(?P[a-z0-9]+)-(?P[a-z0-9]{4})").search class P115SharePath(P115PathBase): fs: P115ShareFileSystem @property def ancestors(self, /) -> list[dict]: try: return self["ancestors"] except KeyError: ancestors = self.fs.get_ancestors(self.id) self.attr["path"] = joins([a["name"] for a in ancestors]) return ancestors @property def path(self, /) -> str: try: return self["path"] except KeyError: self.ancestors return self.attr["path"] @overload def search( self, /, async_: Literal[False] = False, **payload, ) -> Iterator[P115SharePath]: ... @overload def search( self, /, async_: Literal[True], **payload, ) -> AsyncIterator[P115SharePath]: ... def search( self, /, async_: Literal[False, True] = False, **payload, ) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]: return self.fs.search(self, async_=async_, **payload) class P115ShareFileSystem(P115FileSystemBase[P115SharePath]): share_code: str receive_code: str path_to_id: MutableMapping[str, int] id_to_attr: MutableMapping[int, AttrDict] pid_to_children: MutableMapping[int, tuple[AttrDict, ...]] full_loaded: bool path_class = P115SharePath def __init__( self, /, client: str | P115Client, share_code: str, receive_code: str = "", request: None | Callable = None, async_request: None | Callable = None, ): """115 分享链接的文件系统封装 支持以下几种格式的链接(括号内的字符表示可有可无): - http(s)://115.com/s/{share_code}?password={receive_code}(#) - http(s)://share.115.com/{share_code}?password={receive_code}(#) - (/){share_code}-{receive_code}(/) """ super().__init__(client, request, async_request) self.__dict__.update( id=0, path="/", share_code=share_code, receive_code=receive_code, path_to_id={"/": 0}, id_to_attr={}, pid_to_children={}, full_loaded=False, ) @classmethod def from_url( cls, /, client: str | P115Client, url: str, request: None | Callable = None, async_request: None | Callable = None, ) -> Self: m = CRE_SHARE_LINK_search1(url) if m is None: m = CRE_SHARE_LINK_search2(url) if m is None: raise ValueError("not a valid 115 share link") return cls( client, share_code=m["share_code"], receive_code=m["receive_code"] or "", request=request, async_request=async_request, ) def __repr__(self, /) -> str: cls = type(self) module = cls.__module__ name = cls.__qualname__ if module != "__main__": name = module + "." + name return f"<{name}(client={self.client!r}, share_code={self.share_code!r}, receive_code={self.receive_code!r}, id={self.id!r}, path={self.path!r}) at {hex(id(self))}>" def __setattr__(self, attr, val, /) -> Never: raise TypeError("can't set attributes") @overload def downlist( self, /, id: int, *, async_: Literal[False] = False, ) -> dict: ... @overload def downlist( self, /, id: int, *, async_: Literal[True], ) -> Coroutine[Any, Any, dict]: ... def downlist( self, /, id: int, *, async_: Literal[False, True] = False, ) -> dict | Coroutine[Any, Any, dict]: """获取分享链接的某个文件夹中可下载的文件的列表(只含文件,不含文件夹,任意深度,简略信息) """ return check_response(self.client.share_downlist( # type: ignore { "share_code": self.share_code, "receive_code": self.receive_code, "cid": id, }, request=self.async_request if async_ else self.request, async_=async_, )) @overload def fs_files( self, /, payload: dict, async_: Literal[False] = False, ) -> dict: ... @overload def fs_files( self, /, payload: dict, async_: Literal[True], ) -> Coroutine[Any, Any, dict]: ... def fs_files( self, /, payload: dict, async_: Literal[False, True] = False, ) -> dict | Coroutine[Any, Any, dict]: """获取分享链接的某个文件夹中的文件和子文件夹的列表(包含详细信息) :param payload: - id: int | str = 0 - limit: int = 32 - offset: int = 0 - asc: 0 | 1 = # 是否升序排列 - o: str = # 用某字段排序: # - "file_name": 文件名 # - "file_size": 文件大小 # - "user_ptime": 创建时间/修改时间 """ return check_response(self.client.share_snap( # type: ignore { **payload, "share_code": self.share_code, "receive_code": self.receive_code, }, base_url=True, request=self.async_request if async_ else self.request, async_=async_, )) @overload def fs_search( self, payload: str | dict, /, async_: Literal[False] = False, ) -> dict: ... @overload def fs_search( self, payload: str | dict, /, async_: Literal[True], ) -> Coroutine[Any, Any, dict]: ... def fs_search( self, payload: str | dict, /, async_: Literal[False, True] = False, ) -> dict | Coroutine[Any, Any, dict]: if isinstance(payload, str): payload = {"share_code": self.share_code, "receive_code": self.receive_code, "cid": self.id, "search_value": payload} else: payload = {"share_code": self.share_code, "receive_code": self.receive_code, "cid": self.id, **payload} return check_response(self.client.share_search( # type: ignore payload, request=self.async_request if async_ else self.request, async_=async_, )) @cached_property def create_time(self, /) -> datetime: "分享的创建时间" return datetime.fromtimestamp(self.create_timestamp) @cached_property def create_timestamp(self, /) -> int: "分享的创建时间" return int(self.shareinfo["create_time"]) @cached_property def snap_id(self, /) -> int: "获取这个分享的 id" return int(self.shareinfo["snap_id"]) @cached_property def user_id(self, /) -> int: "获取分享者的用户 id" return int(self.sharedata["userinfo"]["user_id"]) @property def sharedata(self, /) -> dict: "获取分享的首页数据" return self.fs_files({"limit": 1})["data"] @property def shareinfo(self, /) -> dict: "获取分享信息" return self.sharedata["shareinfo"] @overload def _search_item( self, id: int, /, async_: Literal[False] = False, ) -> AttrDict: ... @overload def _search_item( self, id: int, /, async_: Literal[True], ) -> Coroutine[Any, Any, AttrDict]: ... def _search_item( self, id: int, /, async_: Literal[False, True] = False, ) -> AttrDict | Coroutine[Any, Any, AttrDict]: dq = deque((self.attr(0),)) get, put = dq.popleft, dq.append if async_: async def request(): while dq: async for attr in self.iterdir(get(), async_=True): if attr["id"] == id: return attr if attr["is_directory"]: put(attr) self.__dict__["full_loaded"] = True raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}") return request() else: while dq: for attr in self.iterdir(get()): if attr["id"] == id: return attr if attr["is_directory"]: put(attr) self.__dict__["full_loaded"] = True raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}") @overload def _attr( self, id: int, /, async_: Literal[False] = False, ) -> AttrDict: ... @overload def _attr( self, id: int, /, async_: Literal[True], ) -> Coroutine[Any, Any, AttrDict]: ... def _attr( self, id: int, /, async_: Literal[False, True] = False, ) -> AttrDict | Coroutine[Any, Any, AttrDict]: share_code = self.share_code receive_code = self.receive_code def gen_step(): try: return self.id_to_attr[id] except KeyError: pass if self.full_loaded: raise FileNotFoundError(errno.ENOENT, f"no such id: {id!r}") if id == 0: attr = self.id_to_attr[0] = AttrDict({ "id": 0, "parent_id": 0, "name": "", "path": "/", "is_directory": True, "size": None, "time": self.create_timestamp, "ico": "folder", "ancestors": [{"id": 0, "name": ""}], "share_code": share_code, "receive_code": receive_code, }) return attr # NOTE: quick detection of id existence yield partial( self.client.share_download_url, { "share_code": share_code, "receive_code": receive_code, "file_id": id, }, strict=False, request=self.async_request if async_ else self.request, async_=async_, ) return (yield partial(self._search_item, id, async_=async_)) return run_gen_step(gen_step, async_=async_) @overload def _attr_path( self, path: str | PathLike[str] | Sequence[str], /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[False] = False, ) -> AttrDict: ... @overload def _attr_path( self, path: str | PathLike[str] | Sequence[str], /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[True], ) -> Coroutine[Any, Any, AttrDict]: ... def _attr_path( self, path: str | PathLike[str] | Sequence[str], /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[False, True] = False, ) -> AttrDict | Coroutine[Any, Any, AttrDict]: def gen_step(): nonlocal path, pid, ensure_dir if isinstance(path, PathLike): path = fspath(path) if pid is None: pid = self.id if not path or path == ".": return (yield partial(self._attr, pid, async_=async_)) parents = 0 if isinstance(path, str): if not ensure_dir: ensure_dir = path_is_dir_form(path) patht, parents = splits(path) if not (patht or parents): return (yield partial(self._attr, pid, async_=async_)) else: if not ensure_dir: ensure_dir = path[-1] == "" patht = [path[0], *(p for p in path[1:] if p)] if patht == [""]: return self._attr(0) elif patht and patht[0] == "": pid = 0 ancestor_patht: list[str] = [] if pid == 0: if patht[0] != "": patht.insert(0, "") else: ancestors = yield partial(self.get_ancestors, pid, async_=async_) if parents: if parents >= len(ancestors): pid = 0 else: pid = cast(int, ancestors[-parents-1]["id"]) ancestor_patht = ["", *(a["name"] for a in ancestors[1:-parents])] else: ancestor_patht = ["", *(a["name"] for a in ancestors[1:])] if not patht: return (yield partial(self._attr, pid, async_=async_)) if pid == 0: dirname = "" ancestors_paths: list[str] = [(dirname := f"{dirname}/{escape(name)}") for name in patht[1:]] else: dirname = joins(ancestor_patht) ancestors_paths = [(dirname := f"{dirname}/{escape(name)}") for name in patht] fullpath = ancestors_paths[-1] path_to_id = self.path_to_id if path_to_id: if not ensure_dir and (id := path_to_id.get(fullpath)): return (yield partial(self._attr, id, async_=async_)) if (id := path_to_id.get(fullpath + "/")): return (yield partial(self._attr, id, async_=async_)) if self.full_loaded: raise FileNotFoundError( errno.ENOENT, f"no such path {fullpath!r} (in {pid!r})", ) parent: int | AttrDict for i in reversed(range(len(ancestors_paths)-1)): if path_to_id and (id := path_to_id.get((dirname := ancestors_paths[i]) + "/")): parent = yield partial(self._attr, id, async_=async_) i += 1 break else: i = 0 parent = pid if pid == 0: i += 1 attr: AttrDict last_idx = len(patht) - 1 if async_: for i, name in enumerate(patht[i:], i): async def step(): nonlocal attr, parent async for attr in self.iterdir(parent, async_=True): if attr["name"] == name: if ensure_dir or i < last_idx: if attr["is_directory"]: parent = attr break else: break else: if isinstance(parent, AttrDict): parent = parent["id"] raise FileNotFoundError( errno.ENOENT, f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})", ) yield step else: for i, name in enumerate(patht[i:], i): for attr in self.iterdir(parent): if attr["name"] == name: if ensure_dir or i < last_idx: if attr["is_directory"]: parent = attr break else: break else: if isinstance(parent, AttrDict): parent = parent["id"] raise FileNotFoundError( errno.ENOENT, f"no such file {name!r} (in {parent} @ {joins(patht[:i])!r})", ) return attr return run_gen_step(gen_step, async_=async_) @overload def attr( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[False] = False, ) -> AttrDict: ... @overload def attr( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[True], ) -> Coroutine[Any, Any, AttrDict]: ... def attr( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_dir: bool = False, *, async_: Literal[False, True] = False, ) -> AttrDict | Coroutine[Any, Any, AttrDict]: "获取属性" def gen_step(): path_class = type(self).path_class if isinstance(id_or_path, path_class): attr = id_or_path.attr elif isinstance(id_or_path, AttrDict): attr = id_or_path elif isinstance(id_or_path, int): attr = yield partial(self._attr, id_or_path, async_=async_) else: return (yield partial( self._attr_path, id_or_path, pid=pid, ensure_dir=ensure_dir, async_=async_, )) if ensure_dir and not attr["is_directory"]: raise NotADirectoryError( errno.ENOTDIR, f"{attr['id']} (id={attr['id']}) is not directory" ) return attr return run_gen_step(gen_step, async_=async_) @overload def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False] = False, ) -> int: ... @overload def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[True], ) -> Coroutine[Any, Any, int]: ... def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False, True] = False, ) -> int | Coroutine[Any, Any, int]: "文件夹中的项目数(直属的文件和目录计数)" def gen_step(): id = yield partial(self.get_id, id_or_path, pid=pid, async_=async_) if (children := self.pid_to_children.get(id)) is not None: return len(children) resp = yield partial( self.fs_files, {"cid": id, "limit": 1}, async_=async_, ) return resp["data"]["count"] return run_gen_step(gen_step, async_=async_) @overload def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False] = False, ) -> list[dict]: ... @overload def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[True], ) -> Coroutine[Any, Any, list[dict]]: ... def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False, True] = False, ) -> list[dict] | Coroutine[Any, Any, list[dict]]: "获取各个上级目录的少量信息(从根目录到当前目录)" def gen_step(): attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_) return deepcopy(attr["ancestors"]) return run_gen_step(gen_step, async_=async_) @overload def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, headers: None | Mapping = None, *, async_: Literal[False] = False, ) -> P115URL: ... @overload def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, headers: None | Mapping = None, *, async_: Literal[True], ) -> Coroutine[Any, Any, P115URL]: ... def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, headers: None | Mapping = None, *, async_: Literal[False, True] = False, ) -> P115URL | Coroutine[Any, Any, P115URL]: "获取下载链接" def gen_step(): if isinstance(id_or_path, int): id = id_or_path else: attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_) if attr["is_directory"]: raise IsADirectoryError(errno.EISDIR, f"{attr['path']!r} (id={attr['id']!r}) is a directory") id = attr["id"] return (yield partial( self.client.share_download_url, { "share_code": self.share_code, "receive_code": self.receive_code, "file_id": id, }, headers=headers, use_web_api=attr.get("violated", False) and attr["size"] < 1024 * 1024 * 115, request=self.async_request if async_ else self.request, async_=async_, )) return run_gen_step(gen_step, async_=async_) @overload def iterdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, start: int = 0, stop: None | int = None, page_size: int = 1_000, refresh: bool = False, *, async_: Literal[False] = False, **payload, ) -> Iterator[AttrDict]: ... @overload def iterdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, start: int = 0, stop: None | int = None, page_size: int = 1_000, refresh: bool = False, *, async_: Literal[True], **payload, ) -> AsyncIterator[AttrDict]: ... def iterdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, start: int = 0, stop: None | int = None, page_size: int = 1_000, refresh: bool = False, *, async_: Literal[False, True] = False, **payload, ) -> Iterator[AttrDict] | AsyncIterator[AttrDict]: """迭代获取目录内直属的文件或目录的信息 :param payload: - limit: int = 32 - offset: int = 0 - asc: 0 | 1 = # 是否升序排列 - o: str = # 用某字段排序: # - 文件名:"file_name" # - 文件大小:"file_size" # - 文件种类:"file_type" # - 修改时间:"user_utime" # - 创建时间:"user_ptime" # - 上次打开时间:"user_otime" """ path_class = type(self).path_class if page_size <= 0: page_size = 1_000 share_code = self.share_code receive_code = self.receive_code def gen_step(): nonlocal start, stop if stop is not None and (start >= 0 and stop >= 0 or start < 0 and stop < 0) and start >= stop: return () if isinstance(id_or_path, int): attr = yield partial(self._attr, id_or_path, async_=async_) elif isinstance(id_or_path, AttrDict): attr = id_or_path elif isinstance(id_or_path, path_class): attr = id_or_path.attr else: attr = yield partial( self._attr_path, id_or_path, pid=pid, ensure_dir=True, async_=async_, ) #if not attr["is_directory"]: try: isDir = attr["is_directory"] except: attr=attr() isDir = attr["is_directory"] if not isDir: raise NotADirectoryError( errno.ENOTDIR, f"{attr['path']!r} (id={attr['id']!r}) is not a directory", ) id = attr["id"] ancestors = attr["ancestors"] children: Sequence[AttrDict] try: if refresh: raise KeyError children = self.pid_to_children[id] except KeyError: payload["cid"] = id payload["limit"] = page_size offset = int(payload.setdefault("offset", 0)) if offset < 0: offset = payload["offset"] = 0 else: payload["offset"] = 0 dirname = attr["path"] get_files = self.fs_files path_to_id = self.path_to_id ls: list[AttrDict] = [] add = ls.append resp = yield partial(get_files, payload, async_=async_) try: data = resp["data"] except: resp=resp() data = resp["data"] for attr in map(normalize_attr, data["list"]): attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}] path = attr["path"] = joinpath(dirname, escape(attr["name"])) attr["share_code"] = share_code attr["receive_code"] = receive_code path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"] add(attr) for _ in range((data["count"] - 1) // page_size): payload["offset"] += page_size resp = yield partial(get_files, payload, async_=async_) try: data = resp["data"] except: resp=resp() data = resp["data"] for attr in map(normalize_attr, data["list"]): attr["ancestors"] = [*ancestors, {"id": attr["id"], "name": attr["name"]}] path = attr["path"] = joinpath(dirname, escape(attr["name"])) attr["share_code"] = share_code attr["receive_code"] = receive_code path_to_id[path + "/"[:attr["is_directory"]]] = attr["id"] add(attr) children = self.pid_to_children[id] = tuple(ls) self.id_to_attr.update((attr["id"], attr) for attr in children) else: count = len(children) if start < 0: start += count if start < 0: start = 0 if stop is None: stop = count elif stop < 0: stop += count if start >= stop or stop <= 0 or start >= count: return () key: None | Callable match payload.get("o"): case "file_name": key = lambda attr: attr["name"] case "file_size": key = lambda attr: attr.get("size") or 0 case "file_type": key = lambda attr: attr.get("ico", "folder" if attr["is_directory"] else "") case "user_utime" | "user_ptime" | "user_otime": key = lambda attr: attr["time"] case _: key = None if key: children = sorted(children, key=key, reverse=payload.get("asc", True)) return YieldFrom(children[start:stop]) return run_gen_step_iter(gen_step, may_call=False, async_=async_) @overload def receive( self, ids: int | str | Iterable[int | str], /, to_pid: int = 0, *, async_: Literal[False] = False, ) -> dict: ... @overload def receive( self, ids: int | str | Iterable[int | str], /, to_pid: int = 0, *, async_: Literal[True], ) -> Coroutine[Any, Any, dict]: ... def receive( self, ids: int | str | Iterable[int | str], /, to_pid: int = 0, *, async_: Literal[False, True] = False, ) -> dict | Coroutine[Any, Any, dict]: """接收分享文件到网盘 :param ids: 要转存到文件 id(这些 id 归属分享链接) :param to_pid: 你的网盘的一个目录 id(这个 id 归属你的网盘) """ def gen_step(): nonlocal ids if isinstance(ids, int): ids = str(ids) elif isinstance(ids, Iterable): ids = ",".join(map(str, ids)) if not ids: raise ValueError("no id (to file) to receive") payload = { "share_code": self.share_code, "receive_code": self.receive_code, "file_id": ids, "cid": to_pid, } return (yield partial( self.client.share_receive, payload, request=self.async_request if async_ else self.request, async_=async_, )) return run_gen_step(gen_step, async_=async_) @overload def search( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, page_size: int = 1_000, *, async_: Literal[False] = False, **payload, ) -> Iterator[P115SharePath]: ... @overload def search( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, page_size: int = 1_000, *, async_: Literal[True], **payload, ) -> AsyncIterator[P115SharePath]: ... def search( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, page_size: int = 1_000, *, async_: Literal[False, True] = False, **payload, ) -> Iterator[P115SharePath] | AsyncIterator[P115SharePath]: """搜索目录 :param payload: - share_code: str = 💡 分享码 - receive_code: str = 💡 接收码(即密码) - limit: int = 32 💡 一页大小,意思就是 page_size - offset: int = 0 💡 索引偏移,索引从 0 开始计算 - search_value: str = "." 💡 搜索文本,仅支持搜索文件名 - suffix: str = 💡 文件后缀(扩展名),优先级高于 `type` - type: int = 💡 文件类型 - 0: 全部 - 1: 文档 - 2: 图片 - 3: 音频 - 4: 视频 - 5: 压缩包 - 6: 应用 - 7: 书籍 - 99: 仅文件 """ if page_size <= 0: page_size = 1_000 def gen_step(): attr = yield self.attr(id_or_path, pid=pid, async_=async_) if attr["is_directory"]: payload["cid"] = attr["id"] else: payload["cid"] = attr["parent_id"] payload["limit"] = page_size offset = int(payload.setdefault("offset", 0)) if offset < 0: payload["offset"] = 0 search = self.fs_search while True: resp = yield search(payload, async_=async_) ls = resp["data"]["list"] if not ls: return for attr in ls: attr = normalize_attr(attr) yield Yield(P115SharePath(self, attr)) offset = payload["offset"] = offset + resp["page_size"] if offset >= resp["count"] or offset >= 10_000: break if offset + page_size > 10_000: payload["page_size"] = 10_000 - offset return run_gen_step_iter(gen_step, may_call=False, async_=async_) @overload def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False] = False, ) -> stat_result: ... @overload def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[True], ) -> Coroutine[Any, Any, stat_result]: ... def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False, True] = False, ) -> stat_result | Coroutine[Any, Any, stat_result]: "检查路径的属性,就像 `os.stat`" def gen_step(): attr = yield partial(self.attr, id_or_path, pid=pid, async_=async_) is_dir = attr["is_directory"] timestamp: float = attr["timestamp"] return stat_result(( (S_IFDIR if is_dir else S_IFREG) | 0o444, # mode cast(int, attr["id"]), # ino cast(int, attr["parent_id"]), # dev 1, # nlink self.user_id, # uid 1, # gid cast(int, 0 if is_dir else attr["size"]), # size timestamp, # atime timestamp, # mtime timestamp, # ctime )) return run_gen_step(gen_step, async_=async_)