作者信息
背景
postgres=# select * from test;id----(0 rows)postgres=# insert into test select 1;ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"HINT: Stop the postmaster and vacuum that database in single-user mode.You might also need to commit or roll back old prepared transactions.
XID基础原理
XID 定义
typedef uint32 TransactionId; /* 事务号定义,32位无符号整数 */typedef struct HeapTupleFields{TransactionId t_xmin; /* 插入该元组的事务号 */TransactionId t_xmax; /* 删除或锁定该元组的事务号 *//*** 其它属性省略 ***/} HeapTupleFields;struct HeapTupleHeaderData{union{HeapTupleFields t_heap;DatumTupleFields t_datum;} t_choice;/*** 其它属性省略 ***/};
XID 发行机制
// 无效事务号#define InvalidTransactionId ((TransactionId) 0)// 引导事务号,在数据库初始化过程(BKI执行)中使用#define BootstrapTransactionId ((TransactionId) 1)// 冻结事务号用于表示非常陈旧的元组,它们比所有正常事务号都要早(也就是可见)#define FrozenTransactionId ((TransactionId) 2)// 第一个正常事务号#define FirstNormalTransactionId ((TransactionId) 3)// 把 FullTransactionId 的低32位作为无符号整数生成 xid#define XidFromFullTransactionId(x) ((uint32) (x).value)static inline voidFullTransactionIdAdvance(FullTransactionId *dest){dest->value++;while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)dest->value++;}FullTransactionIdGetNewTransactionId(bool isSubXact){/*** 省略 ***/full_xid = ShmemVariableCache->nextFullXid;xid = XidFromFullTransactionId(full_xid);/*** 省略 ***/FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);/*** 省略 ***return full_xid;}static voidAssignTransactionId(TransactionState s){/*** 省略 ***/s->fullTransactionId = GetNewTransactionId(isSubXact);if (!isSubXact)XactTopFullTransactionId = s->fullTransactionId;/*** 省略 ***/}TransactionIdGetTopTransactionId(void){if (!FullTransactionIdIsValid(XactTopFullTransactionId))AssignTransactionId(&TopTransactionStateData);return XidFromFullTransactionId(XactTopFullTransactionId);}
XID 回卷机制
/** TransactionIdPrecedes --- is id1 logically < id2?*/boolTransactionIdPrecedes(TransactionId id1, TransactionId id2){/** If either ID is a permanent XID then we can just do unsigned* comparison. If both are normal, do a modulo-2^32 comparison.*/int32 diff;if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))return (id1 < id2);diff = (int32) (id1 - id2);return (diff < 0);}
XID 回卷预防
除了内核自动冻结回收XID,我们也可以通过命令或者 sql 的方式手动进行 xid 冻结回收
查询数据库或表的年龄,数据库年龄指的是:「最新事务号-数据库中最老事务号」,表年龄指的是:「最新事务号-表中最老事务号」
# 查看每个库的年龄SELECT datname, age(datfrozenxid) FROM pg_database;# 1个库每个表的年龄排序SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc;# 查看1个表的年龄select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
手动冻结回收一张表的元组的 xid 的sql:
vacuum freeze 表名;
手动冻结回收一个库里面的所有表 xid 的命令:
vacuumdb -d 库名 --freeze --jobs=30 -h 连接串 -p 端口号 -U 库Owner
解决方案
问题分析
voidSetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid){TransactionId xidVacLimit;TransactionId xidWarnLimit;TransactionId xidStopLimit;TransactionId xidWrapLimit;TransactionId curXid;Assert(TransactionIdIsNormal(oldest_datfrozenxid));/** xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将发生回卷*/xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);if (xidWrapLimit < FirstNormalTransactionId)xidWrapLimit += FirstNormalTransactionId;/** 一旦当前事务号到达xidStopLimit,实例将不可写入,保留 1000000 的xid用于vacuum* 每 vacuum 一张表需要占用一个xid*/xidStopLimit = xidWrapLimit - 1000000;if (xidStopLimit < FirstNormalTransactionId)xidStopLimit -= FirstNormalTransactionId;/** 一旦当前事务号到达xidWarnLimit,将不停地收到* WARNING: database "xxxx" must be vacuumed within 2740112 transactions*/xidWarnLimit = xidStopLimit - 10000000;if (xidWarnLimit < FirstNormalTransactionId)xidWarnLimit -= FirstNormalTransactionId;/** 一旦当前事务号到达xidVacLimit将触发force autovacuums*/xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;if (xidVacLimit < FirstNormalTransactionId)xidVacLimit += FirstNormalTransactionId;/* Grab lock for just long enough to set the new limit values */LWLockAcquire(XidGenLock, LW_EXCLUSIVE);ShmemVariableCache->oldestXid = oldest_datfrozenxid;ShmemVariableCache->xidVacLimit = xidVacLimit;ShmemVariableCache->xidWarnLimit = xidWarnLimit;ShmemVariableCache->xidStopLimit = xidStopLimit;ShmemVariableCache->xidWrapLimit = xidWrapLimit;ShmemVariableCache->oldestXidDB = oldest_datoid;curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);LWLockRelease(XidGenLock);/* Log the info */ereport(DEBUG1,(errmsg("transaction ID wrap limit is %u, limited by database with OID %u",xidWrapLimit, oldest_datoid)));/** 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age* 触发 autovacuum 对年龄最老的数据库进行清理,如果有多个数据库达到要求,按年龄最老的顺序依次清理* 通过设置标志位标记当前 autovacuum 结束之后再来一次 autovacuum*/if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&IsUnderPostmaster && !InRecovery)SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);/* Give an immediate warning if past the wrap warn point */if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery){char *oldest_datname;if (IsTransactionState())oldest_datname = get_database_name(oldest_datoid);elseoldest_datname = NULL;if (oldest_datname)ereport(WARNING,(errmsg("database \"%s\" must be vacuumed within %u transactions",oldest_datname,xidWrapLimit - curXid),errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n""You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));elseereport(WARNING,(errmsg("database with OID %u must be vacuumed within %u transactions",oldest_datoid,xidWrapLimit - curXid),errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n""You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));}}boolTransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2){int32 diff;if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))return (id1 >= id2);diff = (int32) (id1 - id2);return (diff >= 0);}FullTransactionIdGetNewTransactionId(bool isSubXact){/*** 省略 ***/full_xid = ShmemVariableCache->nextFullXid;xid = XidFromFullTransactionId(full_xid);if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit)){TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;Oid oldest_datoid = ShmemVariableCache->oldestXidDB;/*** 省略 ***/if (IsUnderPostmaster &&TransactionIdFollowsOrEquals(xid, xidStopLimit)){char *oldest_datname = get_database_name(oldest_datoid);/* complain even if that DB has disappeared */if (oldest_datname)ereport(ERROR,(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",oldest_datname),errhint("Stop the postmaster and vacuum that database in single-user mode.\n""You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));/*** 省略 ***/}/*** 省略 ***/}/*** 省略 ***/}
问题定位
# 查看每个库的年龄SELECT datname, age(datfrozenxid) FROM pg_database;# 1个库每个表的年龄排序SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc;# 查看1个表的年龄select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
问题解决
通过上面的第一个 sql,查找年龄最大的数据库,数据库年龄指的是:|最新事务号-数据库中最老事务号|
通过上面第二个 sql,查找年龄最大的表,然后对表依次执行:vacuum freeze 表名,把表中的老事务号冻结回收,表年龄指的是:|最新事务号-表中最老事务号|
运维脚本
单进程 Shell 脚本
# 对指定数据库中年龄最大的前 50 张表进行 vacuum freezefor cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd | grep -v row | grep vacuum`; dopsql -U用户名 -p端口号 -h连接串 -d数据库名 -c "$cmd"done
多进程 Python 脚本
from multiprocessing import Poolimport psycopg2args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='数据库名',user='用户名', password='密码')def vacuum_handler(sql):sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "try:conn = psycopg2.connect(**args)cur = conn.cursor()cur.execute(sql)conn.commit()cur = conn.cursor()cur.execute(sql_str)print cur.fetchall()conn.close()except Exception as e:print str(e)# 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发执行def multi_vacuum():pool = Pool(processes=32)sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";try:conn = psycopg2.connect(**args)cur = conn.cursor()cur.execute(sql_str)rows = cur.fetchall()for row in rows:cmd = row['vacuum_cmd']pool.apply_async(vacuum_handler, (cmd, ))conn.close()pool.close()pool.join()except Exception as e:print str(e)multi_vacuum()

