161 lines
5.1 KiB
Python
161 lines
5.1 KiB
Python
![]() |
#!/usr/bin/env python3
|
|||
|
|
|||
|
import argparse
|
|||
|
from io import BytesIO
|
|||
|
from datetime import datetime
|
|||
|
from wsgidav.dav_provider import DAVProvider,_DAVResource
|
|||
|
from wsgidav.wsgidav_app import WsgiDAVApp
|
|||
|
from wsgidav.util import init_logging
|
|||
|
from cheroot import wsgi
|
|||
|
|
|||
|
import yaml
|
|||
|
|
|||
|
class Node:
|
|||
|
"""虚拟文件系统节点"""
|
|||
|
def __init__(self, name, is_dir=False):
|
|||
|
self.name = name
|
|||
|
self.is_dir = is_dir
|
|||
|
self.children = {}
|
|||
|
self.content = b""
|
|||
|
self.last_modified = datetime.now()
|
|||
|
|
|||
|
def parse_paths(text_lines):
|
|||
|
"""解析完整路径配置文件"""
|
|||
|
root = Node('/', is_dir=True)
|
|||
|
for line in text_lines:
|
|||
|
line = line.strip()
|
|||
|
if not line:
|
|||
|
continue
|
|||
|
|
|||
|
is_dir = line.endswith('/')
|
|||
|
stripped = line.strip('/')
|
|||
|
parts = stripped.split('/') if stripped else []
|
|||
|
current = root
|
|||
|
|
|||
|
if is_dir:
|
|||
|
# 目录路径处理
|
|||
|
for part in parts:
|
|||
|
if not part:
|
|||
|
continue
|
|||
|
if part not in current.children:
|
|||
|
current.children[part] = Node(part, is_dir=True)
|
|||
|
current = current.children[part]
|
|||
|
else:
|
|||
|
# 文件路径处理
|
|||
|
if not parts:
|
|||
|
# 根目录文件(如 /file.txt)
|
|||
|
if stripped not in current.children:
|
|||
|
current.children[stripped] = Node(stripped, is_dir=False)
|
|||
|
continue
|
|||
|
|
|||
|
dirs, file_name = parts[:-1], parts[-1]
|
|||
|
for part in dirs:
|
|||
|
if not part:
|
|||
|
continue
|
|||
|
if part not in current.children:
|
|||
|
current.children[part] = Node(part, is_dir=True)
|
|||
|
current = current.children[part]
|
|||
|
if file_name not in current.children:
|
|||
|
current.children[file_name] = Node(file_name, is_dir=False)
|
|||
|
return root
|
|||
|
|
|||
|
class VirtualFSProvider(DAVProvider):
|
|||
|
"""WebDAV虚拟文件系统提供程序"""
|
|||
|
def __init__(self, root_node):
|
|||
|
super().__init__()
|
|||
|
self.root_node = root_node
|
|||
|
|
|||
|
def get_resource_inst(self, path, environ):
|
|||
|
if path == '/':
|
|||
|
return VirtualResource('/', self.root_node, environ)
|
|||
|
|
|||
|
parts = [p for p in path.strip('/').split('/') if p]
|
|||
|
current = self.root_node
|
|||
|
for part in parts:
|
|||
|
if part in current.children:
|
|||
|
current = current.children[part]
|
|||
|
else:
|
|||
|
return None
|
|||
|
return VirtualResource(path, current, environ)
|
|||
|
|
|||
|
class VirtualResource(_DAVResource):
|
|||
|
"""虚拟资源实现"""
|
|||
|
def __init__(self, path, node, environ):
|
|||
|
super().__init__(path + '/' if node.is_dir and not path.endswith('/') else path, environ=environ, is_collection=True if node.is_dir else False)
|
|||
|
#super().__init__(path + '/' if node.is_dir and not path.endswith('/') else path, environ=environ, is_collection=False if node.is_dir and not path.endswith('/') else False)
|
|||
|
self.node = node
|
|||
|
self.environ = environ
|
|||
|
|
|||
|
def get_display_name(self):
|
|||
|
return self.node.name
|
|||
|
|
|||
|
def is_collection(self):
|
|||
|
return self.node.is_dir
|
|||
|
|
|||
|
def get_content_length(self):
|
|||
|
return len(self.node.content)
|
|||
|
|
|||
|
def get_content_type(self):
|
|||
|
return 'httpd/unix-directory' if self.node.is_dir else 'application/octet-stream'
|
|||
|
|
|||
|
def get_member_names(self):
|
|||
|
return list(self.node.children.keys())
|
|||
|
|
|||
|
def get_member(self, name):
|
|||
|
child = self.node.children.get(name)
|
|||
|
if not child:
|
|||
|
return None
|
|||
|
child_path = f"{self.path.rstrip('/')}/{name}"
|
|||
|
return VirtualResource(child_path, child, self.environ)
|
|||
|
def support_ranges(self):
|
|||
|
return True if self.node.is_dir else False
|
|||
|
def support_etag(self):
|
|||
|
return True if self.node.is_dir else False
|
|||
|
def get_content(self):
|
|||
|
return BytesIO("VIRTUAL".encode())
|
|||
|
|
|||
|
def main():
|
|||
|
parser = argparse.ArgumentParser(description='WebDAV路径模拟服务器')
|
|||
|
parser.add_argument('input', help='包含完整路径的配置文件')
|
|||
|
parser.add_argument('--port', type=int, default=5678, help='监听端口号')
|
|||
|
args = parser.parse_args()
|
|||
|
|
|||
|
with open(args.input, 'r') as f:
|
|||
|
lines = f.readlines()
|
|||
|
root = parse_paths(lines)
|
|||
|
|
|||
|
config = {
|
|||
|
'host': '0.0.0.0',
|
|||
|
'port': args.port,
|
|||
|
'provider_mapping': {'/dav': VirtualFSProvider(root)},
|
|||
|
'verbose': 1,
|
|||
|
'http_authenticator': {"accept_basic": True},
|
|||
|
'auth':'basic',
|
|||
|
'simple_dc': {
|
|||
|
'user_mapping': {
|
|||
|
'/dav': {
|
|||
|
'guest':{
|
|||
|
'password':'guest_Api789',
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
print(config)
|
|||
|
app = WsgiDAVApp(config)
|
|||
|
init_logging(config)
|
|||
|
|
|||
|
server = wsgi.Server(
|
|||
|
(config['host'], config['port']),
|
|||
|
app,
|
|||
|
server_name='Path-Based WebDAV Server'
|
|||
|
)
|
|||
|
print(f"Server running on http://{config['host']}:{config['port']}/dav")
|
|||
|
try:
|
|||
|
server.start()
|
|||
|
except KeyboardInterrupt:
|
|||
|
print("\nServer stopped.")
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
main()
|