Como estender a classe Python Thread

Resumo : neste tutorial, você aprenderá como estender a Threadclasse Python para executar código em um novo thread.

Introdução à classe Python Thread

Quando um programa Python é iniciado, ele possui um thread chamado thread principal . Às vezes, você deseja descarregar as tarefas vinculadas a E/S para um novo thread para executá-las simultaneamente. Para fazer isso, você usa o módulo de threading integrado .

Uma maneira de executar código em um novo thread é estender a Threadclasse do threadingmódulo. Aqui estão as etapas:

  • Primeiro, defina uma subclasse de threading.Threadclasse.
  • Segundo, substitua __init__(self, [,args])o método dentro do __init__()método da subclasse para adicionar argumentos personalizados.
  • Terceiro, substitua o run(self, [,args])método dentro da subclasse para personalizar o comportamento da nova classe de thread quando um novo thread for criado.

Vejamos um exemplo de extensão da Threadclasse. Desenvolveremos uma classe que executa uma solicitação HTTP para uma URL e exibe o código de resposta:

class HttpRequestThread(Thread):
    def __init__(self, url: str) -> None:
        super().__init__()
        self.url = url

    def run(self) -> None:
        print(f'Checking {self.url} ...')
        try:
            response = urllib.request.urlopen(self.url)
            print(response.code)
        except urllib.error.HTTPError as e:
            print(e.code)
        except urllib.error.URLError as e:
            print(e.reason)Linguagem de código:  Python  ( python )

Como funciona.

Primeiro, defina um HttpRequestThreadque estenda a Threadclasse do threadingmódulo:

class HttpRequestThread(Thread):Linguagem de código:  Python  ( python )

Segundo, defina o __init__()método que aceita uma URL. Dentro do __init__()método chama o __init__()método da superclasse.

def __init__(self, url: str) -> None:
    super().__init__()
    self.url = urlLinguagem de código:  Python  ( python )

Terceiro, substitua o método run que usa urllibpara obter o código de status HTTP do URL especificado e exibi-lo no console:

def run(self) -> None:
    print(f'Checking {self.url} ...')
    try:
        response = urllib.request.urlopen(self.url)
        print(response.code)
    except urllib.error.HTTPError as e:
        print(e.code)
    except urllib.error.URLError as e:
        print(e.reason)Linguagem de código:  Python  ( python )

Para usar a HttpRequestThreadclasse, você cria instâncias da HttpRequestThreadclasse e chama o start()método. Além disso, você pode chamar o join()método para aguardar a conclusão de todos os threads.

O seguinte define a main()função que usa a HttpRequestThreadclasse:

def main() -> None:
    urls = [
        'https://httpstat.us/200',
        'https://httpstat.us/400'
    ]

    threads = [HttpRequestThread(url) for url in urls]

    [t.start() for t in threads]

    [t.join() for t in threads]Linguagem de código:  Python  ( python )

Como funciona.

Primeiro, defina uma lista de URLs que queremos verificar:

urls = [
    'https://httpstat.us/200',
    'https://httpstat.us/400'
]Linguagem de código:  Python  ( python )

Em segundo lugar, crie instâncias com HttpRequestThreadbase na urlslista usando compreensão de lista. A compreensão da lista retorna uma lista de instâncias da HttpRequestThreadclasse:

threads = [HttpRequestThread(url) for url in urls]Linguagem de código:  Python  ( python )

Terceiro, chame o start()método de cada thread da threadslista:

[t.start() for t in threads]Linguagem de código:  Python  ( python )

Por fim, chame o join de cada Threadinstância para aguardar a threadsconclusão de tudo:

[t.join() for t in threads]Linguagem de código:  Python  ( python )

Junte tudo:

from threading import Thread
import urllib.request


class HttpRequestThread(Thread):
    def __init__(self, url: str) -> None:
        super().__init__()
        self.url = url

    def run(self) -> None:
        print(f'Checking {self.url} ...')
        try:
            response = urllib.request.urlopen(self.url)
            print(response.code)
        except urllib.error.HTTPError as e:
            print(e.code)
        except urllib.error.URLError as e:
            print(e.reason)


def main() -> None:
    urls = [
        'https://httpstat.us/200',
        'https://httpstat.us/400'
    ]

    threads = [HttpRequestThread(url) for url in urls]

    [t.start() for t in threads]

    [t.join() for t in threads]


if __name__ == '__main__':
    main()Linguagem de código:  Python  ( python )

Saída:

Checking https://httpstat.us/200 ...
Checking https://httpstat.us/400 ...
200
400Linguagem de código:  JavaScript  ( javascript )

Resumo

  • Estenda a Threadclasse para executar código em um novo thread chamando o __init__()método da superclasse na subclasse e substitua o método run para adicionar o código para execução em um novo thread.

Deixe um comentário

O seu endereço de email não será publicado. Campos obrigatórios marcados com *