#!/usr/bin/env python3
"""Serve a virtual WebDAV tree that is built from a plain list of paths.

The input file holds one path per line, optionally followed by a TAB and an
integer size.  A path ending in ``/`` declares a directory; any other path
declares a file (missing parent directories are created on the fly).
"""
import argparse
from io import BytesIO
from datetime import datetime

from wsgidav.dav_provider import DAVProvider, _DAVResource
from wsgidav.wsgidav_app import WsgiDAVApp
from wsgidav.util import init_logging
from cheroot import wsgi
import yaml


class Node:
    """A node (file or directory) of the virtual file system."""

    def __init__(self, name, is_dir=False, size=0):
        self.name = name
        self.is_dir = is_dir
        # Child nodes keyed by their name.
        self.children = {}
        self.content = b""
        self.last_modified = datetime.now()
        # Size taken from the path list (0 when none was given).
        self.size = size


def _descend(parent, name):
    """Return the child *name* of *parent*, creating a directory if missing."""
    child = parent.children.get(name)
    if child is None:
        child = Node(name, is_dir=True)
        parent.children[name] = child
    return child


def parse_paths(text_lines):
    """Build the virtual tree described by an iterable of path lines.

    Each non-blank line is ``<path>[TAB<size>]``.  A path with a trailing
    ``/`` is a directory, otherwise it is a file whose parent directories
    are created as needed.  Empty path segments (``a//b``) are ignored and
    an entry that already exists is left untouched.  A size that is not an
    integer is reported on stdout and treated as 0.

    Returns the root ``Node`` (named ``/``).
    """
    root = Node('/', is_dir=True)
    for raw_line in text_lines:
        line = raw_line.strip()
        if not line:
            continue

        fields = line.split('\t')
        # Strip the path field itself so whitespace before the TAB does not
        # hide a trailing '/' or end up inside a file name.
        path = fields[0].strip()
        size = 0
        if len(fields) > 1:
            try:
                size = int(fields[1])
            except ValueError:
                print("error line:" + path)

        is_dir = path.endswith('/')
        parts = [part for part in path.strip('/').split('/') if part]
        if not parts:
            # The path consists of slashes only, i.e. the root itself.
            continue

        # For a file the last segment is the file name, not a directory.
        dir_parts = parts if is_dir else parts[:-1]
        current = root
        for part in dir_parts:
            current = _descend(current, part)

        if not is_dir:
            file_name = parts[-1]
            if file_name not in current.children:
                current.children[file_name] = Node(
                    file_name, is_dir=False, size=size
                )
    return root


class VirtualFSProvider(DAVProvider):
    """WebDAV provider that resolves request paths against a ``Node`` tree."""

    def __init__(self, root_node):
        super().__init__()
        self.root_node = root_node

    def get_resource_inst(self, path, environ):
        """Return the resource for *path*, or ``None`` if no node matches."""
        current = self.root_node
        for part in path.strip('/').split('/'):
            if not part:
                continue
            current = current.children.get(part)
            if current is None:
                return None
        return VirtualResource(path, current, environ)


class VirtualResource(_DAVResource):
    """WebDAV resource backed by a single ``Node``."""

    def __init__(self, path, node, environ):
        # Directory paths are normalised to end with '/'.
        if node.is_dir and not path.endswith('/'):
            path += '/'
        super().__init__(
            path, environ=environ, is_collection=bool(node.is_dir)
        )
        self.node = node
        self.environ = environ

    def get_display_name(self):
        """Return the node's own name."""
        return self.node.name

    def get_content_length(self):
        """Return the size recorded for the node in the path list."""
        # NOTE(review): get_content() always yields a 7-byte placeholder, so
        # this length can differ from the body actually served - confirm
        # whether downloads are expected to work.
        return self.node.size

    def get_content_type(self):
        """Return the MIME type reported for every resource."""
        return "application/octet-stream"

    def get_member_names(self):
        """Return the names of the node's direct children."""
        return list(self.node.children.keys())

    def get_member(self, name):
        """Return the child resource called *name*, or ``None``."""
        child = self.node.children.get(name)
        if child is None:
            return None
        child_path = f"{self.path.rstrip('/')}/{name}"
        return VirtualResource(child_path, child, self.environ)

    def support_ranges(self):
        """Report range support; true for directories only."""
        # NOTE(review): enabling this only for directories looks inverted -
        # confirm the intent before changing it.
        return bool(self.node.is_dir)

    def support_etag(self):
        """Report ETag support; true for directories only."""
        # NOTE(review): same directory-only condition as support_ranges -
        # confirm the intent.
        return bool(self.node.is_dir)

    def get_content(self):
        """Return a fresh stream holding the fixed placeholder body."""
        return BytesIO(b"VIRTUAL")


def main():
    """Parse the command line, build the tree and run the WebDAV server."""
    parser = argparse.ArgumentParser(description='WebDAV路径模拟服务器')
    parser.add_argument('input', help='包含完整路径的配置文件')
    parser.add_argument('--port', type=int, default=5678, help='监听端口号')
    args = parser.parse_args()

    with open(args.input, 'r') as f:
        root = parse_paths(f)

    config = {
        'host': '0.0.0.0',
        'port': args.port,
        'provider_mapping': {'/dav': VirtualFSProvider(root)},
        'verbose': 9,
        'http_authenticator': {"accept_basic": True},
        'auth': 'basic',
        'simple_dc': {
            'user_mapping': {
                '/dav': {
                    'guest': {
                        'password': 'guest_Api789',
                    }
                }
            }
        },
    }
    # NOTE(review): this dumps the whole config, including the hard-coded
    # password above, to stdout - confirm that is acceptable.
    print(config)

    app = WsgiDAVApp(config)
    init_logging(config)

    server = wsgi.Server(
        (config['host'], config['port']),
        app,
        server_name='Path-Based WebDAV Server',
    )
    print(f"Server running on http://{config['host']}:{config['port']}/dav")
    try:
        server.start()
    except KeyboardInterrupt:
        # Shut the listener and worker threads down before reporting.
        server.stop()
        print("\nServer stopped.")


if __name__ == '__main__':
    main()