安装 Tornado
可以使用以下命令通过 pip 安装 Tornado:
pip install tornado
编写一个简单的 Tornado 应用
1. 导入 Tornado 模块:
import tornado.ioloop
import tornado.web
2. 定义一个请求处理器类:
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, Tornado!")
3. 创建应用对象并指定路由规则:
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
4. 启动 Tornado 服务:
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
处理异步请求
Tornado 是一个基于异步 I/O 的框架,可以处理大量并发连接。以下是一个处理异步请求的简单示例:
import tornado.ioloop
import tornado.web
import time
class AsyncHandler(tornado.web.RequestHandler):
async def get(self):
await self.some_async_method()
self.write("Async request completed!")
async def some_async_method(self):
# 模拟异步操作
await tornado.ioloop.IOLoop.current().run_in_executor(None, time.sleep, 2)
def make_app():
return tornado.web.Application([
(r"/async", AsyncHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
WebSocket 支持
Tornado 提供了对 WebSocket 的支持。以下是一个简单的 WebSocket 示例:
import tornado.ioloop
import tornado.web
import tornado.websocket
class WebSocketHandler(tornado.websocket.WebSocketHandler):
def open(self):
print("WebSocket opened")
def on_message(self, message):
self.write_message(f"You said: {message}")
def on_close(self):
print("WebSocket closed")
def make_app():
return tornado.web.Application([
(r"/ws", WebSocketHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
这是一个简单的 WebSocket 处理程序,它会回应客户端发送的消息。
异步数据库访问
Tornado 可以与异步数据库访问库一起使用,例如 aiomysql、aiopg 等,以实现异步数据库操作。以下是一个使用 aiomysql 的简单示例:
import tornado.ioloop
import tornado.web
import aiomysql
async def db_query():
conn = await aiomysql.connect(host='localhost', port=3306, user='root', password='password', db='testdb')
cursor = await conn.cursor()
await cursor.execute("SELECT * FROM users")
result = await cursor.fetchall()
await conn.close()
return result
class DBHandler(tornado.web.RequestHandler):
async def get(self):
result = await db_query()
self.write(f"Database query result: {result}")
def make_app():
return tornado.web.Application([
(r"/db", DBHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
这个示例演示了如何使用 aiomysql 连接到 MySQL 数据库并进行异步查询。
这只是 Tornado 框架的基本用法,你可以根据具体的需求深入学习 Tornado 的更多功能和特性。有关更详细的信息,请参阅 Tornado 的[官方文档](https://www.tornadoweb.org/en/stable/)。
转载请注明出处:http://www.zyzy.cn/article/detail/7416/Tornado