|
目录
0.前言
1. 介绍 Sogou C++ Workflow
2. 部署安装
3. 使用
4. 个人心得 0. 前言
最近在学习 Sogou workflow 框架,总之,这个框架不简单,还需要花时间深入研究,一篇文章写不完,打算根据自己的理解写几篇总结(算是给自己挖坑了o(╥﹏╥)o),目前大概是分下面三篇:
- (1)一个小白眼中的 workflow-教程篇 (上篇):主要是根据一个小 demo 串起来的一篇小教程,搭配官方 demo 可以看做一篇入门篇。
- (2)一个小白眼中的 workflow-设计篇 (中篇):深入架构层面;探讨其背后的设计。
- (3)一个小白眼中的 workflow-源码篇 (下篇):深入源码架构层面;探讨其背后的奥秘。
本篇是第一篇。
1. 介绍 Sogou C++ Workflow
workflow 是搜狗公司近期开源发布的一款 C++ 服务器引擎,支撑搜狗几乎所有后端 C++ 在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理超百亿请求。
是一个设计轻盈优雅的企业级程序引擎,可以满足大多数 C++ 后端开发需求。
(ps:好了,我知道了,这好像是一个听起来很厉害的框架,那么,它到底能用来干甚呢?)
可以用的地方多的去了:
只要你能灵活的搭配使用,利用它,你可以:
- 快速搭建 http 服务器:
- 作为万能异步客户端。目前支持 http,redis,mysql 和 kafka 协议。
- 轻松构建效率极高的 spider。
- 实现自定义协议 client/server,构建自己的 RPC 系统。
- srpc就是以它为基础,作为独立项目开源。支持srpc,brpc和thrift等协议。
- 构建异步任务流,支持常用的串并联,也支持更加复杂的 DAG 结构。
- 作为并行计算工具使用。除了网络任务,也包含计算任务的调度。所有类型的任务都可以放入同一个流中。
- 在Linux系统下作为文件异步 IO 工具使用,磁盘IO也是一种任务。
- 实现任何计算与通讯关系非常复杂的高性能高并发的后端服务。
- 构建微服务系统。
- 项目内置服务治理与负载均衡等功能。
居然支持这么多的功能,果然牛逼啊,假如我现在手上有一台机器,那么我该怎么快速把它使用起来呢?
编译和运行环境
放心,这个框架支持跨平台编译和运行。
(ps:感觉这年头,一个开源项目如果还不支持跨平台,都不好意思说出去啊)
- 项目支持 Linux,macOS,Windows
等操作系统。
- Windows版以windows分支发布,使用iocp实现异步网络。用户接口与Linux版一致。
- 支持所有CPU平台,包括32或64位x86处理器,大端或小端arm处理器。
- 需要依赖于OpenSSL,推荐OpenSSL 1.1及以上版本。
- 不喜欢SSL的用户可以使用nossl分支,代码更简洁。但仍需链接crypto。
- 项目使用了C++11标准,需要用支持C++11的编译器编译。但不依赖boost或asio。
- 项目无其它依赖。如需使用kafka协议,需自行安装lz4,zstd和snappy几个压缩库。
官方 GitHub 地址:
一句话介绍:基于C++11 std::function的异步引擎。用于解决一切关于串行,并行和异步的问题。
下面,就跟随我的脚步,通过一个简单的 workflow MySQL 任务在实践中一步步上手这个框架。
2. 部署安装
1、命令安装 workflow-dev 截止本文档提交的最新版本是 1.13.1 。具体使用时,可 yum list | grep workflow 获取最新版本
yum install workflow-devel-1.13.1
2、查看 workflow 库文件安装目录,一般在 /usr/include/workflow/ 目录下 。
cd /usr/include/workflow/
3、检查本机是否安装 make,cmake(其中cmake大于等于3.5版本,低版本编译会报错不支持),是否支持 OpenSSL 。
4、在要运行源文件的目录,为了使用 workflow 的库文件,可以拷贝 /usr/include/workflow/ 到本目录或者直接在源文件里 include /usr/include/目录。
同时创建 CMakeLists.txt,内容参考如下。
//要求的cmake最低版本
cmake_minimum_required(VERSION 3.5)
set(CMAKE_BUILD_TYPE RelWithDebInfo CACHE STRING "build type")
//项目的文件类型
project(example
LANGUAGES C CXX
)
//设置cmake运行和输出目录
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR})
//要求支持OpenSSL
find_package(OpenSSL REQUIRED)
find_package(workflow REQUIRED)
//相应的头文件
include_directories(${OPENSSL_INCLUDE_DIR} ${WORKFLOW_INCLUDE_DIR})
//添加可执行文件所需要的库
link_directories(${WORKFLOW_LIB_DIR})
if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fexceptions")
endif ()
//这里添加你要运行的源文件名称,即最终生成的可执行文件
set(EXAMPLE_LIST
mysql_server
)
set(WORKFLOW_LIB workflow)
foreach(src ${EXAMPLE_LIST})
add_executable(${src} ${src}.cc)
target_link_libraries(${src} workflow rt)
endforeach()
5、CMake 主要是编写 CMakeLists.txt 文件, 然后用 cmake 命令将 CMakeLists.txt 文件转化为 make 所需要的 makefile 文件,最后用 make 命令编译源码生成可执行程序或共享库,由于编译中出现中间的文件,因此最好新建一个独立的目录 build,在该目录下进行编译生成的 GNUmakefile 内容参考如下:
ALL_TARGETS := all clean
MAKE_FILE := Makefile
.PHONY: $(ALL_TARGETS)
all:
ifeq ($(MAKE_FILE), $(wildcard $(MAKE_FILE)))
make -f Makefile
else
mkdir -p build
ifeq ($(DEBUG),y)
cd build && cmake3 -D CMAKE_BUILD_TYPE=Debug .. || cmake -D CMAKE_BUILD_TYPE=Debug ..
else
cd build && cmake3 .. || cmake ..
endif
make -C build
endif
clean:
ifeq ($(MAKE_FILE), $(wildcard $(MAKE_FILE)))
-make -f Makefile clean
else ifeq (build, $(wildcard build))
-make -C build clean
endif
rm -rf build
6、以上步骤配置没问题之后,在当前目录执行 make 命令,编译无误即可通过执行 ./xxx 来运行相应程序 。
3. 使用
对于框架的一些使用详细介绍,可以参考 workflow 的官网,说的比较清楚了,这里仅以我使用到的部分来简单说一下我自己的理解。基于 workflow 实现了一个简单的 MySQL 服务,并向外提供 post 、get 、alter 和 delete 接口。
在 workflow 里面,提供了一个 set_handler 的方法来触发不同的请求处理函数。
ps:支持 set_handler 的例子基于版本是 1.11. 最新的版本好像已经把这个 WFWebServer 类干掉了,取而代之的是封装的二级工厂 WFMySQLConnection。
也就是说,原先的逻辑概括就是:
如果我编写的服务启动在 IP 为 10.160.61.162(我的虚机IP),端口 8888 的服务器上,
浏览器访问 http://10.160.61.162:8888 就是第一个 handler,得到 hello world 的页面,
访问 http://10.160.61.162:8888/mysql/get 就会触发 mysql_get 函数 (函数名自定义),
访问 http://10.160.61.162:8888/mysql/post 就会触发 mysql_post 函数 ,
其中 指定数据库的地址可通过 一个全局字符串变量 mysql_URL 来赋值,这个地址会在所有你需要编写的请求处理函数里绑定到,参考如下。 //std::string mysql_URL = "mysql://root@127.0.0.1/workflow";
int main(int argc, char *argv[]) {
if (argc != 2) {
fprintf(stderr, &#34;USAGE: %s <port>\n&#34;, argv[0]);
exit(1);
}
signal(SIGINT, sighandler);
signal(SIGTERM, sighandler);
WFWebServer server;
//POST 接口
server.set_handler(&#34;/&#34;, [](const HttpRequest& req, HttpResponse& resp) {
std::string body = &#34;Hello World-resp&#34;;
resp.append_output_body(body.c_str(), body.size());
});
server.set_handler(&#34;/mysql/post&#34;, mysql_post);
//GET 接口
server.set_handler(&#34;/mysql/get/&#34;, mysql_get);
//ALTER 接口
server.set_handler(&#34;/mysql/alter/&#34;,mysql_alter);
//DELETE 接口
server.set_handler(&#34;/mysql/delete/&#34;,mysql_delete);
if (server.start(atoi(argv[1])) >= 0) {
log(&#34;[main]start[%s] WFT_STATE_SUCCESS\n&#34;,get_now_time().c_str());
pause();
server.stop();
} else {
perror(&#34;server.start&#34;);
}
return 0;
}
新版的逻辑在官网的解释中是这样数说的:由于支持高并发异步客户端,这意味着对一个 server 的连接可能会不止一个。而 MySQL 的事务和预处理都是带状态的,为了保证一次事务或预处理独占一个连接,用户可以使用封装的二级工厂 WFMySQLConnection 来创建任务,每个 WFMySQLConnection保证独占一个连接,具体参考 WFMySQLConnection.h。
好了,到这里,我们开始进入重点内容:容以 POST 、GET 为例,简单说一下处理逻辑:
3.1 POST
1、在 workflow 里面,创建一个 set_handler 的方法,就是触发了不同的请求处理函数,同时也创建了一个 WFHttpTask。
2、在对应的处理函数里面,直接通过 server_task->get_req() 和 server_task->get_resp() 方法来得到 req 和 resp。
3、POST 请求的时候,需要注意通过 HttpUtil::decode_chunked_body 来解析 user_req 的 body。
4、通过后续的解析数据的大小判断,通过 user_resp.set_status_code 方法来设置返回状态以及 user_resp.append_output_body 方法来设置返回信息。
5、通过 WFTaskFactory::create_mysql_task 来创建一个具体的 myql_task,注意: callback 函数里面是编写具体的处理逻辑以及状态返回。在 create_mysql_task 的时候,可以通过值捕获或引用捕获来传递你需要传递的参数或者变量。最后通过 *series_of(server_task) <<mysql_insert_task 方法来把相应的 task 放到运行队列里(我的理解,这是理解异步设计的关键,可以支持在一个 callback 里面插入其它的 task)。
POST demo 参考如下:
//POST接口处理主函数
void mysql_post(WFHttpTask *server_task) {
//user_req->get_request_uri()调用得到请求的完整URL,通过这个URL构建发往mysql的insert任务。
const HttpRequest& user_req = *server_task->get_req();
HttpResponse& user_resp = *server_task->get_resp();
//调用 decode_chunked_body 方法解析 req 注意解析的是 body
std::string http_body = HttpUtil::decode_chunked_body(&user_req);
json j_complete;
std::string m_source;
//此次POST请求若为空,不影响下次的请求
if(http_body.empty()) {
user_resp.set_status_code(&#34;404&#34;);
user_resp.append_output_body(&#34;resp no data&#34;);
} else {
user_resp.set_status_code(&#34;200&#34;);
user_resp.append_output_body(&#34;resp ok&#34;);
try {
j_complete = json::parse(http_body);
log(&#34;[mysql_post][%s][parse(http_body) success]\n&#34;,get_now_time().c_str());
std::cout <<&#34;解析成功====&#34;<<std::setw(4) << j_complete << &#34;\n&#34;;
}
catch(const std::exception& e) {
//数据不为空,但非 json 格式 则 return
log(&#34;[mysql_post][%s][parse(http_body) failed invalid json]\n&#34;,get_now_time().c_str());
user_resp.append_output_body(&#34;invalid json return&#34;);
return ;
}
}
//这里写具体的业务逻辑代码
//创建一个 task
auto *mysql_insert_task = WFTaskFactory::create_mysql_task(mysql_URL, 0, [&](WFMySQLTask *mysql_task) {
mysql_insert_callback(mysql_task, user_resp);
});
mysql_insert_task->get_req()->set_query(mysql_insert_string);
*series_of(server_task) <<mysql_insert_task;
}
3.2 GET
1、前两步和 POST 请求一样,第三步注意的是调用 parse_request_uri 方法来解析 get 的 query ,从而判断相应的参数是否正常获取。
注意如果请求 req 参数捕获为空的情况,调用 parse_request_uri 还是会返回 0(这是目前框架的逻辑),所以,如果需要判断 req 参数捕获为空的情况。
GET demo 参考如下:
//请求参数缺失 默认正常的请求,此次return 但不会影响下一次请求
json ret_all;
std::map<std::string,std::string>query_map;
try {
if(uri.query != NULL ) {
query_map = URIParser::split_query(uri.query);
}
}
catch(const std::exception& e) {
log(&#34;[mysql_get][%s]URIParser::split_query error 请求参数不合法或缺失 %s\n&#34;,get_now_time().c_str(), e.what());
ret_all[&#34;condition&#34;]= &#34;200&#34;;
ret_all[&#34;msg&#34;] = &#34;请求参数缺失&#34;;
ret_all[&#34;result&#34;][&#34;list&#34;] = array;
user_resp.append_output_body(ret_all.dump());
return;
}
2、如果要解决跨域问题,加上下面两行就 ok。
//解决跨域问题
user_resp.set_header_pair(&#34;Access-Control-Allow-Origin&#34;, &#34;*&#34;);
user_resp.set_header_pair(&#34;Access-Control-Allow-Methods&#34;, &#34;*&#34;);
3、注意:lambda callback是一个变量、代表一个函数。并不是立即被执行的,所以如果想要利用第一个 task 的值创建第二个 task,可以在第一个的 callback 里去 create 第二个 task,而不是在外面 create 第二个 task ,也就是说可以在 task callback 里创建新的 task 扔到 series 后面接着跑。
比如我想在 这个callback1 里面执行另外一个 task ,也就是 callback2 并传参,可以先通过 lambda 值捕获获取参数的值。
auto *mysql_query_all_task = WFTaskFactory::create_mysql_task(mysql_URL, 0, [&,struct_req_data](WFMySQLTask *mysql_task) {
callback1(mysql_task, user_resp, struct_req_data);
});
mysql_query_all_task->get_req()->set_query(mysql_query_string);
if(!mysql_query_string.empty()) {
*series_of(server_task) << callback1;
}
在 callback1 里面通过 push_back 函数来往后接着插入另外一个task。也就是说:series of 的 push back 就是为了这样嵌套设计的。
这样设计有什么好处?我的理解是用户可以不用关心具体的逻辑,任何任务在 callback 之后被自动 delete。series 里面可以放很多嵌套的的 callback,它们之间的任务互不影响,如果任意一个 callback 需要用到另外一个 callback 的结果,直接 push_back 到当前 callback 的任务后面。相当于整体上是并行任务,并行再拆分之后可以是串行任务。计算任务可以和通信任务一起串联或并联,有各自的调度器来实现调度。
WFMySQLTask *next_mysql_limit_task;
next_mysql_limit_task = WFTaskFactory::create_mysql_task(mysql_URL, 0, [&,m_total_query_sum,struct_req_data](WFMySQLTask *mysql_task) {
mysql_query_callback(mysql_task, user_resp,m_total_query_sum,struct_req_data);
});
next_mysql_limit_task->get_req()->set_query(limit_mysql_query_string);
//series of 的 push back 方法提供了task的这样嵌套设计
if(!limit_mysql_query_string.empty()) {
series_of(mysql_task)->push_back(next_mysql_limit_task);
}
当然,对于我来说,自己负责一个的后端服务需要用到数据库,所以目前使用 workflow 最多的地方还是把它当做 MySQL 客户端来用。
官方目前支持的命令为 COM_QUERY,已经能涵盖用户基本的增删改查、建库删库、建表删表、预处理、使用存储过程和使用事务的需求。基本满足了我现在的全部需求。
并且一些高级的用法比如事务,我还并没有用到。
交互命令中不支持选库(USE命令),所以,如果SQL语句中有涉及到跨库的操作,则可以通过db_name.table_name的方式指定具体哪个库的哪张表。
其他所有命令都可以拼接到一起通过 set_query() 传给WFMySQLTask(包括INSERT/UPDATE/SELECT/PREPARE/CALL)。
举个例子:
req->set_query(&#34;SELECT * FROM app_data WHERE is_show = &#39;true&#39; AND push_source = &#34;手机百度&#34; ORDER BY create_time DESC&#34;);
数据查询成功。

4. 个人心得
其实对于一个初学者来说,面对一个新的框架,我可能更加关注的是,我怎么才能把它快速接入到我的业务,它对我现有的业务能否带来帮助,比如说是否能提高开发的效率、性能是否能带来提升、开发流程上是否比之前更简单、更方便、使用起来是否更加容易上手。
比如说,在创建一个 WFServertask 的时候,后面的参数就很简单,就是一个<HttpRequest, HttpResponse> 怎么方便怎么来,只要你用户自定义的函数直接写进去就行了。 这种对称性还体现在算法是与协议对称的概念、通用算法,例如sort,merge,psort,reduce,可以直接使用,也是一个输入和一个输出:WFThreadTask<algorithm::SortInput,algorithm::SortOutput>;
给人感觉就是都给屏蔽了所有的底层概念,例如IO线程,工作线程,任务队列和超时处理等等。为开发者提供了一个简洁到极致的开发方式,使用起来非常的简单。具体的去实现你的业务逻辑,然后你只要填充它提供接口调用就行了。
总之,对于我这个刚工作一年,第一次接触并使用这种开源级别框架的小白而言,两个字就能完全表达我的感受:好用,很棒。
后续:目前接触到的也只是框架的一小部分功能,因此在这里也只是展示了自己的一些经验和理解,如有不对的地方,欢迎沟通指正。
框架其它的部分设计有待后续深入实践和了解,可以参照官网 demo 加深理解。 |
|