大多数应用程序都要重复地存储(写入)和检索(读取)某种数据。但是,在大多数情况下,读取次数远远超过写入次数。为了提高效率,我们会使用到一层缓存。缓存是写入到一个位置(例如,一块内存),专门用于对常见请求实现快速检索的行为。
例如,假设客户在一个在线商店上创建个人资料。客户填写他们的姓名、电话号码和地址,我们将这些信息存储到我们的数据库中。由于这些信息在结账过程中的多个步骤都是必需的,因此,值得在首次从数据库中检索它们时,将它们存储在缓存中。这加快了应用程序所有后续检索的处理时间。任何单个请求的检索速度差异可能只有几毫秒,但是,当我们开发云应用同时为数百万用户服务时,这些毫秒加起来就是很大的耗时了。
缓存会给云应用带来一些复杂性,但也提供了优化的机会。我们可以使用两个数据库,而不是使用内存块(以及随之而来的无限制、混乱的性质)!我们可以将一个数据库用作数据存储,一个用作缓存。此外,我们可以为我们的数据存储选择一个针对并发控制等方面进行优化的数据库,以及一个针对快速读写优化的缓存数据库,同时仍然利用 PostgreSQL 为我们提供的一切能力。
准备工作
从 PostgreSQL dellstore2 示例数据库下载dellstore2-normal-1.0.tar.gz文件。
解压它 - 例如,在终端上,您可以使用:
tar -xf dellstore2-normal-1.0.tar.gz
在终端上导航到 dellstore2-normal-1.0 文件夹:
cd dellstore2-normal-1.0
接下来,创建一个新数据库dellstore:
CREATE DATABASE dellstore;
然后连接到正确的数据库:
\c dellstore
当您连接到dellstore数据库时,提示符应更改为dellstore=>。
通过输入以下命令导入数据:
\i dellstore2-normal-1.0.sql
然后检查在数据库中创建了哪些对象:
\d
输出应如下所示:
List of relations
Schema | Name | Type | Owner
--------+--------------------------+----------+----------
public | categories | table | postgres
public | categories_category_seq | sequence | postgres
public | cust_hist | table | postgres
public | customers | table | postgres
public | customers_customerid_seq | sequence | postgres
public | inventory | table | postgres
public | orderlines | table | postgres
public | orders | table | postgres
public | orders_orderid_seq | sequence | postgres
public | products | table | postgres
public | products_prod_id_seq | sequence | postgres
public | reorder | table | postgres
(12 rows)
通过输入\q退出psql,然后离开dellstore2-normal-1.0目录。
创建一个简单的应用程序
我们将创建一个read_count函数,它会根据一个给定的客户 ID,对数据库中的记录执行一次COUNT计算。
这是一个有意的缓慢操作,因为COUNT会逐个记录地遍历整个数据库。在大多数生产场景中,您不会经常这样做。
在现实生活中,一个更好的慢查询例子是,计算电子商务网站上购物篮的结账价格。这需要大量的数据库访问,需要花一些时间,我们不希望仅仅因为客户点击了“刷新”,就重新计算所有内容。
我们可以添加read_count方法,来查询我们的数据库中有多少订单(在 psycopg 文档中的基础模块用法,你可以看到如何使用psycopg3进行 SQL 查询的简要介绍):
@app.get("/count")
def read_count(customerid):
conn = connect_to_postgres()
try:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid = %s;', (customerid,))
count = cursor.fetchone()[0]
finally:
conn.close()
return {'count': count}
让我们测试下应用程序的第一个工作版本。
我们为什么要缓存?
当您运行如上所示的应用程序时,在每次调用read_count()方法时代码会执行COUNT - 在每次进行一次 GET 查询时。我们没有将结果存储在任何地方,因此每次都需要执行这种昂贵而缓慢的操作。
使用 Redis 作为缓存解决了这个问题:我们运行一个所有后端都可以访问的 Redis 实例。这让我们可以将read_count()的结果存储在代码和 PostgreSQL 数据库以外的地方。如果我们正在运行一个现代的云应用,我们后端的其他副本都可以访问read_count()的结果,我们不需要经常运行昂贵的函数。由于 Redis 存储的数据不会被读取或写到磁盘,因此我们可以实现非常低的延迟。
连接到 Redis 并缓存 GET 方法
现在让我们为read_count函数添加缓存。
最后,修改read_count()函数,在调用时向 Redis 添加值。我们通过以下方式做到这一点:
• 使用connect_to_redis()函数连接到 Redis。
• 创建一个变量来存储我们的缓存键,orders:count。最好给缓存键起一个有意义的名字。
• 在 Redis 中查找存储在orders:count键下的任何值。
如果在 Redis 中找到一个值,我们返回该值并退出函数。
如果没有找到,我们连接到 PostgreSQL,并运行SELECT COUNT (*) FROM orders语句,将该值添加到 Redis 缓存中,并在退出之前返回该值。
read_count()函数现在应该是这样的:
@app.get("/count")
def read_count(customerid):
cache_key = 'orders:count:{0}'.format(customerid)
redis_conn = connect_to_redis()
# Is it already in the cache? If so, just use it
count = redis_conn.get(cache_key)
if count:
logging.debug(f'Found {cache_key}, value is {count}')
return {'count': count}
pg_conn = connect_to_postgres()
try:
cursor = pg_conn.cursor()
cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid = %s;', (customerid,))
count = cursor.fetchone()[0]
finally:
pg_conn.close()
# Remember to add it to the cache
logging.debug(f'Caching {cache_key}, value is {count}')
redis_conn.set(cache_key, count)
return {'count': count}
让我们测试一下对应用程序的更改。
添加一个 POST 方法到应用程序
接下来,让我们向应用程序添加一个 POST 函数,以便我们可以将新订单添加到我们的 PostgreSQL 数据库中。
@app.post("/add/")
def post_add(item: Item):
"""Add a new order."""
order = item.dict()
logging.debug(f'Adding order {order}')
pg_conn = connect_to_postgres()
try:
cursor = pg_conn.cursor()
cursor.execute(
'INSERT INTO orders'
' (orderdate, customerid, netamount, tax, totalamount)'
' VALUES (%s, %s, %s, %s, %s);',
(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),
)
pg_conn.commit()
logging.debug('Added new order')
finally:
pg_conn.close()
return {'updated': {'customer': item.customerid}}
我们可以测试一下。然后连接到数据库,运行一个查询:
select * from orders where customerid = 9999;
它应该输出如下结果:
orderid | orderdate | customerid | netamount | tax | totalamount
---------+------------+------------+-----------+------+-------------
7918 | 2004-08-28 | 9999 | 51.19 | 4.22 | 55.41
12006 | 2023-03-10 | 9999 | 23.00 | 2.00 | 25.00
(2 rows)
这里我们有一个问题:我们的缓存过期了。我们只在第一次调用read_count()时更新该缓存。当我们从另一个方法更新数据库时,我们不会更新该缓存来反映更改。
处理过期的缓存
缓存失效是软件开发中的常见问题。我们的缓存只有在准确无误的情况下才有用。那么,我们需要做些什么,来确保我们的缓存是准确的呢?
好吧,对于初学者来说,我们需要确保每当我们调用 POST 方法时,我们的count缓存都会失效。
使缓存失效
我们可以通过在添加新订单时“丢弃”缓存值,来解决缓存过时的问题。要做到这一点,我们需要在post_add函数的开头添加以下行,就在调用connect_to_postgres()之前(注释不是绝对必要的,但它们使我们所做的事情更清楚):
# Invalidate the cache entry whether we succeed in doing the update or not
# - we don't expect the update to fail, and it shouldn't hurt to clear the
# cache a bit too often
cache_key = 'orders:count:{0}'.format(customerid)
redis_conn = connect_to_redis()
# The Redis command is `del`, but that's special in Python
redis_conn.delete(cache_key)
实际的 Redis 命令名为del,但这是一个 Python 的保留字,所以不能用作方法名。Redis 库选择了delete作为一个很好的替代。
整个函数现在应该是这样的:
@app.post("/add/")
def post_add(item: Item):
"""Add a new order."""
order = item.dict()
logging.debug(f'Adding order {order}')
# Invalidate the cache entry whether we succeed in doing the update or not
# - we don't expect the update to fail, and it shouldn't hurt to clear the
# cache a bit too often
cache_key = 'orders:count:{0}'.format(item.customerid)
redis_conn = connect_to_redis()
# The Redis command is `del`, but that's special in Python
redis_conn.delete(cache_key)
pg_conn = connect_to_postgres()
try:
cursor = pg_conn.cursor()
cursor.execute(
'INSERT INTO orders'
' (orderdate, customerid, netamount, tax, totalamount)'
' VALUES (%s, %s, %s, %s, %s);',
(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),
)
pg_conn.commit()
logging.debug('Added new order')
finally:
pg_conn.close()
return {'updated': {'customer': item.customerid}}
指定 TTL(“生存时间”)
接下来,让我们设置一个生存时间,即 TTL。
这是使用缓存时的一种常见做法:它指定了我们应该让缓存保持多长时间有效。当我们向应用程序添加更多函数和微服务时,其他应用程序可能会在后台修改数据库条目。TTL 可确保我们定期使缓存失效,从而减少缓存值同基础数据不一致的可能性。
让我们为缓存超时定义一个以秒为单位的变量,并通过在read_count函数的末尾更改 Redis 的set方法调用,来使用该缓存超时值。我们只需要添加ex参数来指定超时:
CACHE_TIMEOUT_SECONDS = 5
redis_conn.set(cache_key, count, ex=CACHE_TIMEOUT_SECONDS)
现在,如果我们 POST 一个新的订单,下一次 GET 时的计数值会被缓存。然后,我们会看到后续的 GET 请求,只有在CACHE_TIMEOUT_SECONDS时间内才能找到该缓存值。