Files
tv/webdav_simulator.py

204 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
from io import BytesIO
from datetime import datetime
from wsgidav.dav_provider import DAVProvider,_DAVResource
from wsgidav.wsgidav_app import WsgiDAVApp
from wsgidav.util import init_logging
from cheroot import wsgi
#import yaml
import sys
import os
class Node:
    """One entry (file or directory) of the in-memory virtual file tree.

    Instance fields:
        name: the path component this node is stored under.
        is_dir: True for a directory, False for a file.
        size: size value supplied by the caller (0 when omitted).
        children: mapping of child name -> Node; starts out empty.
        content: byte payload of the node; starts out as empty bytes.
        last_modified: local time captured when the node was created.
    """

    def __init__(self, name, is_dir=False, size=0):
        # Caller-supplied identity of the node.
        self.name, self.is_dir, self.size = name, is_dir, size
        # Per-instance defaults; each node gets its own fresh containers.
        self.last_modified = datetime.now()
        self.content = b""
        self.children = {}
def parse_paths(text_lines):
    """Build the virtual directory tree described by a list of path lines.

    Each non-blank line holds a path, optionally followed by a TAB character
    and an integer size.  A path ending in ``/`` declares a directory; any
    other path declares a file, and its parent directories are created on
    demand.  Leading slashes and empty components (``a//b``) are ignored.
    The first declaration of a name wins: an existing node is never replaced.

    Args:
        text_lines: iterable of strings, one entry per item (for example an
            open text file or the result of ``readlines()``).

    Returns:
        The root ``Node`` (named ``/``) of the populated tree.
    """
    root = Node('/', is_dir=True)
    for raw_line in text_lines:
        entry = raw_line.strip()
        if not entry:
            continue

        fields = entry.split('\t')
        path = fields[0]
        size = 0
        if len(fields) > 1:
            try:
                size = int(fields[1])
            except ValueError:
                # Best effort: report the bad size and keep the entry with
                # size 0 rather than aborting the whole import.
                print("error line:" + path)

        is_dir = path.endswith('/')
        parts = [part for part in path.split('/') if part]
        if not parts:
            # Only slashes (e.g. "/"): this is the root itself, nothing to add.
            continue

        # For a file the last component is the file name; everything before
        # it is a chain of directories.  For a directory all components are.
        dir_parts = parts if is_dir else parts[:-1]
        current = root
        for part in dir_parts:
            child = current.children.get(part)
            if child is None:
                child = current.children[part] = Node(part, is_dir=True)
            # An already existing node is reused as-is, whatever its kind.
            current = child

        if not is_dir:
            file_name = parts[-1]
            if file_name not in current.children:
                current.children[file_name] = Node(
                    file_name, is_dir=False, size=size
                )
    return root
class VirtualFSProvider(DAVProvider):
    """WebDAV provider that serves resources out of an in-memory node tree."""

    def __init__(self, root_node):
        super().__init__()
        # Root of the virtual tree; every lookup starts here.
        self.root_node = root_node

    def get_resource_inst(self, path, environ):
        """Resolve ``path`` to a ``VirtualResource``.

        Returns ``None`` as soon as one path component has no matching child
        in the tree.
        """
        if path == '/':
            return VirtualResource('/', self.root_node, environ)

        node = self.root_node
        # filter(None, ...) drops the empty segments produced by "//".
        for segment in filter(None, path.strip('/').split('/')):
            node = node.children.get(segment)
            if node is None:
                return None
        return VirtualResource(path, node, environ)
class VirtualResource(_DAVResource):
    """WebDAV resource backed by a single ``Node`` of the virtual tree.

    Directories are exposed as collections whose members are the node's
    children.  Files report the size stored on the node, but every resource
    serves the same fixed placeholder body.
    """

    def __init__(self, path, node, environ):
        # Collection paths are normalised to end with a slash.
        if node.is_dir and not path.endswith('/'):
            path = path + '/'
        super().__init__(
            path, environ=environ, is_collection=bool(node.is_dir)
        )
        self.node = node
        self.environ = environ

    def get_display_name(self):
        """Return the node's own name (its last path component)."""
        return self.node.name

    def is_collection(self):
        """Return True when the backing node is a directory.

        NOTE(review): the base initialiser is also given ``is_collection`` as
        a value; if it stores that on the instance, this method is shadowed
        there -- confirm against the installed WsgiDAV version.
        """
        return self.node.is_dir

    def get_content_type(self):
        """Return the MIME type: directory marker or generic binary."""
        if self.node.is_dir:
            return 'httpd/unix-directory'
        return 'application/octet-stream'

    def get_member_names(self):
        """Return the names of the direct children of this node."""
        return list(self.node.children)

    def get_member(self, name):
        """Return the child resource called ``name``, or None if absent."""
        child = self.node.children.get(name)
        if not child:
            return None
        child_path = f"{self.path.rstrip('/')}/{name}"
        return VirtualResource(child_path, child, self.environ)

    # NOTE(review): range and ETag support are reported only for directories.
    # That looks inverted for range requests -- confirm the intent before
    # changing it; the behaviour is kept as found.
    def support_ranges(self):
        """Return True for directories, False for files."""
        return bool(self.node.is_dir)

    def support_etag(self):
        """Return True for directories, False for files."""
        return bool(self.node.is_dir)

    def get_content(self):
        """Return a fresh stream holding the fixed placeholder body.

        The body is the same for every resource and is independent of the
        size reported by ``get_content_length``.
        """
        return BytesIO(b"VIRTUAL")

    def get_content_length(self):
        """Return the size recorded on the backing node."""
        return self.node.size

    def get_property_names(self, is_allprop):
        """Return the DAV property names advertised for this resource.

        Files additionally advertise ``getcontentlength``.  The decision is
        made on ``node.is_dir`` directly: ``self.is_collection`` is declared
        as a method on this class, so testing it without calling it could
        never select the file branch.
        """
        names = ["{DAV:}resourcetype", "{DAV:}displayname"]
        if not self.node.is_dir:
            names.append("{DAV:}getcontentlength")
        return names
def get_htdocs_path():
    """Return the absolute path of the ``htdocs`` directory next to the app.

    In a frozen (packaged) build the directory is looked up inside the
    bundle's unpack directory; otherwise it is looked up beside this source
    file.

    Returns:
        Absolute filesystem path ending in ``htdocs``.  The directory is not
        checked for existence.
    """
    if getattr(sys, 'frozen', False):
        # Packaged build: PyInstaller exposes its temporary unpack directory
        # as sys._MEIPASS.  Fall back to the executable's directory when the
        # attribute is missing instead of raising AttributeError.
        base_path = getattr(sys, '_MEIPASS', None) or os.path.dirname(
            os.path.abspath(sys.executable)
        )
    else:
        # abspath first: __file__ may be relative when run as a script, and
        # the result must not depend on the current working directory.
        base_path = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(base_path, "htdocs")
def main():
    """Parse the command line, build the virtual tree and serve it via WebDAV.

    Reads the path list named by the ``input`` argument, mounts the resulting
    tree under ``/dav`` and runs a cheroot WSGI server on all interfaces
    until interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(description='WebDAV路径模拟服务器')
    parser.add_argument('input', help='包含完整路径的配置文件')
    parser.add_argument('--port', type=int, default=5678, help='监听端口号')
    args = parser.parse_args()

    # Stream the file straight into the parser; it only iterates the lines.
    with open(args.input, 'r', encoding='utf-8') as f:
        root = parse_paths(f)

    config = {
        'host': '0.0.0.0',
        'port': args.port,
        'provider_mapping': {
            '/dav': VirtualFSProvider(root),
        },
        'verbose': 9,
        'http_authenticator': {"accept_basic": True},
        'auth': 'basic',
        'simple_dc': {
            'user_mapping': {
                '/dav': {
                    'guest': {
                        'password': 'guest_Api789',
                    }
                }
            }
        },
        "dir_browser": {
            "enable": True,  # turn on the HTML directory browser
            "htdocs_path": get_htdocs_path(),  # explicit absolute path
        },
    }
    # NOTE(review): this dumps the whole config, including the hard-coded
    # guest password, to stdout.
    print(config)

    app = WsgiDAVApp(config)
    init_logging(config)
    server = wsgi.Server(
        (config['host'], config['port']),
        app,
        server_name='Path-Based WebDAV Server'
    )
    print(f"Server running on http://{config['host']}:{config['port']}/dav")
    try:
        server.start()
    except KeyboardInterrupt:
        # Shut the listener and worker threads down cleanly on Ctrl-C.
        server.stop()
        print("\nServer stopped.")
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()