回顾
我们在上一篇文章中讨论 TDD 开发渗透工具,并且完成了我们 HTTP 代理扫描器的一个功能模块。那么我们接下来就继续使用 TDD 完成接下来的部分,大家也深入理解一下这种开发方式的好处(可是代码量大啊~hhhh)。 如何继续上次的任务?先运行测试用例! 如果上次的任务没有完成,你就需要继续调整你的代码,修改,通过测试用例,这就是你首先要做的,当然如果上次的任务你完成了,测试用例顺利通过,那么当然好啊,接下来你就可以直接再写一个测试用例,开始一个新的功能或者模块了。 继续在之前的文章中我们完成了针对一个地址的代理验证,那么,我们接下来应该怎么做呢?如果我们有一堆地址需要检测,这样我们就要处理两大问题: 安全的多线程(多进程)并发 正确收取结果
短小精干的并发处理这样的话,涉及到多线程编程,保证线程安全什么 BlaBla,对于一个渗透测试爱好者来说,可能会稍微有一点挑战:(我平时写的爆破甚至都是单线程的!),写多线程还是有点麻烦的! 当然我这里也有考虑到这一点,在写过很多类似的脚本之后,最近也算是首创了一种短小精悍的并发框架,具体的开发流程自然也是 TDD,这里我就不会和上次一样很细致的一步一步来讲到底是怎么写的。我们就一起来看一下吧! def test_pool(self):
'''测试并发'''
def demo_task(*args):
'''simulate the plugin.run'''
print '[!] Computing!'
time.sleep(args[0])
print 'Sleep : ', args[0]
print '[!] Finished!'
print
returns = 'Runtime Length : %s' % str(args)
return returns
pool = Pool()
pool.add_task(demo_task, 7)
pool.add_task(demo_task, 3)
q = pool.run()
self.assertIsInstance(q, Queue)
r = q.get()
print r
self.assertIsInstance(r, str)
pool.add_task(demo_task, 4)
pool.add_task(demo_task, 2)
pool.add_task(demo_task, 1)
pool.add_task(demo_task, 6)
pool.add_task(demo_task, 7)
pool.add_task(demo_task, 1)
pool.add_task(demo_task, 6)
pool.run()
r = q.get()
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' ,r
大家会发现,在测试的过程中,已经写明了这个并发池的基本使用方法,当然这个使用方法不是根据 Pool 源代码写出的,而是我自己“瞎”写的:也就是说,我并不知道源代码,但是我设想我的小框架应该这么写,所以我就这么写了,至于怎么实现,不是我写测试用例需要考虑的问题。所以,现在觉得,这个框架是非常的用户友好啊:至少对我自己来说用起来是非常的舒服。 换句话来说,我在写上面的代码的时候,其实并不知道自己的控制并发的小玩意叫什么内部内部什么构造,需要什么模块,需要什么函数,或者有没有什么特殊属性之类的,就只是“我觉得他应该是这样的”而已。 至于写下了测试用例以后发生了什么我觉得就不用再多说了,大家都可以猜得到就是不停测试不停修改代码直到成功完成需要的功能。(当然这个测试程序的测试代码就不会面面俱到,满足我们需求的代码就OK) 期待的运行结果如下: [!] Computing!
[!] Computing!
Sleep : 3
[!] Finished!
Runtime Length : (3,)
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
[!] Computing!
Sleep : 1
Sleep : [!] Finished!
1
[!] Finished!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Runtime Length : (1,)
.
----------------------------------------------------------------------
Ran 1 test in 4.033s
OK
Sleep : 2
[!] Finished!
Sleep : 7
[!] Finished!
Sleep : 4
[!] Finished!
Sleep : 6
[!] Finished!
Sleep : 6
[!] Finished!
Sleep : 7
[!] Finished!
获取 3s 和 1s 的两个结果,其他的分开操作不去获取结果,测试在 4.033s 结束,其余的进程后台运行,但是不会被获取结果:成功达到了初衷。 源码在下面: class Pool(object):
"""Thread or Proccess Pool to support the
concurrence of many tasks"""
def __init__(self, thread_max=50, mode='Thread'):
"""Constructor"""
modes = ['thread', 'process']
self.mode = mode.lower() if mode in modes else 'thread'
self.task_list = []
self.result_queue = Queue()
self.signal_name = self._uuid1_str()
self.lock = threading.Lock() if self.mode == 'thread' else multiprocessing.Lock()
self.thread_max = thread_max
self.current_thread_count = 0
def _uuid1_str(self):
'''Returns: random UUID tag '''
return str(uuid.uuid1())
def add_task(self, func, *args, **argv):
'''Add task to Pool and wait to exec
Params:
func : A callable obj, the entity of the current task
args : the args of [func]
argv : the argv of [func]
'''
assert callable(func), '[!] Function can \'t be called'
ret = {}
ret['func'] = func
ret['args'] = args
ret['argv'] = argv
ret['uuid'] = self.signal_name
self.task_list.append(ret)
def run(self):
""""""
self._init_signal()
Thread(target=self._run).start()
return self.result_queue
def _run(self):
""""""
for i in self.task_list:
while self.thread_max <= self.current_thread_count:
time.sleep(0.3)
self._start_task(i)
def _start_task(self, task):
""""""
self.current_thread_count = self.current_thread_count + 1
try:
if self.mode == 'thread':
Thread(target=self._worker, args=(task,)).start()
elif self.mode == 'process':
Process(target=self._worker, args=(task,)).start()
except TypeError:
self.current_thread_count = self.current_thread_count - 1
def _worker(self, dictobj):
""""""
func = dictobj['func']
args = dictobj['args']
argv = dictobj['argv']
result = func(*args, **argv)
self.lock.acquire()
self._add_result_to_queue(result=result)
self.lock.release()
def _add_result_to_queue(self, **kw):
""""""
assert kw.has_key('result'), '[!] Result Error!'
self.result_queue.put(kw['result'])
self.current_thread_count = self.current_thread_count - 1
嗯,总之,这一百行不到代码可以实现: 控制线程数量监控:current_threads 简单易用的一次执行多个任务接口 异步获取结果 随时添加任务
我们想要的是批量执行扫描 IP 代理,所以上面的东西完全满足我们需求了。 下面我就简单来介绍一下上面的可以控制并发的 Pool 的流程,方便需要的读者可以参考一下: 
但是由于我们的工具是自己使用,或者说是实验用,就不考虑 Treading/eventlet/stackless 的选择问题了,也不纠结给这个小工具命名的问题。(虽然我觉得这样说是有一些不负责任的) 获取与解析目标拿到目标地址,我们既然是扫描肯定是批量扫描,那么我们肯定是要批量获取 IP 地址。 我们说的获取 IP 地址当然不是 DNS 查询,我们说的当然是我们需要怎么处理很多个 IP 地址:用户指定一个范围;或者仅仅是随机扫描特定区域的网段。想想,大多也就这样了,不会有新的途径了吧。然后 IP 地址有了,端口呢?我们怎么样选择扫描的端口?首先,我们肯定不能扫描全部的端口,因为这样速度太慢了,而且比如 3306 5432 111 22 25 这些端口显然不可能是一个 http 代理端口,我们就没必要对这些端口进行检查,根据经验来说 80-90, 8080-8090 这些端口是一个 http 代理的可能行比较大,再缩小一点,80,8080,8123,3128,9000。 就先这么多吧! 那么我们获取和解析目标的思路就有了: 获取一堆 IP 地址,然后用这一堆 IP 地址与想要检查的端口进行组合,然后把组合结果分配给我们上面刚刚完成的并发框架,然后收取结果。 好的,思路我们清楚之后,就需要开始动手完成这个部分了,没错,首先就是编写测试: class ParseTargetTest(unittest.case.TestCase):
"""Test Parse Target"""
def runTest(self):
IPs_1 = '123.1.2.0/24'
IPs_2 = '123.1.2.3-123.1.3.5'
ip_gen = generate_target(IPs_1, PORTS)
self.assertIsInstance(ip_gen, iter)
for i in ip_gen:
ip_port = i.split(':')
try:
IPy.IP(ip_port[0])
except ValueError:
pass
ip_gen = generate_target(IPs_2, PORTS)
self.assertIsInstance(ip_gen, iter)
for i in ip_gen:
ip_port = i.split(':')
try:
IPy.IP(ip_port[0])
except ValueError:
pass
这样的测试如果跑通的话,我们只需要通过一个 generate_target 这个函数,就可以解析 IP 地址了,可以解析 IP 地址段,也可以解析单个 IP 地址,返回值是通过 yield 产生 generator。 根据这样的测试用例我们使用 IPy 模块,进行基本的 IP 运算(IP 地址网络运算),我们很容易就可以解决上面的问题: import unittest
import re
import IPy
from IPy import IP
PORTS = ['80', '8080', '8123', '3128', '82']
def generate_target(IPs, ports):
"""Generate the target for scanning HTTP proxy
Returns:
A Generator: """
gen = None
if '-' in IPs:
pairs = IPs.split('-')
start = pairs[0]
end = pairs[1]
gen = int2ips(IP(start).int(), IP(end).int())
else:
gen = IP(IPs)
for i in gen:
for port in ports:
yield ':'.join([i.__str__(), port])
def int2ips(start, end):
""""""
for i in xrange(int(start), int(end)):
yield IPy.intToIp(i, version=4)
上面的代码,看起来甚至还要比测试代码还要短,其实这就是 TDD 的魅力之一啊,编写出最简洁最少的满足功能的代码。 架构探讨与整合功能现在我们的基本功能都完成了,我们现在应该做什么呢?先想一下我们这个功能的具体流程: 生成目标 多线程扫描 收集结果
大致是这个样子,但是我们仔细想一想,好像还需要一点别的东西: 用户交互(命令行工具 or Web 接口工具?) 工具架构以及扩展性(万一我以后除了扫描代理,还想扫描别的东西呢?)
万幸的是,我们完成的功能是分离的,那么我们现在需要考虑的就是怎么把功能拼接起来。 我们一开始定位是一个代理扫描器,但是现在,我们如果改动一下检查代理(proxy_check)模块,如果改成针对某个端口的 EXP 利用的话,这个工具显然也是可以满足针对某个端口(或者针对某项服务)的扫描-探测-利用的。 换一句话来说,我们现在就想实现 proxy_check 的插件化,那么,问题就来了,现在想要实现我们为了实现插件化解耦(当然我们面对的情景很简单),就需要考虑一下如何插件化? 万幸的是 Python 提供了一个 pluginbase 的模块,但是我们并不打算使用这一个模块,就编写一个插件基类,然后让所有的插件去继承这个基类,然后我们在工具中调用插件基类的接口就可以,这样就可以实现调用多种类型的功能模块了,现在也就只需要所有的功能模块都继承插件并且按照一定规范完成功能编写。 这样来想的话,我们的架构也就非常明朗了: 
那么我们发现如果怕这样设计的话,显然我们需要重新编写基类,然后稍微改造一下原来的功能模块。 其实是非常简单的,我们的插件其实只需要传入参数->执行->处理输出就可以了,再加上基本的属性,什么插件名称,插件描述,就可以完成了不是么? class PluginBaseClass(object):
""""""
__mateclass__ = abc.ABCMeta
def __init__(self):
"""Constructor"""
pass
@abc.abstractproperty
def name(self):
""""""
return '...'
@abc.abstractproperty
def description(self):
""""""
return '...'
@abc.abstractmethod
def _work(self, *args, **kwargs):
""""""
return '...'
@abc.abstractmethod
def _parse_result(self, result):
""""""
return "..."
def start(self, *args, **kwargs):
""""""
ret = self._work(*args, **kwargs)
return self._parse_result(ret)
这样我们定义了插件的基类,继承这个插件,必须复写的特性和方法有: name 属性 description 属性 _work 方法 _parse_result 方法
这样满足了最基本插件的要求了吧:至少要有个名字,有个简单描述,有功能函数,结果解析函数。(关于结果解析的必要性呢?我们考虑如果以后需要添加更多的不同类型的功能插件的话,结果肯定不可能是完全统一的,因此我们需要一个解析结果的函数帮我们完成一些工作)。 那么我们主函数负责任务发布,只需要调用所有插件的 start 函数传入任意的参数就可以完成操作了(但是参数的处理我们肯定要先在程序中完成的对吧?)。 重构功能函数!当然我们要把功能函数改造成(或者再次编写)一个插件才能丢给程序执行。怎么改造呢?上一节讲过的,就是继承插件,然后复写该复写的方法和属性就可以完成了。当然,我们动手肯定应该先去写测试方法对不对?先有测试后有代码。值得一说的是,这次在重写测试方法的时候就有点意思了,我们可以把自己暂且当成主程序,主程序来调用这个插件,应该怎么样调用呢? def test_plugin_ins(self):
''''''
pprint('Plugin Test')
ret = CheckProxyPlugin()
_result = ret.start('45.78.5.48:333')
for _ in _result:
pprint(ret.name)
pprint(ret.description)
pprint(_)
大概这就我们理想的样子吧:接口简单完备,完成的功能什么大家心里有数,那么插件究竟是什么样子的呢?其实非常简单。并没有直接去复制代码,而是采用调用原功能的方法进行简单构造: class CheckProxyPlugin(PluginBaseClass):
""""""
_name = 'Check Proxy'
_description = 'Check if the specific addr(IP:PORT) is a \
HTTP(s) proxy'
@property
def name(self):
"""getter for self._name"""
return self._name
@property
def description(self):
"""getter for self._description"""
return self._description
def _work(self, *args, **kwargs):
""""""
|