最近在研究PHP,很喜歡,碰到PHP并發查詢MySQL的問題,研究了一下,順便留個筆記:
同步查詢
這是我們最常的調用模式,客戶端調用Query[函數],發起查詢命令,等待結果返回,讀取結果;再發送第二條查詢命令,等待結果返回,讀取結果。總耗時,會是兩次查詢的時間之和。簡化一下過程,例如下圖:
例圖,由1.1到1.3為一個Query[函數]的調用,兩次查詢,就要串行經歷1.1、1.2、1.3、2.1、2.2、2.3,尤其在1.2和2.2會阻塞等待,進程沒法做其他事情。
同步調用的好處是,符合我們的直觀思維,調用和處理都簡單。缺點是進程阻塞在等待結果返回,增加額外的運行時間。
如果,有多條查詢請求,或者進程還有其他的事情處理,那么能否把等待的時間也合理利用起來,提高進程的處理能力呢,顯然是可以的。
拆分
現在,我們把Query[函數]打碎,客戶端在1.1后,馬上返回,客戶端跳過1.2,在1.3有數據達到后再去讀取數據。這樣進程在原來的1.2階段就解放了,可以做更多的事情,例如…再發起一條sql查詢[2.1],是否看到了并發查詢的雛形了。
并發查詢
相對于同步查詢的下一條查詢的發起都在上一條完成后,并發查詢,可以在上一條查詢請求發起后,立刻發起下一條查詢請求。簡化一下過程,下圖:
例圖,在1.1.1成功發送完請求后,立馬返回[1.1.2],最終查詢結果的返回時在遙遠的1.2 。但是在,1.1.1到1.2中間,還發起了另一個查詢請求,這時間段內,就同時發起了兩條查詢請求,2.2先于1.2到達,那么兩條查詢的總耗時,只相當于第一條查詢的時間。
并發查詢的優點是,可以提高進程的使用率,避免阻塞等待服務器處理查詢,縮短了多條查詢的耗時。但缺點也很明顯,發起N條并發查詢,就需要建立N條數據庫鏈接,對于有數據庫連接池的應用來說,可以避免這種情況。
退化
理想情況下,我們希望并發N條查詢,總耗時等于查詢時間最長的一條查詢。但也有可能并發查詢會[退化]為[同步查詢]。What?例圖中,如果1.2在2.1.1前就返回了,那么并發查詢就[退化]為[同步查詢]了,但付出的代價卻比同步查詢要高。
多路復用
- 發起query1
- 發起query2
- 發起query3
- ………
- 等待query1、query2、query3
- 讀取query2結果
- 讀取query1結果
- 讀取query3結果
那么,怎么等待知道什么時候查詢結果返回了,又是哪個的查詢結果返回呢?
對每個查詢IO調用read?如果是遇上阻塞IO,這樣就會阻塞在一個IO上,其他IO有結果返回了,也沒法處理。那么,如果是非阻塞IO,那不用怕會阻塞在其中一個IO上了,確實是,但又會造成不斷地輪詢判斷,浪費CPU資源。
對于這種情況可以使用多路復用輪詢多個IO。
PHP實現并發查詢MySQL
PHP的mysqli(mysqlnd驅動)提供多路復用輪詢IO(mysqli_poll)和異步查詢(MYSQLI_ASYNC、mysqli_reap_async_query),使用這兩個特性實現并發查詢,示例代碼:
<?php $sqls = array( 'SELECT * FROM `mz_table_1` LIMIT 1000,10', 'SELECT * FROM `mz_table_1` LIMIT 1010,10', 'SELECT * FROM `mz_table_1` LIMIT 1020,10', 'SELECT * FROM `mz_table_1` LIMIT 10000,10', 'SELECT * FROM `mz_table_2` LIMIT 1', 'SELECT * FROM `mz_table_2` LIMIT 5,1' ); $links = []; $tvs = microtime(); $tv = explode(' ', $tvs); $start = $tv[1] * 1000 + (int)($tv[0] * 1000); // 鏈接數據庫,并發起異步查詢 foreach ($sqls as $sql) { $link = mysqli_connect('127.0.0.1', 'root', 'root', 'dbname', '3306'); $link->query($sql, MYSQLI_ASYNC); // 發起異步查詢,立即返回 $links[$link->thread_id] = $link; } $llen = count($links); $process = 0; do { $r_array = $e_array = $reject = $links; // 多路復用輪詢IO if(!($ret = mysqli_poll($r_array, $e_array, $reject, 2))) { continue; } // 讀取有結果返回的查詢,處理結果 foreach ($r_array as $link) { if ($result = $link->reap_async_query()) { print_r($result->fetch_row()); if (is_object($result)) mysqli_free_result($result); } else { } // 操作完后,把當前數據鏈接從待輪詢集合中刪除 unset($links[$link->thread_id]); $link->close(); $process++; } foreach ($e_array as $link) { die; } foreach ($reject as $link) { die; } }while($process < $llen); $tvs = microtime(); $tv = explode(' ', $tvs); $end = $tv[1] * 1000 + (int)($tv[0] * 1000); echo $end - $start,PHP_EOL;