2025-05-10 06:57:01 +08:00
#!/usr/bin/env python3
import argparse
from io import BytesIO
from datetime import datetime
from wsgidav . dav_provider import DAVProvider , _DAVResource
from wsgidav . wsgidav_app import WsgiDAVApp
from wsgidav . util import init_logging
from cheroot import wsgi
import yaml
class Node :
""" 虚拟文件系统节点 """
2025-05-10 11:57:09 +08:00
def __init__ ( self , name , is_dir = False , size = 0 ) :
2025-05-10 06:57:01 +08:00
self . name = name
self . is_dir = is_dir
self . children = { }
self . content = b " "
self . last_modified = datetime . now ( )
2025-05-10 11:57:09 +08:00
self . size = size
2025-05-10 06:57:01 +08:00
def parse_paths ( text_lines ) :
""" 解析完整路径配置文件 """
root = Node ( ' / ' , is_dir = True )
for line in text_lines :
line = line . strip ( )
if not line :
continue
2025-05-10 11:57:09 +08:00
arr = line . split ( ' \t ' )
line = arr [ 0 ]
size = 0
if len ( arr ) > 1 :
try :
size = int ( arr [ 1 ] )
except :
print ( " error line: " + line )
2025-05-10 06:57:01 +08:00
is_dir = line . endswith ( ' / ' )
stripped = line . strip ( ' / ' )
parts = stripped . split ( ' / ' ) if stripped else [ ]
current = root
if is_dir :
# 目录路径处理
for part in parts :
if not part :
continue
if part not in current . children :
current . children [ part ] = Node ( part , is_dir = True )
current = current . children [ part ]
else :
# 文件路径处理
if not parts :
# 根目录文件(如 /file.txt)
if stripped not in current . children :
2025-05-10 11:57:09 +08:00
current . children [ stripped ] = Node ( stripped , is_dir = False , size = size )
2025-05-10 06:57:01 +08:00
continue
dirs , file_name = parts [ : - 1 ] , parts [ - 1 ]
for part in dirs :
if not part :
continue
if part not in current . children :
current . children [ part ] = Node ( part , is_dir = True )
current = current . children [ part ]
if file_name not in current . children :
2025-05-10 11:57:09 +08:00
current . children [ file_name ] = Node ( file_name , is_dir = False , size = size )
2025-05-10 06:57:01 +08:00
return root
class VirtualFSProvider ( DAVProvider ) :
""" WebDAV虚拟文件系统提供程序 """
def __init__ ( self , root_node ) :
super ( ) . __init__ ( )
self . root_node = root_node
def get_resource_inst ( self , path , environ ) :
if path == ' / ' :
return VirtualResource ( ' / ' , self . root_node , environ )
parts = [ p for p in path . strip ( ' / ' ) . split ( ' / ' ) if p ]
current = self . root_node
for part in parts :
if part in current . children :
current = current . children [ part ]
else :
return None
return VirtualResource ( path , current , environ )
class VirtualResource ( _DAVResource ) :
""" 虚拟资源实现 """
def __init__ ( self , path , node , environ ) :
super ( ) . __init__ ( path + ' / ' if node . is_dir and not path . endswith ( ' / ' ) else path , environ = environ , is_collection = True if node . is_dir else False )
#super().__init__(path + '/' if node.is_dir and not path.endswith('/') else path, environ=environ, is_collection=False if node.is_dir and not path.endswith('/') else False)
self . node = node
self . environ = environ
def get_display_name ( self ) :
return self . node . name
def is_collection ( self ) :
return self . node . is_dir
def get_content_length ( self ) :
return len ( self . node . content )
def get_content_type ( self ) :
return ' httpd/unix-directory ' if self . node . is_dir else ' application/octet-stream '
def get_member_names ( self ) :
return list ( self . node . children . keys ( ) )
def get_member ( self , name ) :
child = self . node . children . get ( name )
if not child :
return None
child_path = f " { self . path . rstrip ( ' / ' ) } / { name } "
return VirtualResource ( child_path , child , self . environ )
def support_ranges ( self ) :
return True if self . node . is_dir else False
def support_etag ( self ) :
return True if self . node . is_dir else False
def get_content ( self ) :
2025-05-10 11:57:09 +08:00
content = " VIRTUAL " . encode ( ' utf-8 ' )
buf = BytesIO ( content )
buf . seek ( 0 )
return buf
def get_content_type ( self ) :
return " application/octet-stream "
def get_content_length ( self ) :
return self . node . size
2025-05-10 06:57:01 +08:00
def main ( ) :
parser = argparse . ArgumentParser ( description = ' WebDAV路径模拟服务器 ' )
parser . add_argument ( ' input ' , help = ' 包含完整路径的配置文件 ' )
parser . add_argument ( ' --port ' , type = int , default = 5678 , help = ' 监听端口号 ' )
args = parser . parse_args ( )
with open ( args . input , ' r ' ) as f :
lines = f . readlines ( )
root = parse_paths ( lines )
config = {
' host ' : ' 0.0.0.0 ' ,
' port ' : args . port ,
' provider_mapping ' : { ' /dav ' : VirtualFSProvider ( root ) } ,
' verbose ' : 1 ,
' http_authenticator ' : { " accept_basic " : True } ,
' auth ' : ' basic ' ,
' simple_dc ' : {
' user_mapping ' : {
' /dav ' : {
' guest ' : {
' password ' : ' guest_Api789 ' ,
}
}
}
}
}
print ( config )
app = WsgiDAVApp ( config )
init_logging ( config )
server = wsgi . Server (
( config [ ' host ' ] , config [ ' port ' ] ) ,
app ,
server_name = ' Path-Based WebDAV Server '
)
print ( f " Server running on http:// { config [ ' host ' ] } : { config [ ' port ' ] } /dav " )
try :
server . start ( )
except KeyboardInterrupt :
print ( " \n Server stopped. " )
if __name__ == ' __main__ ' :
main ( )