南寧公司和幾個(gè)分公司之間都使用了呼叫系統(tǒng),然后現(xiàn)在需要做一個(gè)呼叫通話數(shù)據(jù)分析,由于分公司的呼叫服務(wù)器是在內(nèi)網(wǎng),通過技術(shù)手段映射出來,分公司到南寧之間的網(wǎng)絡(luò)不穩(wěn)定,所以需要把分公司的通話數(shù)據(jù)同步到南寧。
本身最簡單的方法就是直接配置MySQL的主從同步就可以同步數(shù)據(jù)到南寧來了。但是銷售呼叫系統(tǒng)那邊的公司不給MySQL權(quán)限我們。 所以這個(gè)方法只能放棄了。
于是我們干脆的想,使用PHP來實(shí)現(xiàn)定時(shí)一個(gè)簡易的PHP定時(shí)同步工具,然后PHP進(jìn)程常駐后臺(tái)運(yùn)行,所以首先就先到了一個(gè)PHP組件:SWOOLE,經(jīng)過討論,分公司的每天半天生成的數(shù)據(jù)量最大在5000條左右,所以這個(gè)方案是可行,就這樣干。
我們使用PHP SWOOLE 做一個(gè)異步的定時(shí)任務(wù)系統(tǒng)。
本身MySQL數(shù)據(jù)庫的主從同步是通過解析Master庫中的binary-log來進(jìn)行同步數(shù)據(jù)到從庫的。然而我們使用PHP來同步數(shù)據(jù)的時(shí)候,那么只能從master庫分批查詢數(shù)據(jù),然后插入到南寧的slave庫來了。
這里我們使用的框架是 ThinkPHP 3.2
.
首先安裝PHP擴(kuò)展: SWOOLE,因?yàn)闆]有使用到特別的功能,所以這里我們使用pecl來快速安裝:
pecl install swoole
安裝完成后在 php.ini
里面加入 extension="swoole.so"
安裝完成后,我們使用 phpinfo()
來檢查是否成功了.
安裝成功了,我們就來寫業(yè)務(wù).
服務(wù)端
1、首先啟動(dòng)一個(gè)后臺(tái)的服務(wù)端,監(jiān)聽端口9501
public function index() { $serv = new \swoole_server("0.0.0.0", 9501); $serv->set([ 'worker_num' => 1,//一般設(shè)置為服務(wù)器CPU數(shù)的1-4倍 'task_worker_num' => 8,//task進(jìn)程的數(shù)量 'daemonize' => 1,//以守護(hù)進(jìn)程執(zhí)行 'max_request' => 10000,//最大請(qǐng)求數(shù)量 "task_ipc_mode " => 2 //使用消息隊(duì)列通信,并設(shè)置為爭搶模式 ]); $serv->on('Receive', [$this, 'onReceive']);//接收任務(wù),并投遞 $serv->on('Task', [$this, 'onTask']);//可以在這個(gè)方法里面處理任務(wù) $serv->on('Finish', [$this, 'onFinish']);//任務(wù)完成時(shí)候調(diào)用 $serv->start(); }
2、接收和投遞任務(wù)
public function onReceive($serv, $fd, $from_id, $data) { //使用json_decode 解析任務(wù)數(shù)據(jù) $areas = json_decode($data,true); foreach ($areas as $area){ //投遞異步任務(wù) $serv->task($area); } }
3、任務(wù)執(zhí)行,數(shù)據(jù)從master庫查詢和寫入到slave數(shù)據(jù)庫
public function onTask($serv, $task_id, $from_id, $task_data) { $area = $task_data;//參數(shù)是地區(qū)編號(hào) $rows = 50; //每頁多少條 //主庫地址,根據(jù)參數(shù)地區(qū)($area)編號(hào)切換master數(shù)據(jù)庫連接 //從庫MySQL實(shí)例,根據(jù)參數(shù)地區(qū)($area)編號(hào)切換slave數(shù)據(jù)庫連接 //由于程序是常駐內(nèi)存的,所以MySQL連接可以使用長連接,然后重復(fù)利用。要使用設(shè)計(jì)模式的,可以使用對(duì)象池模式 Code...... //master 庫為分公司的數(shù)據(jù)庫,slave庫為數(shù)據(jù)同步到南寧后的從庫 Code...... //使用$sql獲取從庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $slaveMaxIncrementId = ...; //使用$sql獲取主庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $masterMaxIncrementId = ...; //如果相等的就不同步了 if($slaveMaxIncrementId >= $masterMaxIncrementId){ return false; } //根據(jù)條數(shù)計(jì)算頁數(shù) $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId); $eachNumber = ceil($dataNumber / $rows); $left = 0; //根據(jù)頁數(shù)來進(jìn)行分批循環(huán)進(jìn)行寫入,要記得及時(shí)清理內(nèi)存 for ($i = 0; $i < $eachNumber; $i++) { $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows; $right = $left + $rows; //生成分批查詢條件 //$where = "id > $left AND <= $right"; $masterData = ...;//從主庫查詢數(shù)據(jù) $slaveLastInsertId = ...;//插入到從庫 unset($masterData,$slaveLastInsertId); } echo "New AsyncTask[id=$task_id]".PHP_EOL; $serv->finish("$area -> OK"); }
4、任務(wù)完成時(shí)候調(diào)用
public function onFinish($serv, $task_id, $task_data) { echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL; }
客戶端推送任務(wù)
到此基本完成,剩下來我們來寫客戶端任務(wù)推送
public function index() { $client = new \swoole_client(SWOOLE_SOCK_TCP); if (!$client->connect('127.0.0.1', 9501, 1)) { throw new Exception('鏈接SWOOLE服務(wù)錯(cuò)誤'); } $areas = json_encode(['liuzhou','yulin','beihai','guilin']); //開始遍歷檢查 $client->send($areas); echo "任務(wù)發(fā)送成功".PHP_EOL; }