背景 大家在测试dubbo接口是不是特别痛苦?因为dubbo接口并不是比较常见的http协议的,而是dubbo协议的,测试dubbo接口的有几种方法,譬如jmeter自定义sampler调用,java连接zookeeper中心调用dubbo,telnet命令调用dubbo等。
痛点 相信大家都比较熟悉使用jmeter,看了上面的
测试方案,肯定是首选jmeter,但是这里踩坑较多,比如下载的插件与dubbo版本不对应,有时候响应参数出现中文乱码,有时候需要反编译jar包查看对应的入参类型等等...
解决痛点 在网上搜索了一下和看了dubbo接口的用户手册,发现dubbo接口支持telnet命令执行,
Python中刚好有个库可以执行telnet命令 telnetlib库 dubbo接口又是通过zookeeper中心进行注册服务的,那我直接通过zookeeper中心查询dubbo接口相关的信息(ip、端口、服务名、方法名和入参类型)然后模拟telnet命令进行dubbo接口调用,岂不是美滋滋!
实现方案
Python + fastapi + telnetlib + kazoo。
调用流程
telnet命令实操 前提,我们得知了一个dubbo接口的ip和端口(通常可以通过服务名在zk中心搜索得知): telnet 192.168.xx.xx 32024 连接dubbo服务
ls -l cn.com.api.dubbo.xxxxService 查询该服务的方法列表
invoke xxxxService.xxxMethod(1234, "test") 调用服务的方法
通过上述一系列的骚操作,相当于进行了dubbo接口的测试,当然,我们决对不可能通过终端敲命令进行dubbo接口测试,下面我们进行通过代码模拟telnet命令。
核心源码分析
zk中心搜索服务封装 class GetDubboService(object): def __init__(self): #测试环境,ZK_CONFIG为zk中心的注册地址,可传入string或者list,譬如ZK_CONFIG = ['xxx','xxx','xxx']或者ZK_CONFIG = 'xxx' self.hosts = ZK_CONFIG self.zk = self.zk_conn()
def zk_conn(self): try: zk = KazooClient(hosts=self.hosts, timeout=2) zk.start(2) # 与zookeeper连接 except BaseException as e: return False return zk
def get_dubbo_info(self, dubbo_service): global data dubbo_service_data = {} try: #先查出注册中心所有的dubbo服务 all_node = self.zk.get_children('/dubbo') # 根据传入服务名匹配对应的服务 node = [i for i in all_node if dubbo_service.lower() in i.lower()] # 查询dubbo服务的详细信息 #遍历数据,过滤掉空数据 for i in node: if self.zk.get_children(f'/dubbo/{i}/providers'): dubbo_data = self.zk.get_children(f'/dubbo/{i}/providers') for index, a in enumerate(dubbo_data): url = parse.urlparse(parse.unquote(a)).netloc host, port = url.split(":") conn = BmDubbo(host, port) #判断获取的ip地址是否连接成功,因为有些开发本地起了dubbo服务 status = conn.command("") if status: data = dubbo_data[index] break self.zk.stop() except BaseException as e: return dubbo_service_data #parse.unquote 解码 #parse.urlparse 解析URL #parse.query 获取查询参数 #parse.parse_qsl 返回列表 url_data = parse.urlparse(parse.unquote(data)) query_data = dict(parse.parse_qsl(url_data.query)) query_data['methods'] = query_data['methods'].split(",") dubbo_service_data['url'] = url_data.netloc dubbo_service_data['dubbo_service'] = dubbo_service dubbo_service_data.update(query_data) return dubbo_service_data
telnet命令调用封装: class BmDubbo(object):
prompt = 'dubbo>'
def __init__(self, host, port): self.conn = self.conn(host, port)
def conn(self,host, port): conn = telnetlib.Telnet() try: conn.open(host, port, timeout=1) except BaseException: return False return conn
def command(self, str_=""): # 模拟cmd控制台 dubbo>invoke ... if self.conn : self.conn.write(str_.encode() + b'\n') data = self.conn.read_until(self.prompt.encode()) return data else: return False
def invoke(self, service_name, method_name, arg): command_str = "invoke {0}.{1}({2})".format(service_name, method_name, arg) data = self.command(command_str) try: # 字节数据解码 utf8 data = data.decode("utf-8").split('\n')[0].strip() except BaseException: # 字节数据解码 gbk data = data.decode("gbk").split('\n')[0].strip() return data
def ls_invoke(self, service_name): command_str = "ls -l {0}".format(service_name) data = self.command(command_str) if "No such service" in data.decode("utf-8"): return False else: data = data.decode("utf-8").split('\n') key = ['methodName', 'paramType','type'] dubbo_list = [] #这里解析有点复杂,可以自己通过telnet命令实操一下,ls -l xxx for i in range(0, len(data) - 1): value = [] dubbo_name = data[i].strip().split(' ')[1] method_name = re.findall(r"(.*?)[(]", dubbo_name)[0] value.append(method_name) paramType = re.findall(r"[(](.*?)[)]", dubbo_name)[0] paramTypeList = paramType.split(',') if len(paramTypeList) ==1: paramTypeList = paramTypeList[0] value.append(paramTypeList) #这里我将传参类型分成了4大类 if 'java.lang' in paramType or 'java.math' in paramType: value.append(0) elif not paramType: value.append(1) elif 'List' in paramType: value.append(2) else: value.append(3) dubbo_list.append(dict(zip(key, value))) return dubbo_list
def param_data(self,service_name,method_name): #这里是根据服务名和方法名,找到对应的传参类型 dubbo_data = self.ls_invoke(service_name) if dubbo_data: dubbo_list = dubbo_data if dubbo_list: for i in dubbo_list: for v in i.values(): if v == method_name: param_key = ['paramType','type'] param_value = [i.get('paramType'),i.get('type')] return dict(zip(param_key,param_value)) else: return False else: return False
dao层设计: class DubboHandle(object): @staticmethod def invoke(service_name, method_name, data): zk_conn = GetDubboService() if zk_conn.zk: zk_data = zk_conn.get_dubbo_info(service_name) if zk_data: host, port = zk_data['url'].split(":") service_name = zk_data['interface'] boby = data.copy() conn = BmDubbo(host, port) status = conn.command("") if status: # 根据服务名和方法名,返回param方法名和类型 param_data = conn.param_data(service_name, method_name) if param_data: type = param_data['type'] param = param_data['paramType'] # 传参类型为枚举值方法 if type == 0 and isinstance(boby, dict): l_data = [] for v in boby.values(): if isinstance(v,str): v = f"'{v}'" elif isinstance(v,dict) or isinstance(v,list): v = json.dumps(v) v = f"'{v}'" l_data.append(str(v)) boby = ','.join(l_data) # 无需传参 elif type == 1: boby = '' # 传参类型为集合对象 elif type == 2: # params 只有一个集合对象传参 if isinstance(boby, list): boby = boby # params 一个集合对象后面跟着多个枚举值 elif isinstance(boby, dict): set_list = [] for v in boby.values(): set_list.append(v) set_data = str(set_list) boby = set_data[1:-1] # 传参类型为自定义对象 elif type == 3: # 兼容多个自定义对象传参 if isinstance(param, list): dtoList = [] for index, dto in enumerate(boby): dto.update({"class": param[index]}) dtoList.append(json.dumps(dto)) boby = ','.join(dtoList) elif isinstance(boby, dict): boby.update({"class": param}) boby = json.dumps(boby) else: return None, f"data请求参数有误,请检查!" response_data = conn.invoke(service_name, method_name, boby) try: response_data = json.loads(response_data) except Exception as e: return None, f"解析json失败:{response_data}" return response_data, None else: return None, f"{service_name.split('.')[-1]}服务下不存在{method_name}方法" else: return None, f"{service_name}服务连接出错" else: return None, f"{service_name}没有在zk中心注册" else: return None, "zk服务连接失败"
view层引用: @router.post('/invoke', name='dubbo业务请求接口') async def dubboInvoke(data: DubboInvokeBody): res_data, err = DubboHandle.invoke(data.serviceName, data.methodName, data.data) if err: return res_400(msg=err) return res_200(data=res_data)
invoke接口传参说明 原生对象或者自定义对象传参(xxDto、jsonObj、java.util.HashMap): { "serviceName": "xxxxxx", "methodName": "xxxxxx", "data": { //data传入对应的对象数据,一般为json格式的 "productStoreQueryDTOS": [ { "productNoNumDTOList": [ { "num": 13, "productNo": "10000620" }, { "num": 13, "productNo": "10000014" } ], "storeCode": "4401S1389" } ] } }
枚举值类型传参(java.lang.String、java.lang.Integer): { "serviceName": "xxxx", "methodName": "xxxxx", "data": { //格式为json,枚举值顺序必须按照dubbo接口定义的传参顺序,注意是否为int还是string "account":"123456", "password":"3fd6ebe43dab8b6ce6d033a5da6e6ac5" } }
方法名无需传参: { "serviceName": "xxxx", "methodName": "xxxxxx", "data":{} //传入空对象 }
集合对象传参(java.util.List): { "serviceName": "xxxx", "methodName": "xxxxxx", "data":{ "List": [ "1221323", "3242442" ] } //传入对象,里面嵌套数组 }
集合对象传参,后面跟着枚举值(java.util.List 、 java.lang.String 、 java.lang.Integer): { "serviceName": "xxxx", "methodName": "xxxxxx", "data":{ "userCode": ["12345","686838"], "startTime": "2021-04-16 13:30:00", "endTime": "2021-04-16 14:30:00" } }
多个自定义对象传参,对象顺序按照dubbo接口定义的传参顺序(xxdtox、xxdto): { "serviceName": "xxxx", "methodName": "xxxxxx", "data":[ { "userCode": "7932723", "startTime": "2021-04-16 13:30:00", "endTime": "2021-04-16 14:30:00" }, { "name": "fang", "age": "18" } ] }
上述传参可以满足大部分入参类型~
疑惑 问:dubbo的传输协议,本身支持http协议,跟开发沟通,测试环境切换为http协议,不就方便测试了么? 答:公司内部系统对接大多走的是 Dubbo 协议,这是公司的开发规范,只能从外部绕了。 问:为什么我部署之后,连接不上zk服务或者出现dubbo服务连接出错? 答:部署服务的主机必须可以连通dubbo服务。
总结 本期解决了测试dubbo接口的痛点,希望能对大家有帮助~
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
21天更文挑战,赢取价值500元大礼,还有机会成为签约作者!
原文地址:http://www.51testing.com/?action-viewnews-itemid-7789497
免责声明:本文来源于互联网,版权归合法拥有者所有,如有侵权请公众号联系管理员
* 本站提供的一些文章、资料是供学习研究之用,如用于商业用途,请购买正版。