session - python 2 SSL服务器,具有会话处理功能

  显示原文与译文双语对照的内容
0 0

我搜索了很长时间,找到了标题中提到的实现,但找不到任何内容。 所以我自己实现了它。 我在这里发布了代码,别人可能会觉得有用。

如果有人发现错误我愿意修复他们。 下面是代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from hashlib import sha256
import hmac
import uuid
import time
from datetime import datetime
import ssl
import socket
import SocketServer
import BaseHTTPServer
import SimpleHTTPServer
import SimpleXMLRPCServer
from xmlrpclib import Fault
# Configure below
LISTEN_HOST = '127.0.0.1' # You should not use '' here, unless you have a real FQDN.
LISTEN_PORT = 2048
KEYFILE = os.path.join('certs', 'server.key') # Replace with your PEM formatted key file
CERTFILE = os.path.join('certs', 'server.crt') # Replace with your PEM formatted certificate file
# 2011/01/01 in UTC
EPOCH = 1293840000
def require_login(decorated_function):
"""
 Decorator that prevents access to action if not logged in.
 If the login check failed a xmlrpclib.Fault exception is raised
"""
 def wrapper(self, session_id, *args, **kwargs):
""" Decorated methods must always have self and session_id"""
 # check if a valid session is available
 if not self.sessions.has_key(session_id):
 self._clear_expired_sessions() # clean the session dict
 raise Fault("Session ID invalid","Call login(user, pass) to aquire a valid session")
 last_visit = self.sessions[session_id]["last_visit"]
 # check if timestamp is valid
 if is_timestamp_expired(last_visit):
 self._clear_expired_sessions() # clean the session dict
 raise Fault("Session ID expired","Call login(user, pass) to aquire a valid session")
 self.sessions[session_id]["last_visit"] = get_timestamp()
 return decorated_function(self, session_id, *args, **kwargs)
 return wrapper
def timestamp_to_datetime(timestamp):
"""
 Convert a timestamp from 'get_timestamp' into a datetime object
 Args:
 ts: An integer timestamp
 Returns:
 A datetime object
"""
 return datetime.utcfromtimestamp(timestamp + EPOCH)
def get_timestamp():
"""
 Returns the seconds since 1/1/2011.
 Returns:
 A integer timestamp
"""
 return int(time.time() - EPOCH)
def is_timestamp_expired(timestamp, max_age = 2700): # maxage in seconds (here: 2700 = 45 min)
"""
 Checks if the given timestamp is expired
 Args:
 timestamp: An integer timestamp
 max_age : The maximal allowd age of the timestamp in seconds
 Returns:
 True if the timestamp is expired or False if the timestamp is valid
"""
 age = get_timestamp() - timestamp
 if age> max_age:
 return True
 return False
class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
 def __init__(self, server_address, HandlerClass, logRequests=True, allow_none=False):
"""
 Secure XML-RPC server.
 It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
"""
 self.logRequests = logRequests
 self.allow_none = True
 SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, self.allow_none, None)
 SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
 self.socket = ssl.wrap_socket(socket.socket(), server_side=True, certfile=CERTFILE,
 keyfile=KEYFILE, ssl_version=ssl.PROTOCOL_SSLv23)
 self.server_bind()
 self.server_activate()
class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
"""
 Secure XML-RPC request handler class.
 It it very similar to SimpleXMLRPCRequestHandler but it uses HTTPS for transporting XML data.
"""
 def setup(self):
 self.connection = self.request
 self.rfile = socket._fileobject(self.request,"rb", self.rbufsize)
 self.wfile = socket._fileobject(self.request,"wb", self.wbufsize)
 def do_POST(self):
"""Handles the HTTPS POST request.
 It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly.
"""
 try:
 # get arguments
 data = self.rfile.read(int(self.headers["content-length"]))
 # In previous versions of SimpleXMLRPCServer, _dispatch
 # could be overridden in this class, instead of in
 # SimpleXMLRPCDispatcher. To maintain backwards compatibility,
 # check to see if a subclass implements _dispatch and dispatch
 # using that method if present.
 response = self.server._marshaled_dispatch(
 data, getattr(self, '_dispatch', None)
 )
 except Exception as e: # This should only happen if the module is buggy
 # internal error, report as HTTP server error
 self.send_response(500)
 self.end_headers()
 else:
 # got a valid XML RPC response
 self.send_response(200)
 self.send_header("Content-type","text/xml")
 self.send_header("Content-length", str(len(response)))
 self.end_headers()
 self.wfile.write(response)
 # shut down the connection
 self.wfile.flush()
 #modified as of http://docs.python.org/library/ssl.html
 self.connection.shutdown(socket.SHUT_RDWR)
 self.connection.close()
class XMLRPCHandler:
"""
 Example implementation for login handling
"""
 def __init__(self):
 self.users = {"test":"test","foo":"bar"} # replace with your own authentication
 self.sessions = dict()
 self.session_key = os.urandom(32)
 def _find_session_by_username(self, username):
"""
 Try to find a valid session by username.
 Args:
 username: The username to search for
 Returns:
 If a session is found it is returned otherwise None is returned
"""
 for session in self.sessions.itervalues():
 if session["username"] == username:
 return session
 def _invalidate_session_id(self, session_id):
"""
 Remove a session.
 Args:
 session_id: The session which should be removed
"""
 try:
 del self.sessions[session_id]
 except KeyError:
 pass
 def _clear_expired_sessions(self):
"""
 Clear all expired sessions
"""
 for session_id in self.sessions.keys():
 last_visit = self.sessions[session_id]["last_visit"]
 if is_timestamp_expired(last_visit):
 self._invalidate_session_id(session_id)
 def _generate_session_id(self, username):
"""
 Generates a new session id
 Returns:
 A new unique session_id
"""
 return hmac.new(self.session_key, username + str(uuid.uuid4()), sha256).hexdigest()
 def login(self, username, password):
"""
 Handle the login procedure. If the login is successfull the session id is returned
 otherwise a xmlrpclib.Fault exception is raised.
 Args:
 username: The username
 password: The password
 Returns:
 A valid session id
 Raises:
 A xmlrpclib.Fault exception is raised
"""
 # check username and password
 if self.users.has_key(username):
 if self.users[username] == password:
 # Check if a session with the username exists
 session = self._find_session_by_username(username)
 if session:
 if is_timestamp_expired(session["last_visit"]):
 self._invalidate_session_id(session["session_id"])
 else:
 return session["session_id"]
 # generate session id and save it
 session_id = self._generate_session_id(username)
 self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}
 return session_id
 raise Fault("unknown username or password","Please check your username and password")
 @require_login
 def hello(self, session_id, name):
"""
 Example method which requires a login
"""
 if not name:
 raise Fault("unknown recipient","I need someone to greet!")
 return"Hello, %s!" % name
def test():
 server_address = (LISTEN_HOST, LISTEN_PORT)
 server = SecureXMLRPCServer(server_address, SecureXMLRpcRequestHandler)
 server.register_introspection_functions()
 server.register_instance(XMLRPCHandler())
 sa = server.socket.getsockname()
 print"Serving HTTPS on", sa[0],"port", sa[1]
 server.serve_forever()
if __name__ =="__main__":
 test()
""" Testcode for a example client"""
 import time
 def continue_xmlrpc_call(func, *args):
 try:
 ret = func(*args)
 print ret
 return ret
 except xmlrpclib.Fault as e:
 print e
 server = xmlrpclib.ServerProxy("https://localhost:2048")
 print server
 print server.system.listMethods()
 sid = continue_xmlrpc_call(server.login,"foo","bar")
 sid = continue_xmlrpc_call(server.login,"foo","bar")
 continue_xmlrpc_call(server.hello, sid,"World")
 time.sleep(2)
 continue_xmlrpc_call(server.hello, sid,"Invalid")
 continue_xmlrpc_call(server.hello,"193","")
时间:原作者:1个回答

0 0

@Philipp 很棒但我发现loginfunction有点怪。 似乎知道登录用户名的任何人都可以获得会话 id,然后进行。 如果你移动会话代码,使现有会话id仅在输入正确的用户名和密码时返回,我看起来更好。

 # check username and password
 if self.users.has_key(username):
 if self.users[username] == password:
 # Check if a session with the username exists
 session = self._find_session_by_username(username)
 if session:
 if is_timestamp_expired(session["last_visit"]):
 self._invalidate_session_id(session["session_id"])
 else:
 return session["session_id"]
 # generate session id and save it
 session_id = self._generate_session_id(username)
 self.sessions[session_id] = {"username" : username,
"session_id": session_id,
"last_visit": get_timestamp()}
 return session_id
原作者:
...