Python の並列処理



こんにちは、切り口太郎です。

マルチコアCPUを活かすPython のコーディング方法についてご紹介します。

マルチコアを活かすって、処理を複数同時に実行する事です。

Python は、普通にコーディングしていると、1つのCPUしか利用しません。
シングルコアのコンピューターでも、マルチコアのコンピューターでも処理時間は同じです。

これだと、マルチコアCPUを搭載しているコンピューターの性能が活かしきれません。

シングルコアで、10分処理時間がかかるのであれば、4コアCPUを搭載しているコンピューターでは、3分弱で終わるようにするのが並列処理です。
WEBサーバーであれば、シングルコアが同時に1つの要求しか処理できないとして、4コアCPUでは同時に4つのリクエストを処理する事ができます。

Pythonで並列処理をするためのコーディングを覚えましょう!

1 2つの並列処理


Pythonでは、プロセスとスレッドの2つに処理方法があります。
どちらも同じように処理を並列で動作させる方法ですが、動作させる単位が異なります。

プロセスによる並列処理(4プロセス)


プロセスによる並列処理は、要求元のPythonプログラム(実線)が、Python プログラムを4つ起動して、それぞれに処理を実行させます。
この時に、Python インタープリターも4つ起動しますので、OS上では、Pythonプロセスが5個起動しています。




Pythonでマルチプロセスを実行した時の稼働プロセスです。(linux)
起動プロセスを親プロセスとして、4つの子プロセスが起動しています。
マルチプロセスは、Pythonのインタープリータ単位のプロセスをOS上で起動するので、起動に時間がかかり、リソースを多く消費します。



スレッドによる並列処理(4スレッド)


スレッドによる並列処理は、起動もとのPythonプログラムがスレッド用に4つコードを複製し実行します。
Python インタープリターは、1つで、スレッドのプログラムで共有します。




こちらは、同じマシンで、Python のマルチスレッドを実行した時の稼働プロセスです。 
起動プロセスが1つだけ起動しています。


プロセス単位で並列処理するようなコーディングは特殊な処理を実現する場合に利用します。
Pythonでプロセス単位の並列処理を行う場合は、主に複数のコンピューターで分散させて動作するようなデータ分析システムや負荷分散システムなどに利用されます。
ただ、これらのシステムを作る場合、Pythonのプロセス制御をはじめからコーディングするのではなく、フレームワークを利用します。

プロセスの並列化処理は、最近はあまり要されていないので Python のコーディングで良く利用されるスレッドによる並列処理を紹介します。

Pythonによるスレッド処理は、一般の業務システムの構築でも良く利用されます。
スレッドによる並列処理はきちんと覚えておくと、「できるプログラマー」への道につながります。

2 スレッドプールによる並列処理


Pythonのスレッド処理は、Python3からかなり便利になっています。(Python2 の時代は苦労が多かったみたいです)

Python3 では、スレッドこれだけ覚えておけば大丈夫というパッケージがあります。
concurrent というパッケージでこの中にはいっている ThreadPoolExecutor の使い方をまず覚えましょう!

パッケージモジュール説明
concurrent.futuresThreadPoolExecutorPythonスレッドのプール管理
Pythonの ThreadPoolExcutor のスレッドの管理方法の概要についです。
ThreadPoolExcutor は、submit() で実行要求を受け取る queue と Pythonのスレッドを保持する Thread Pool で構成されます。
ThreadPoolExcutorのパラメータでスレッド数を指定します。
例はスレッド数に4を設定した場合です。

ThreadPoolExcutor 作成後は、queue が空、スレッドは4つ作成されて pool に格納されています。



ここで、submit で、スレッドの実行要求 (R1) を出すと、R1 は、Queue に格納されます。


Thread Pool には利用していないスレッドが4つあるので、すぐに空いているスレッドが割り当てられ、スレッドが実行されます。



R1の処理が終了すると、R1で利用していたスレッドがプールに戻されます。



次の例は、submit で (R1,R2,R3,R4,R5)と5つの要求を出した場合です。
submitの要求はすべてQueeueへ格納されます。


Thread Pool には4つの未使用スレッドが格納されているので、Queue から順に、R1,R2,R3,R4 が割り当てられます。



スレッドで実行中の R2が処理を終了すると R2で利用していたスレッドが、Thread Pool へ戻ります。


ThreadPoolExcutorは、Thread Pool に空きがあるので、Queue から (R5)の要求を取り出して、空いているスレッドの Pool2 へ R5 を割り当てて実行します。



スレッドは実行すると、1つだけ処理結果を戻せるという制約がありますが、戻り値をクラスや tuple , dict にすれば、たくさんの処理結果を戻すことができます。
1つの処理を複数のスレッドで実行する場合には、すべてのスレッドの終了を待ち合わせる必要がありますが、これも ThreadPoolExecutor 側で管理しています。

複雑なスレッドの実装自体が非常に簡単でかつ確実にできる、まさに Python らしいお手軽パッケージです。

ThreadPoolExecutorを使用したスレッドによる並列処理はThreadPoolExecutorのメソッドを利用します。
サンプルで利用するメソッドは2つです。

メソッド名リターン説明
submitconcurrent.futures
スレッドを1つ生成して実行する。スレッドで実行したメソッドの実行結果は、
futuresから取得できる。
shutdown
すべての実行スレッドの終了を待つ

スレッド実行と終了のメソッドです。

submit メソッド

メソッド名リターン説明
submitconcurrent.futures
スレッドを1つ生成して実行する。スレッドで実行したメソッドの実行結果は、
futuresから取得できる。
shutdown
すべての実行スレッドの終了を待つ

位置引数と、キワード引数を整理します。

python は引数の指定方法がいくつかありますが、主に位置引数、キーワード引数での呼び出しがあります。

例として、def func(arg1 , arg2 , arg3='yes') を呼び出す方法を説明します。

引数の指定説明
位置引数func(10, 20,  'no')
引数の値を順に記述します。
デフォルトが記述しているもの パラメータ=value (デフォルト引数)は省略すると設定値が利用されます。
キーワード引数func(arg1=10,arg2=20,arg3='no')引数を パラメーター=値の形式で指定します。

Pythonの記述方法として、位置引数を *args , キーワード引数を **kwargs と表記する慣習があるので、python のライブラリー仕様書などを読む時に思い出してください。

shutdownメソッド

順序名前初期値必須説明
第1引数waitTrue
shutdown を実行したあとに、すべてのスレッド処理が終了するまで待機する場合には、True 待機しない場合には Flase を設定します。
False を設定するとスレッドの完了を待たずにshotdown() メソッドを抜けてくるため、自分でスレッドの確認をする必要があります。

スレッドのサンプルプログラムです。

import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

# スレッドで動作する関数
# ひたすら足し算を繰り返す
def thread_method(name , max):
    value = 0;
    print("start thread : " + name)
    time.sleep(2)
    for count in range(1 , max):
        value += count
    print(name + " max thread end : " + str(value))
    return 1

# スレッド管理
def thread_pooler(threads , max_counter):
    # ThreadPoolExecutor のインスタンス化 
    executor = ThreadPoolExecutor(max_workers=threads)

    count = 0
    result = list();
    while count < threads:
        count += 1
        # submit はじめはメソッド名、次からはメソッドに渡すパラメーターを順に記述する。
        future = executor.submit(thread_method , "thread" + str(count) , max_counter)
        # 関数の場合戻り値が取れるので受け取る。ここでは仮の戻り値で、shutdown() 後に値が取れる
        result.append(future);

    print("thread submit end")
    # すべてのスレッドが終了するのを待つ
    executor.shutdown(wait=True)
    print("thread shuting down")
    ret = 0
    for future in result:
        ret += future.result()
    return ret


# スレッド処理の呼び出し処理
print("----------- thread test ----------")
# このマシンは、2core/4thread なので、スレッド数を4に設定する。
threads = 4
# 足し算する回数
calc_count = 100000000

print(str(threads) + " thread start");
# thread で 計算スレッドを実行する
print("thread  return  " + str(thread_pooler(threads , calc_count)))
ソースの説明です。

7行目が、スレッドで実行する関数です。
python では、特にスレッドで動作するモジュールを作成しなくても、関数で大丈夫です。
この関数は、ひたすら足し算をしているだけです。

17行目からが、スレッド処理を行っている関数です。
19行目が ThreadPoolExecutorクラスのインスタンス化をしています。
インスタンス化パラメーターとして、max_workers (スレッドプールするスレッドの数)を指定しています。

22行目のリストは、スレッドの処理結果(7行目から始まる関数のリターン値)を保持するリストです。

24行目からのループ処理は、スレッドを起動する処理です。
スレッドは、ThreadPoolExecutor.submit() で1つづつ起動します。

26行目は、スレッドの実行をしています。
第一引数にスレッドで動かす関数名を指定し、第二引数移行は関数のパラメータを順に指定します。
リターン値として、future を取得しています。
この future は、仮の戻り値で、スレッドで動作させる関数の処理が終了するまでは、値が入っていません。
submit() は、スレッドの起動が完了すると、関数の処理の実行の完了を待たずに、すぐ戻ってきます。

28行目は、submi() の仮の戻り値をリストに格納しています。

32行目が、executor が管理しているスレッドの動作がすべて完了するまで待機する処理です。
shutdown() というメソッド名ですが、途中で処理を中断するのではなく、すべての処理が完了後にスレッドプールなどの資源を
開放してくれるという内容です。
shutdown() のパラメータで wait=True を設定していますが、これがすべての終了を待つためのパラメーターです。

35行目からのループは、スレッドの戻り値を取得しています。

呼び出し処理は、40行目から始まります。

43行目では、スレッドのプールサイズを設定しています。
サンプルを実行したマシンは、2core/4thread の corei5 を搭載しているので、最大4スレッドまで並列動作ができます。

56行目でスレッド処理を呼び出しています。

実行結果です。

----------- thread test ----------
4 thread start
start thread : thread1
start thread : thread2
start thread : thread3
thread submit end
start thread : thread4
thread4 max thread end : 4999999950000000
thread2 max thread end : 4999999950000000
thread1 max thread end : 4999999950000000
thread3 max thread end : 4999999950000000
thread shuting down
thread  return  4


3 Python のスレッドプールによる並列処理の性能



サンプルを動作した時のCPUパフォーマンスグラフです。このマシンは、2Core/4Thread で動作します。
矢印のところで、サンプルのプログラムを動作させましたが、CPUを並列で利用していることが確認できます。

Python のスレッドは、かなり良くできていて、1部のスレッドに偏らずに分散してくれます。すばらしい!

4 同時に動かせるスレッド数をpythonプログラムから取得する方法


Anaconda Prompt 画面で Pyhon を起動したのちに、次のコマンドを入力します。スレッド数が表示されます。

import os
os.cpu_count()

5 CPU コアとスレッドの関係

 
色使いが派手で申し訳ないのですが、簡略図で説明します。

昔は、1CPU=1CORE=1SMT(Simultaneous Multithreading)でしたので、1つのCPUでは同時に1つのプログラムだけ動かせました。
SMTは、プログラムコードを実行する単位で、OSは、これをCPUと見ています。
一般にCPUのスレッドは、このSMTを指しています。


シングルコアCPUといいます。(1Core/1Thread)



次に、1つのCPUに複数のコアを実装したCPUが作成されました。
マルチコアといいます。マルチコアは、コア数のプログラムを同時に動かすことができます。
以下の例では、1台のCPUでCOREなので、プログラムを同時に4つ動かすことができます。
(4Core/4Thread)



次に、1つのCPUコアに複数のSMTを実装することで、1つのCPUコア複数のプログラムを動かす事ができます。
この技術を Intel は、HT(Hyper Threading) テクノロジーといいます。
以下の例では、1台のCPUで同時に8つのプログラムを動作させる事ができます。
(4Core/8Thread)







コメント

このブログの人気の投稿

Python のファイルアクセス

Lambda について

Visual Studio Code での Python 開発のポイント