超详细干货:Appium+Pytest实现App并发测试!

测试开发技术

共 28285字,需浏览 57分钟

 ·

2021-09-07 17:06


开课了:重磅消息 | 2021年最新全栈测试开发技能实战指南(第2期)

1. 前言

Appium结合Pytest开展App自动化测试时,你知道如何实现用例并发执行吗?费话不多说,直接上代码, 毕竟想让每个人都能看明白也不容易,所以先放代码,有兴趣的先自行研究。

2. 目录结构

3. 文件源码

3.1 base/base_page.py


"""
------------------------------------
@File : base_page.py
------------------------------------
"""

import time
from appium.webdriver import WebElement
from appium.webdriver.webdriver import WebDriver
from appium.webdriver.common.touch_action import TouchAction
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, TimeoutException
 
 
class Base(object):
 
    def __init__(self, driver: WebDriver):
         self.driver = driver
 
    @property
    def get_phone_size(self):
         """获取屏幕的大小"""
         width = self.driver.get_window_size()['width']
         height = self.driver.get_window_size()['height']
          return width, height
 
    def swipe_left(self, duration=300):
        """左滑"""
         width, height = self.get_phone_size
         start = width * 0.9, height * 0.5
         end = width * 0.1, height * 0.5
         return self.driver.swipe(*start, *end, duration)
 
    def swipe_right(self, duration=300):
          """右滑"""
        width, height = self.get_phone_size
        start = width * 0.1, height * 0.5
        end = width * 0.9, height * 0.5
        return self.driver.swipe(*start, *end, duration)
 
    def swipe_up(self, duration):
         """上滑"""
         width, height = self.get_phone_size
         start = width * 0.5, height * 0.9
         end = width * 0.5, height * 0.1
        return self.driver.swipe(*start, *end, duration)
 
    def swipe_down(self, duration):
        """下滑"""
        width, height = self.get_phone_size
        start = width * 0.5, height * 0.1
        end = width * 0.5, height * 0.9
        return self.driver.swipe(*start, *end, duration)
 
      def skip_welcome_page(self, direction, num=3):
        """
        滑动页面跳过引导动画
        :param direction:  str 滑动方向,left, right, up, down
        :param num: 滑动次数
        :return:
        """

         direction_dic = {
             "left""swipe_left",
             "right""swipe_right",
              "up""swipe_up",
            "down""swipe_down"
        }
        time.sleep(3)
        if hasattr(self, direction_dic[direction]):
            for _ in range(num):
                getattr(self, direction_dic[direction])()  # 使用反射执行不同的滑动方法
        else:
             raise ValueError("参数{}不存在, direction可以为{}任意一个字符串".
                              format(direction, direction_dic.keys()))
  
    @staticmethod
    def get_element_size_location(element):
        width = element.rect["width"]
        height = element.rect["height"]
        start_x = element.rect["x"]
        start_y = element.rect["y"]
        return width, height, start_x, start_y
 
     def get_password_location(self, element: WebElement) -> dict:
          width, height, start_x, start_y = self.get_element_size_location(element)
        point_1 = {"x": int(start_x + width * (1 / 6) * 1), "y": int(start_y + height * (1 / 6) * 1)}
        point_2 = {"x": int(start_x + width * (1 / 6) * 3), "y": int(start_y + height * (1 / 6) * 1)}
        point_3 = {"x": int(start_x + width * (1 / 6) * 5), "y": int(start_y + height * (1 / 6) * 1)}
        point_4 = {"x": int(start_x + width * (1 / 6) * 1), "y": int(start_y + height * (1 / 6) * 3)}
        point_5 = {"x": int(start_x + width * (1 / 6) * 3), "y": int(start_y + height * (1 / 6) * 3)}
        point_6 = {"x": int(start_x + width * (1 / 6) * 5), "y": int(start_y + height * (1 / 6) * 3)}
        point_7 = {"x": int(start_x + width * (1 / 6) * 1), "y": int(start_y + height * (1 / 6) * 5)}
        point_8 = {"x": int(start_x + width * (1 / 6) * 3), "y": int(start_y + height * (1 / 6) * 5)}
        point_9 = {"x": int(start_x + width * (1 / 6) * 5), "y": int(start_y + height * (1 / 6) * 5)}
        keys = {
           1: point_1,
           2: point_2,
           3: point_3,
           4: point_4,
           5: point_5,
           6: point_6,
           7: point_7,
           8: point_8,
           9: point_9
        }
        return keys

    def gesture_password(self, element: WebElement, *pwd):
        """手势密码: 直接输入需要链接的点对应的数字,最多9位
        pwd: 1, 2, 3, 6, 9
        """

        if len(pwd) > 9:
            raise ValueError("需要设置的密码不能超过9位!")
        keys_dict = self.get_password_location(element)
        start_point = "TouchAction(self.driver).press(x={0}, y={1}).wait(200)". \
            format(keys_dict[pwd[0]]["x"], keys_dict[pwd[0]]["y"])
        for index in range(len(pwd) - 1):  # 0,1,2,3
            follow_point = ".move_to(x={0}, y={1}).wait(200)". \
                format(keys_dict[pwd[index + 1]]["x"],
                       keys_dict[pwd[index + 1]]["y"])
            start_point = start_point + follow_point
        full_point = start_point + ".release().perform()"
        return eval(full_point)

    def find_element(self, locator: tuple, timeout=30) -> WebElement:
       wait = WebDriverWait(self.driver, timeout)
       try:
           element = wait.until(lambda driver: driver.find_element(*locator))
            return element
        except (NoSuchElementException, TimeoutException):
            print('no found element {} by {}', format(locator[1], locator[0]))


if __name__ == '__main__':
    pass

3.2 common/check_port.py

"""
------------------------------------
@File : check_port.py
------------------------------------
"""

import socket
import os


def check_port(host, port):
    """检测指定的端口是否被占用"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建socket对象
    try:
        s.connect((host, port))
        s.shutdown(2)
    except OSError:
        print('port %s is available! ' % port)
        return True
    else:
        print('port %s already be in use !' % port)
        return False


def release_port(port):
    """释放指定的端口"""
    cmd_find = 'netstat -aon | findstr {}'.format(port)  # 查找对应端口的pid
    print(cmd_find)

    # 返回命令执行后的结果
    result = os.popen(cmd_find).read()
    print(result)

    if str(port) and 'LISTENING' in result:
        # 获取端口对应的pid进程
        i = result.index('LISTENING')
        start = i + len('LISTENING') + 7
        end = result.index('\n')
        pid = result[start:end]
        cmd_kill = 'taskkill -f -pid %s' % pid  # 关闭被占用端口的pid
        print(cmd_kill)
        os.popen(cmd_kill)
    else:
        print('port %s is available !' % port)


if __name__ == '__main__':
    host = '127.0.0.1'
    port = 4723
    if not check_port(host, port):
        print("端口被占用")
        release_port(port)

3.3 common/get_main_js.py

"""
------------------------------------
@File : get_main_js.py
@IDE  : PyCharm
------------------------------------
"""

import subprocess
from config.root_config import LOG_DIR

"""
获取main.js的未知,使用main.js启动appium server
"""



class MainJs(object):
    """获取启动appium服务的main.js命令"""

    def __init__(self, cmd: str = "where main.js"):
        self.cmd = cmd

    def get_cmd_result(self):
        p = subprocess.Popen(self.cmd,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=True)
        with open(LOG_DIR + "/" + "cmd.txt""w", encoding="utf-8") as f:
            f.write(p.stdout.read().decode("gbk"))
        with open(LOG_DIR + "/" + "cmd.txt""r", encoding="utf-8") as f:
            cmd_result = f.read().strip("\n")
        return cmd_result


if __name__ == '__main__':
    main = MainJs("where main.js")
    print(main.get_cmd_result())

3.4 config/desired_caps.yml

automationName: uiautomator2
platformVersion: 5.1.1
platformName: Android
appPackage: com.xxzb.fenwoo
appActivity: .activity.addition.WelcomeActivity
noReset: True
ip: "127.0.0.1"

3.5 config/root_config.py


"""
------------------------------------
@File : root_config.py
------------------------------------
"""

import os

"""
project dir and path
"""

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(ROOT_DIR, "log")
CONFIG_DIR = os.path.join(ROOT_DIR, "config")
CONFIG_PATH = os.path.join(CONFIG_DIR, "desired_caps.yml")

3.6 drivers/app_driver.py

"""
------------------------------------
@File : app_driver.py
------------------------------------
"""

import subprocess
from time import ctime
from appium import webdriver
import yaml

from common.check_port import check_port, release_port
from common.get_main_js import MainJs
from config.root_config import CONFIG_PATH, LOG_DIR


class BaseDriver(object):
    """获取driver"""
    def __init__(self, device_info):
        main = MainJs("where main.js")
        with open(CONFIG_PATH, 'r') as f:
            self.data = yaml.load(f, Loader=yaml.FullLoader)
        self.device_info = device_info
        js_path = main.get_cmd_result()
        cmd = r"node {0} -a {1} -p {2} -bp {3} -U {4}:{5}".format(
            js_path,
            self.data["ip"],
            self.device_info["server_port"],
            str(int(self.device_info["server_port"]) + 1),
            self.data["ip"],
            self.device_info["device_port"]
        )
        print('%s at %s' % (cmd, ctime()))
        if not check_port(self.data["ip"], int(self.device_info["server_port"])):
            release_port(self.device_info["server_port"])
        subprocess.Popen(cmd, shell=True, stdout=open(LOG_DIR + "/" + device_info["server_port"] + '.log''a'),
                         stderr=subprocess.STDOUT)

    def get_base_driver(self):
        desired_caps = {
            'platformName': self.data['platformName'],
            'platformVerion': self.data['platformVersion'],
            'udid': self.data["ip"] + ":" + self.device_info["device_port"],
            "deviceName": self.data["ip"] + ":" + self.device_info["device_port"],
            'noReset': self.data['noReset'],
            'appPackage': self.data['appPackage'],
            'appActivity': self.data['appActivity'],
            "unicodeKeyboard": True
        }
        print('appium port:%s start run %s at %s' % (
            self.device_info["server_port"],
            self.data["ip"] + ":" + self.device_info["device_port"],
            ctime()
        ))
        driver = webdriver.Remote(
            'http://' + self.data['ip'] + ':' + self.device_info["server_port"] + '/wd/hub',
            desired_caps
        )
        return driver


if __name__ == '__main__':
    pass

3.7 conftest.py

"""
------------------------------------
@File : conftest.py
------------------------------------
"""

from drivers.app_driver import BaseDriver
import pytest
import time

from common.check_port import release_port

base_driver = None


def pytest_addoption(parser):
    parser.addoption("--cmdopt", action="store", default="device_info"help=None)


@pytest.fixture(scope="session")
def cmd_opt(request):
    return request.config.getoption("--cmdopt")


@pytest.fixture(scope="session")
def common_driver(cmd_opt):
    cmd_opt = eval(cmd_opt)
    print("cmd_opt", cmd_opt)
    global base_driver
    base_driver = BaseDriver(cmd_opt)
    time.sleep(1)
    driver = base_driver.get_base_driver()
    yield driver
    # driver.close_app()
    driver.quit()
    release_port(cmd_opt["server_port"])

3.8 run_case.py

"""
------------------------------------
@File : run_case.py
------------------------------------
"""

import pytest
import os
from multiprocessing import Pool


device_infos = [
    {
        "platform_version""5.1.1",
        "server_port""4723",
        "device_port""62001",
    },
    {
        "platform_version""5.1.1",
        "server_port""4725",
        "device_port""62025",
    }
]


def main(device_info):
    pytest.main(["--cmdopt={}".format(device_info),
                 "--alluredir""./allure-results""-vs"])
    os.system("allure generate allure-results -o allure-report --clean")


if __name__ == "__main__":
    with Pool(2) as pool:
        pool.map(main, device_infos)
        pool.close()
        pool.join()

3.9 cases/test_concurrent.py

"""
------------------------------------
@File : test_concurrent.py
------------------------------------
"""

import pytest
import time
from appium.webdriver.common.mobileby import MobileBy

from base.base_page import Base


class TestGesture(object):

    def test_gesture_password(self, common_driver):
        """这个case我只是简单的做了一个绘制手势密码的过程"""
        driver = common_driver
        base = Base(driver)
        base.skip_welcome_page('left', 3)  # 滑动屏幕
        time.sleep(3)  # 为了看滑屏的效果
        driver.start_activity(app_package="com.xxzb.fenwoo",
                              app_activity=".activity.user.CreateGesturePwdActivity")
        commit_btn = (MobileBy.ID, 'com.xxzb.fenwoo:id/right_btn')
        password_gesture = (MobileBy.ID, 'com.xxzb.fenwoo:id/gesturepwd_create_lockview')
        element_commit = base.find_element(commit_btn)
        element_commit.click()
        password_element = base.find_element(password_gesture)
        base.gesture_password(password_element, 1, 2, 3, 6, 5, 4, 7, 8, 9)
        time.sleep(5)  # 看效果


if __name__ == '__main__':
    pytest.main()

4. 启动说明

  1. 我代码中使用的是模拟器,如果你需要使用真机,那么需要修改部分代码,模拟器是带着端口号的,而真机没有端口号,具体怎么修改先自己研究,后面我再详细的介绍

  2. desired_caps.yml文件中的配置需要根据自己的app配置修改

  3. 代码中没有包含自动连接手机的部分代码,所以执行项目前需要先手动使用adb连接上手机(有条件的,可以自己把这部分代码写一下,然后再运行项目之前调用一下adb连接手机的方法即可)

  4. 项目目录中的allure_report, allure_results目录是系统自动生成的,一个存放最终的测试报告,一个是存放报告的依赖文件,如果你接触过allure应该知道

  5. log目录下存放了appium server启动之后运行的日志

5. 效果展示


6. 最后

上述只是初步实现了这样一个多手机并发的需求,并没有写的很详细,比如,让项目更加的规范还需要引入PO设计模式,这里没写这部分,其次base_page.py中还可以封装更多的方法,上述代码中也只封装了几个方法,如果真正的把这个并发引入到项目中肯定还需要完善的,但是需要添加的东西都是照葫芦画瓢了,有问题多思考!

原文链接:https://www.cnblogs.com/linuxchao/p/linuxchao-pytest-mult.html


重磅消息: 由狂师老师授课主讲的「全栈测试开发技能训练营」已经正式开课了,课程内容非常值得推荐!课程大纲:重磅消息 | 2021年最新全栈测试开发技能实战指南(第2期)


推荐阅读
官宣了,测试大神必备的"三把利剑"!

Python全栈测试开发,下周六,不见不散!

微信小程序和公众号H5自动化测试技巧,赶紧GET!

重磅消息 | 2021年最新全栈测试开发技能实战指南(第2期)

月薪40K+银行测试经理,自动化测试实践经验分享


END

所有原创文章
第一时间发布至此公众号「测试开发技术」

长按二维码/微信扫码  添加作者


阅读原文

浏览 26
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报