再犯爆菊

作为一个天资不高、记忆力不好、视力偏低、文笔太糙、不善言词、反应迟钝的码农,有必要把所有遇到的问题记录下来,防止屡犯同样的错误(注:该文持续更新)。

1、php操作同一台主机的多个数据库

运行上面的代码会发现特么的在第5行居然报db2.table1表找不到,特么的明明$link1使用的db1,为何要去db2里找table1哩?这是因为php的mysql为了避免不停的创建mysql链接会影响性能,对于相同的host、user、password只会有一个链接,而且一个链接只能操作一个数据库(如果你要不停的使用mysql_select_db来切换数据库,那么我表示这段话对你来说是多余的),所以上面的代码无论是$link1还是$link2所使用的都是同一个mysql链接,为了解决问题我们需要创建两个不同的链接,于是mysql_connect提供了第4个参数new_link,至于参数说明就去看文档吧,这里咱把第4个参数传入true,问题解决鸟~~~~

2、mongodb的自增操作只能针对数字类型?

执行上面的代码,特么的居然报错:Cannot apply $inc modifier to non-number, 因为mongodb的$inc操作只能应用于数字,所以不好使啊!可是为什么他不尝试转换一下哩?mysql的varchar还能自增呢!!操操操…..

Read More

给你的代码《约法四章》:基本功能、错误处理、智能纠错、日志收集

作为21世纪的码农,如果只是想混混工资的话,这篇文章可能不太适合你,但如果想在技术上有所提升,本文也许能帮你照亮前进的道路。程序员不能只是天天埋头写代码,还得关心代码的质量,其实写代码很容易,稍微用心点学都能写,但是要产出好的代码却不是那么容易事,所谓好的代码不是说你完成了某某功能就牛B,在完成了基本功能的同时还有很细节要注意,在各种关于代码质量的书中都的提及到了这样的概念:代码格式、命名规则、功能注释等,虽然这些东西看上去并不能主宰软件的“生与死”,但是谁又能保证一个软件从出生到结束都由一个人维护呢?关于代码规范这个话题我不打算列入本文的内容,原因有二,其一是很多书中都的提及到,我就不必重申;其二是很多程序员其实也很注重这个问题,只是注释多少的问题。那么就标题中的四点来给我们的代码约法四章,让我们这些码农的代码更健壮。

首先是基本功能,这个我不多说大家也应该懂了,程序员就是要完成各种产品提出的需求,如果基本功能都完不成,我们后面一切都是空谈。就好比是建房子,连地基都还没有开始搞,就在想我要买什么家具能让房间更漂亮。鸟虽然能飞,但它也是先学会走才学飞的。所以这一点大多数的程序员都能做到,我也就此罢休,也许这一段是多余的,呵呵。

其次是错误处理,错误处理这个东西如果从程序员的角度去看好像不觉得他有多神秘,只要稍加注意,些事就能避免,但试问有多少人真的注意了?这里举个大家经常看到的例子,字符串拷贝函数strcpy:


char *strcpy(char *dst, const char *src)

{
char *addr = dst;

assert((dst != NULL) && (src != NULL));

while((*dst++ = *src++) != '\0');

return addr;

}

就是这么如此简单的代码, 有多少人会加上断言进行NULL判断?据说这段代码考到了无数牛人,因为在考这段代码的时候不是在考你会不会字符串拷贝,而是在考你是否考虑了错误处理。很多代码都是因为没有错误处理或者处理不当导致程序崩溃,想想其实挺不划算的,花了大把时间把主要功能都实现了,结果因为没有考虑这些看似不起眼的错误导致程序不能稳定运行,对于客户来说你这就是没有完成工作,而对于同行来说,你这就不是合格的码农。其实大家在编写代码时候多考虑一点,程序就更健壮一点。记得在网上有很多文章有这么一说:不要相信任何用户的输入,想想还是非常有道理。

然后是智能纠错,对于大多数web开发人员来说,列表翻页是相当常见的功能,但就是这么一个看似简单而又常见的功能,却隐藏了该文很重视的一个智能纠错的问题。假设page参数为列表的页码,如果page小于等于1,你的代码是否能正确的读取第一页的数据?如果page大于最大页数,你的代码是否还在查询数据库列表?当你看到这里,你不防试试打开你的代码,看你是否真的把这些都做完善了?其实这里只是引用了一个简单的实例来说明智能纠错的重要性,在程序中还有很多地方需要类似的纠错,如果你发现了,请完善他。

最后是日志收集,其实能做到上面两点,我挺羡慕你的,因为到目前,我也算不上合格的码农,代码经常出BUG,虽然说BUG对于程序员来说很常见,但是一有BUG心里总归不爽,不爽又能怎么着?还得找问题继续改之。如果能很快定位问题的根源,那么你是幸运的,能很快解决他。但是如果某一天,你的客户告诉你程序中断了,而你也不知道程序错在哪里,那么你就杯具了,慢慢找吧。真的需要要一步一步来找吗?太浪费时间了,不防在你在程序中添加日志收集,在你觉得可能出现错误地方都做上日志标记,在以后程序崩溃后,至少能通过日志定位程序中断时大概的运行位置,你又一次成了幸运儿。

本文就这么结束了,希望对各位有所帮助。最近也是看到自己的代码各种BUG,也有了很多感触,希望能写出本文能加深自己对程序建壮性、稳定性的重视。

Read More

web版五子棋(nodejs + socket.io)

在开始学习程序的时候就想有一天能开发游戏,最近突然越发的起劲了,同时也正好在学习nodejs,于是乎就决定开发一个简单的游戏,其它的话我也就不多说了,简单介绍一下游戏吧,该游戏有服务端和客户端,服务端用nodejs + socket.io开发,客户端由js开发,类似QQ游戏的五子棋,有大厅、用户列表、房间、聊天等功能,代码量不多,大家可以研究研究,下面说下环境搭建吧。

首先下载游戏源代码five.zip

其次下载nodejs,并安装之,然后用dos模式进入源代码目录,执行”npm install socket.io”(这是安装socket.io)

接下来就是将client的代码复制到你的网站目录,访问http://127.0.0.1/ws.html

代码比较简单,可能还有些BUG,大家可以发挥自己的想法,继续开发之。。。。。

demo:   http://www.cxphp.com/demo/ws.html

Read More

Redis源代码分析

一直有打算写篇关于redis源代码分析的文章,一直很忙,还好最近公司终于闲了一点,总算有点时间学习了,于是终于可以兑现承诺了,废话就到此吧,开始我们的源代码分析,在文章的开头我们把所有服务端文件列出来,并且标示出其作用:
adlist.c //双向链表
ae.c //事件驱动
ae_epoll.c //epoll接口, linux用
ae_kqueue.c //kqueue接口, freebsd用
ae_select.c //select接口, windows用
anet.c //网络处理
aof.c //处理AOF文件
config.c //配置文件解析
db.c //DB处理
dict.c //hash表
intset.c //转换为数字类型数据
multi.c //事务,多条命令一起打包处理
networking.c //读取、解析和处理客户端命令
object.c //各种对像的创建与销毁,string、list、set、zset、hash
rdb.c //redis数据文件处理
redis.c //程序主要文件
replication.c //数据同步master-slave
sds.c //字符串处理
sort.c //用于list、set、zset排序
t_hash.c //hash类型处理
t_list.c //list类型处理
t_set.c //set类型处理
t_string.c //string类型处理
t_zset.c //zset类型处理
ziplist.c //节省内存方式的list处理
zipmap.c //节省内存方式的hash处理
zmalloc.c //内存管理
上面基本是redis最主要的处理文件,部分没有列出来,如VM之类的,就不在这里讲了。
首先我们来回顾一下redis的一些基本知识:
1、redis有N个DB(默认为16个DB),并且每个db有一个hash表负责存放key,同一个DB不能有相同的KEY,但是不同的DB可以相同的KEY;
2、支持的几种数据类型:string、hash、list、set、zset;
3、redis可以使用aof来保存写操作日志(也可以使用快照方式保存数据文件)

对于数据类型在这里简单的介绍一下(网上有图,下面我贴上图片可能更容易理解)
1、对于一个string对像,直接存储内容;
2、对于一个hash对像,当成员数量少于512的时候使用zipmap(一种很省内存的方式实现hash table),反之使用hash表(key存储成员名,value存储成员数据);
3、对于一个list对像,当成员数量少于512的时候使用ziplist(一种很省内存的方式实现list),反之使用双向链表(list);
4、对于一个set对像,使用hash表(key存储数据,内容为空)
5、对于一个zset对像,使用跳表(skip list),关于跳表的相关内容可以查看本blog的跳表学习笔记;

下面正式进入源代码的分析
1、首先是初始化配置,initServerConfig(redis.c:759)
void initServerConfig() {
server.port = REDIS_SERVERPORT;
server.bindaddr = NULL;
server.unixsocket = NULL;
server.ipfd = -1;
server.sofd = -1;
2.在初始化配置中调用了populateCommandTable(redis.c:925)函数,该函数的目地是将命令集分布到一个hash table中,大家可以看到每一个命令都对应一个处理函数,因为redis支持的命令集还是蛮多,所以如果要靠if分支来做命令处理的话即繁琐效率还底,因此放到hash table中,在理想的情况下只需一次就能定位命令的处理函数。
void populateCommandTable(void) {
int j;
int numcommands = sizeof(readonlyCommandTable)/sizeof(struct redisCommand);

for (j = 0; j < numcommands; j++) {
struct redisCommand *c = readonlyCommandTable+j;
int retval;

retval = dictAdd(server.commands, sdsnew(c->name), c);
assert(retval == DICT_OK);
}
}

3、对参数的解析,redis-server有一个参数(可以不需要),这个参数是指定配置文件路径,然后由函数loadServerConfig(config.c:28)加载所有配置
if (argc == 2) {
if (strcmp(argv[1], “-v”) == 0 ||
strcmp(argv[1], “–version”) == 0) version();
if (strcmp(argv[1], “–help”) == 0) usage();
resetServerSaveParams();
loadServerConfig(argv[1]);

4、初始化服务器initServer(redis.c:836), 该函数初始化一些服务器信息,包括创建事件处理对像、db、socket、客户端链表、公共字符串等。
void initServer() {
int j;

signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();//设置信号处理

if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
5、在上面初始化服务器中有一段代码是创建事件驱动,aeCreateTimeEvent是创建一个定时器,下面创建的定时器将会每毫秒调用serverCron函数,而aeCreateFileEvent是创建网络事件驱动,当server.ipfd和server.sofd有数据可读的情况将会分别调用函数acceptTcpHandler和acceptUnixHandler。
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom(“creating file event”);
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) oom(“creating file event”);

6、接下来就是初始化数据,如果开启了AOF,那么会调用loadAppendOnlyFile(aof.c:216)去加载AOF文件,在AOF文件中存放了客户端的命令,函数将数据读取出来然后依次去调用命令集去处理,当AOF文件很大的时候势必为影响客户端的请求,所以每处理1000条命令就会去尝试接受和处理客户端的请求,其代码在aof.c第250行; 但是如果没有开启AOF并且有rdb的情况,会调用rdbLoad(redis.c:873)尝试去加载rdb文件,理所当然的在加载rdb文件的内部也会考虑文件太大而影响客户端请求,所以跟AOF一样,每处理1000条也会尝试去接受和处理客户端请求。

7、当所有初始化工作做完之后,服务端就开始正式工作了
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);

8、大家都知道redis是单线程模式,所有的请求、处理都是在同一个线程里面进行,也就是一个无限循环,在这个无限循环的内部有两件事要做,第一件就是调用通过aeSetBeforeSleepProc函数设置的回调函数,第二件就是开始接受客户端的请求和处理,所以我们可以在第7节看到设置了回调函数为beforeSleep,但是这个beforeSleep到底有什么作用呢?我们在第9节再详细讲述。对于aeMain(ae.c:375)就是整个程序的主要循环。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
9、在beforeSleep内部一共有三部分,第一部分对vm进行处理(即第一个if块),这里我们略过;第二部分是释放客户端的阻塞操作,在redis里有两个命令BLPOP和BRPOP需要使用这些操作(弹出列表头或者尾,实现方式见t_list.c:862行的blockingPopGenericCommand函数),当指定的key不存在或者列表为空的情况下,那么客户端会一直阻塞,直到列表有数据时,服务端就会去执行lpop或者rpop并返回给客户端,那么什么时候需要用到BLPOP和BRPOP呢?大家平时肯定用redis做过队列,最常见的处理方式就是使用llen去判断队列有没有数据,如果有数据就去取N条,然后处理,如果没有就sleep(3),然后继续循环,其实这里就可以使用BLPOP或者BRPOP来轻松实现,而且可以减少请求,具体怎么实现留给大家思考;第三部分就是flushAppendOnlyFile(aof.c:60),这个函数主要目的是将aofbuf的数据写到文件,那aofbuf是什么呢?他是AOF的一个缓冲区,所以客户端的命令都会在处理完后把这些命令追加到这个缓冲区中,然后待一轮数据处理完之后统一写到文件(所以aof也是不能100%保证数据不丢失的,因为如果当redis正在处理这些命令的情况下服务就挂掉,那么这部分的数据是没有保存到硬盘的),大家都知道写数据到文件并不是立即写到硬盘,只是保存到一个文件缓冲区中,什么情况下会把缓冲区的数据转到硬盘呢?只要满足如下三种条件的一种就能将数据真正存到硬盘:1、手动调用刷新缓冲区;2、缓冲区已满;3、程序正常退出。因此redis将数据写到文件缓冲区之后会判断是否需要刷到硬盘,server.appendfsync有两种方式,第一种(APPENDFSYNC_ALWAYS):无条件刷新,即每次写文件都会保存到硬盘,第二种(APPENDFSYNC_EVERYSEC):每隔一秒保存到硬盘。

10、接下来我们开始讲解aeProcessEvents(ae.c:275)的处理流程,首先我们来回顾一下第5节设置的定时器和监听socket事件处理,其中socket事件处理会回调acceptTcpHandler(networking.c:410)和定时器回调函数serverCron(redis.c:519),在aeProcessEvents的内部有两部分需要处理,第一部分是调用aeApiPoll判断socket是否有数据可读,整个服务端的socket里面要分监听socket和客户端socket,当有客户端链接服务器时,会触发监听socket的事件处理函数,也就是acceptTcpHandler,而acceptTcpHandler会去调用createClient(networking.c:13)创建客户端对像,然后为这个客户端设置事件处理函数readQueryFromClient(networking.c:827),所以当客户端有消息时就会触发客户端socket 事件处理函数,处理数据部分讲在后面详细讲解,接下来的第二部分就是定时器,每次在socket部分处理完后就用调用processTimeEvents(ae.c:212)来处理定时器,那么内部实现也很简单,当设置定时器的时候就会计算好应该触发的时间,所以这里就只需要判断当前时间是否大于或者等于应该触发的时间即可。那么这个定时器到底做了什么呢?请继续第11节。

11、我们继续跟踪源代码serverCron(redis.c:519),整个函数分为七个部分,第一部分:在服务端打印一些关于DB的信息(包括key数量,内存使用量等);第二部分:判断DB的hash table是否需要扩展大小tryResizeHashTables(redis.c:432);第三部分:关闭太长时间没有通信的链接closeTimedoutClients(networking.c:629);第四部分:保存rdb文件rdbSaveBackground(rdb.c:507),当然也是在需要保存的情况才会保存,即设置save参数;第五部分:清除过期的key,当然这里不是清除全部,他只是随机取出一些activeExpireCycle(redic.c:477);第六部分:虚拟内存交换部分,将一部分key转到虚拟内存中,这里的key也是随机抽取的, vmSwapOneObjectBlocking(vm.c:521);第七部分:主从同步,replicationCron(replication.c:500)。

12、在第10节中我们讲到客户端socket处理函数readQueryFromClient,这里我们一层层分析,首先是从客户端读取数据,然后调用processInputBuffer,在内部先是判断类型,然后调用processInlineBuffer或者processMultibulkBuffer解析参数,解析后的参数由argv存储参数,其类型是一个指向指针的指针,其中argv[0]是命令名称,后面就是命令参数,argc存储参数数量;然后调用processCommand(redis.c:979)处理命令,在内部调用lookupCommand(redis.c:940)获取命令对应的函数,然后调用freeMemoryIfNeeded(redis.c:1385)判断是否需要释放一些内存,接下来就是调用call(redis.c:954)去执行命令,执行命令后会调用feedAppendOnlyFile(aof.c:137)把命令行保存到aofbuf中,然后判断是否需要同步数据到slave,如果需要则调用replicationFeedSlaves(replication.c:10),接下来就是判断是否需要将数据发送到监控端,如果需要则调用replicationFeedMonitors(replication.c:82),到这里整个服务流程就结束了。至于每条命令如何执行,大家可以去查看以t_开头的几个文件。下面是一张整个服务的流程图。

注:redis源代码为2.2.8,希望大家看文章的时候配合源代码一起看,更容易理解。。

Read More

php多线程扩展

最近在网上和群里都看到很多朋友在讨论php的多线程,也有说不能实现,也有人说没必要,其实php用C语言写出来,只要c能办的,他基本都能办到,不过如果只是用来处理简单的页面是没有必要去做多线程,一般都是在后端程序的时候来用,比如爬虫、或者是刷投票等其它的程序可能需要用到。所以就来写了一个PHP多线程扩展,代码也不多,也没有考虑太多因素,只是简单的创建线程和退出线程,为了防止创建线程过多对服务器性能造成影响,那么最大支持当前cpu个数X2个线程,如果要创建一个线程代码如下:

mthread_init();
$handle1 = mthread_create('test', "hello fuction test\n");
$handle2 = mthread_create(array(A, 'test'), "hello A::test\n");
while(1){
//mthread_exit($handle1);//可以使用mthread_create返回的句柄退出这个线程
sleep(1);
}
function test($str){
while(1){
echo $str;
sleep(1);
}
}
class A{
function test($str)
{
echo $str;
sleep(1);
}
}

很简单的实现了PHP多线程。此代码仅供娱乐和学习,目前也只支持linux,稍后会慢慢完善,会加上跨平台、互斥等功能。
mthread.tar

Read More

跳表(skiplist)学习笔记

这段时间公司做了个项目,在项目里面用到了redis,爱学习的我又一次翻出了redis的源代码,从头开始看了起来(没看完,哈哈~~),在这一次阅读中把redis的五大数据结构的代码给看了一遍,对于我这么一个菜鸟来说收获颇多,比如他的hash,list结构都没有使用我们平常使用的双向链表,而是使用一种非常节省内存的方式来实现了链表(zipmap和ziplist),还有zset就使用了跳表,其实levelDB也有使用跳表。

Skip List是在有序List(链表)数据结构的基础上的扩展,解决了有序链表结构查找特定值困难的问题,使用Skip List,可以使得在一个有序链表里查找特定值的时间复杂度为O(logn),他是一种可以代替平衡树的数据结构。Skip lists应用概率保证平衡,平衡树采用严格的旋转(比如平衡二叉树有左旋右旋)来保证平衡,因此Skip list比较容易实现,而且相比平衡树有着较高的运行效率。 其实现原理是链表中每一个元素都有N层(N为随机数,并且1<=N

#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
 
typedef int key_t;
typedef int value_t;
typedef struct node_t
{
    key_t key;
    value_t value;
    struct node_t *forward[];
} node_t;
 
typedef struct skiplist
{
    int level;
    int length;
    node_t *header;
} list_t;
 
#define MAX_LEVEL   16
#define SKIPLIST_P  0.25
 
node_t* slCreateNode(int level, key_t key, value_t value)
{
    node_t *n = (node_t *)malloc(sizeof(node_t) + level * sizeof(node_t*));
    if(n == NULL) return NULL;
    n->key = key;
    n->value = value;
    return n;
}
 
list_t* slCreate(void)
{
    list_t *l = (list_t *)malloc(sizeof(list_t));
    int i = 0;
    if(l == NULL) return NULL;
 
    l->length = 0;
    l->level = 0;
    l->header = slCreateNode(MAX_LEVEL - 1, 0, 0);
    for(i = 0; i < MAX_LEVEL; i++)
    {
        l->header->forward[i] = NULL;
    }
    return l;
}
 
int randomLevel(void)
{
    int level = 1;
    while ((rand()&0xFFFF) < (SKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<MAX_LEVEL) ? level : MAX_LEVEL;
}
 
value_t* slSearch(list_t *list, key_t key)
{
    node_t *p = list->header;
    int i;
 
    for(i = list->level - 1; i >= 0; i--)
    {
        while(p->forward[i] && (p->forward[i]->key <= key)){
            if(p->forward[i]->key == key){
                return &p->forward[i]->value;
            }
            p = p->forward[i];
        }
    }
    return NULL;
}
 
int slDelete(list_t *list, key_t key)
{
    node_t *update[MAX_LEVEL];
    node_t *p = list->header;
    node_t *last = NULL;
    int i = 0;
 
    for(i = list->level - 1; i >= 0; i--){
        while((last = p->forward[i]) && (last->key < key)){
            p = last;
        }
        update[i] = p;
    }
 
    if(last && last->key == key){
        for(i = 0; i < list->level; i++){
            if(update[i]->forward[i] == last){
                update[i]->forward[i] = last->forward[i];
            }
        }
        free(last);
        for(i = list->level - 1; i >= 0; i--){
            if(list->header->forward[i] == NULL){
                list->level--;
            }
        }
        list->length--;
    }else{
        return -1;
    }
 
    return 0;
}
 
int slInsert(list_t *list, key_t key, value_t value)
{
    node_t *update[MAX_LEVEL];
    node_t *p, *node = NULL;
    int level, i;
 
    p = list->header;
    for(i = list->level - 1; i >= 0; i--){
        while((node = p->forward[i]) && (node->key < key)){
            p = node;
        }
        update[i] = p;
    }
    if(node && node->key == key){
        node->value = value;
        return 0;
    }
 
    level = randomLevel();
    if (level > list->level)
    {
        for(i = list->level; i < level; i++){
            update[i] = list->header;
        }
        list->level = level;
    }
 
    node = slCreateNode(level, key, value);
    for(i = 0; i < level; i++){
        node->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = node;
    }
    list->length++;
    return 0;
}
 
int main(int argc, char **argv)
{
    list_t *list = slCreate();
    node_t *p = NULL;
    value_t *val = NULL;
 
    //插入
    for(int i = 1; i <= 15; i++){
        slInsert(list, i, i*10);
    }
 
    //删除
    if(slDelete(list, 12) == -1){
        printf("delete:not found\n");
    }else{
        printf("delete:delete success\n");
    }
 
    //查找
    val = slSearch(list, 1);
    if(val == NULL){
        printf("search:not found\n");
    }else{
        printf("search:%d\n", *val);
    }
 
    //遍历
    p = list->header->forward[0];
    for(int i = 0; i < list->length; i++){
        printf("%d,%d\n", p->key, p->value);
        p = p->forward[0];
    }
 
    getchar();
    return 0;
}

Read More

linux网络编程之epoll学习

在文章的开头先介绍一下何为epoll,epoll是linux的网络通信IO模型,是poll/select的增强版,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

作为一款高效的IO模型,它的优点当然是我们最值得关注的;首先它不像select那样对socket描述符有数量限制,epoll所支持的FD上限是最大可以打开文件的数目,也就是说如何你有1G的内存,那么epoll就可以打开10万个FD,所以跟物理内存大小有很大关系;其次就是它不会像select/poll那样拥有一个很大的socket集合,在这集合中的任何一个socket被激活都将扫描整个集合,这对于一个高性能服务器来说效率将大打折扣。

另外讲一下epoll的两种工作模式(LT和ET),这里就简单通俗的说一下,具体的可以到网上搜索,LT的工作模式是当内核通知你某个fd能进行IO操作的时候,如果你不进行处理,那么内核还是会继续通知你,直到这个fd没有数据可读或者可写;而ET的工作模式则是内核只会通知你一次某个fd能进行IO操作,也就是说,当内核通知你某个fd可以进行操作的时候,你不处理,内核将不再另行通知。那么在epoll中LT的工作模式是默认的,我们可以通过修改链接的events属性来变更处理模式为ET(代码:ev.events|= EPOLLET);还有注意一下ET的工作模式只支持非阻塞。

其实epoll的操作非常简单,他提供了三个函数来对epoll进行操作,即epoll_create, epoll_ctl和epoll_wait;其中epoll_create用来创建一个epoll文件描述符,epoll_ctl用来添加/修改/删除需要侦听的文件描述符及其事件,epoll_wait接收发生在被侦听的描述符上用户感兴趣的IO事件。

上面介绍了这么多,下面帖出简单的epoll事件封装代码供大家学习和研究,有不正确的地方希望指出来。

socket.h

#ifndef SOCKET_H
#define    SOCKET_H

#include <sys/socket.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <fcntl.h>

#define EV_READ 1
#define EV_WRITE 2
#define EV_SIZE (1024*10)

typedef void procEvent(int fd, void *argv);

typedef struct fileEvent
{
procEvent *readProc;
procEvent *writeProc;
void *argv;
} FileEvent;

typedef struct FiredEvent {
int fd;
int mask;
} FiredEvent;

typedef struct _event
{
FileEvent fe[EV_SIZE];
struct epoll_event events[EV_SIZE];
FiredEvent fired[EV_SIZE];
int epfd;
unsigned char running;
} EventLoop;

int setnonblocking(int fd);
int createSocket(char *host, unsigned short port);
int createEvent(EventLoop *ev, int fd, short mask, procEvent *proc, void *argv);
int deleteEvent(EventLoop *ev, int fd, short mask);
int dispatchEvent(EventLoop *ev);
void stopEvent(EventLoop *ev);
EventLoop *initEvent();

#endif    /* SOCKET_H */

socket.c

#include <stdio.h>
#include "socket.h"

int setnonblocking(int fd)
{
if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) == -1)
{
return -1;
}
return 0;
}

int createSocket(char *host, unsigned short port)
{
struct sockaddr_in addr;
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1)
{
return -1;
}

if(setnonblocking(fd) == -1)
{
close(fd);
return -1;
}

addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(host);
if(bind(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1)
{
close(fd);
return -1;
}

if (listen(fd, 512) == -1) {
close(fd);
return -1;
}

return fd;
}

EventLoop *initEvent()
{
EventLoop *ev = malloc(sizeof(EventLoop));
int i;

ev->epfd = epoll_create(1024);
if(ev->epfd == -1){
free(ev);
return NULL;
}

for(i = 0; i < EV_SIZE; i++)
{
ev->events[i].events = 0;
}

return ev;
}

int createEvent(EventLoop *ev, int fd, short mask, procEvent *proc, void *argv)
{
struct epoll_event ee;

ee.events = 0;
ee.data.fd = fd;
ev->fe[fd].argv = argv;

if(mask & EV_READ)
{
ee.events|= EPOLLIN;
ev->fe[fd].readProc = proc;
}

if(mask & EV_WRITE)
{
ee.events|= EPOLLOUT;
ev->fe[fd].writeProc = proc;
}

if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, fd, &ee) == -1) return -1;

return 0;
}

int deleteEvent(EventLoop *ev, int fd, short mask)
{
struct epoll_event ee;
ee.events = 0;
ee.data.fd = fd;

if(mask & EV_READ)
{
ee.events|= EPOLLIN;
ev->fe[fd].readProc = NULL;
}

if(mask & EV_WRITE)
{
ee.events|= EPOLLOUT;
ev->fe[fd].writeProc = NULL;
}

if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, fd, &ee) == -1) return -1;

return 0;
}

int dispatchEvent(EventLoop *ev)
{
int nfds, i;
ev->running = 1;
while(ev->running)
{
nfds = epoll_wait(ev->epfd, ev->events, EV_SIZE, -1);
for (i = 0; i < nfds; i++)
{
struct epoll_event *e = ev->events + i;
int mask = 0;

if(e->events & EPOLLIN) mask|= EV_READ;
if(e->events & EPOLLOUT) mask|= EV_WRITE;
ev->fired[i].fd = e->data.fd;
ev->fired[i].mask = mask;
}

for(i = 0; i < nfds; i++)
{
int mask = ev->fired[i].mask;
int fd = ev->fired[i].fd;
FileEvent *fe = &ev->fe[fd];

if(mask & EV_READ){
fe->readProc(fd, fe->argv);
}

if(mask & EV_WRITE){
fe->writeProc(fd, fe->argv);
}
}
}
return 0;
}

void stopEvent(EventLoop *ev)
{
ev->running = 0;
}

main.c

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include "socket.h"

EventLoop *base;

void onRead(int fd, void *argv)
{
char buf[MAX_BUFFER_SIZE] = {0};
int nread = 0;
nread = read(fd, buf, MAX_BUFFER_SIZE);
}

void onAccept(int fd, void *argv)
{
struct sockaddr_in addr;
int len = sizeof(addr);
int cfd;

while(1)
{
cfd = accept(fd, (struct sockaddr *)&addr, &len);
if(cfd == -1)
{
if (errno == EINTR) continue;
printf("accept failed\n");
return ;
}
break;
}
setnonblocking(cfd);
printf("new client accept\n");
createEvent(base, cfd, EV_READ, onRead, NULL);
}

void sigHandle(const int sig)
{
printf("sighandled\n");
kill(getpid(), sig);
exit(EXIT_SUCCESS);
}

int main(int argc, char *argv[])
{
int fd;
signal(SIGINT, sigHandle);
signal(SIGKILL, sigHandle);
signal(SIGQUIT, sigHandle);
signal(SIGTERM, sigHandle);
signal(SIGHUP, sigHandle);

if((fd = createSocket("0.0.0.0", 7000)) == -1)
{
perror("create socket failed\n");
exit(EXIT_FAILURE);
}

base= initEvent();

if(base == NULL){
perror("initialize event failed\n");
exit(EXIT_FAILURE);
}
createEvent(base, fd, EV_READ, onAccept, NULL);
dispatchEvent(base);
close(fd);
return 0;
}
Read More

红黑树学习笔记

要学习红黑树,那么首先要知道红黑树到底是个什么东西,其实他的定义在网络到随处都可以搜索到,他是一种平衡二叉搜索树,在这个定义里面引入了三个关键词,一是二叉树,另一个是二叉搜索树,第三个是平衡。我们一层一层分析,

二叉树,他的定义是每个节点最多只有两个子树(即左子树和右子树,当然也可以没有子树),如下图是一个简单的二叉树:

对于这样的数据结构,在C语言中通常这样来定义结构体

typedef struct _binary_tree{
struct _binary_tree *parent;//父节点指针
struct _binary_tree *left;//左子树指针
struct _binary_tree *right;//右子树指针
} BinaryTree;

二叉搜索树,首先他也是一个二叉树,与二叉树不同的是他加了一个属性值来进行排序,这个属性要满足两个条件,1、每个节点的左子树的值均小于该节点的值(在有左子树的情况下);2、每个节点的右子树的值均大于该节点的值(在有右子树的情况下),如下图是简单的二叉搜索树:

对于二叉搜索树的数据结构,在C语言中一般这样定义结构体:

typdef struct _binary_search_tree{
struct _binary_search_tree *parent;//父节点指针
struct _binary_search_tree *left;//左子树指针
struct _binary_search_tree *right;//右子树指针
int key;//用来排序的属性
} BinarySearchTree;

平衡,上面讲的两个都是数据结构,而这个不是数据结构,是一种衡量性能的指标,为什么这样说呢?因为你越是让这个树平衡,那么他的效率就越高,二分查找法就是利用这一点来达到高效的目地,那么如何让一颗红黑树平衡呢?这里先不讲如何让他平衡,我先告诉你平衡的定义,平衡即是左右子树的深度之差的绝对值不大于1,对于上面的二叉搜索树,他不满足平衡,因为左子树的深度是2,右子树的深度是4,他们之间的绝对值大于1,所以不平衡。

红黑树,终于到了本篇文章的主题,红黑树在涵盖了二叉树和二叉搜索树的内容之后,还加了一个颜色属性,这个颜色的目地就是为了让这个二叉搜索树更平衡(看清楚这里我用的是更平衡,并不是绝对平衡,因为红黑树是不能保证树100%平衡,只能说是尽可能的平衡,在这里千万别钻牛角尖),从而达到高效的目地。对于添加了这么一个颜色之后,红黑树又产生了新的条件,这产生的新条件就是红黑树的五大性质:1、节点颜色必须是红或者黑;2、根节点必须是黑色;3、每个叶子节点是黑色;4、每个红色节点的两个子节点都是黑色;5、从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点。在这里有很多初学者就很迷惑,这个性质到底什么意思?我要如何做才能保证红黑树的性质不被破坏?其实这里你只需要记住这五大性质只是用来判断一颗树是否是红黑树,至于要怎么做,这才是我们接下来要讲的重点。当在一颗红黑树上进行添加和删除操作后,有可能会破坏红黑的五大性质,那破坏了性质我们又该如何处理呢?对于每一种可能发生的情况,设计红黑树的作者都给出了相应的解决方案,对于开发人员而言,你只需要将每一种可能发生的情况对应的解决方案转换为代码即可。下面分别对添加和删除后可能发生的情况给出了解决方案。

添加节点后产生的5个情况如下
情况1:当父节点与叔父节点都是红色
解决方案:把祖父节点设为红色,父节点与叔父节点设为黑色,然后从祖父节点开始向上继续修复

情况2:当父节点是红色,父节点是祖父节点的左孩子,并且当前节点是父节点的右孩子
解决方案:使用父节点为轴进行左旋转,然后跳至情况3

情况3:父节点是红色,父节点是祖父节点的左孩子,并且当前节点是父节点的左孩子
解决方案:父节点设为黑,祖父节点设置红色,然后以祖父节点为轴进行右旋转

情况4:当父节点是红色,父节点是祖父节点的右孩子,并且当前节点是父节点的左孩子
解决方案:使用父节点为轴进行右旋转,然后跳至情况5

情况5:当父节点是红色,父节点是祖父节点的右孩子,并且当前节点是父节点的右孩子
解决方案:父节点设成黑色,祖父节点设为红色,以祖父节点为轴进行转旋转

删除节点后产生的情况如下
在删除一个节点后,如果该节点的左、右子树都存在,那么用他右子树中最小的节点来替代这个被删除的节点,如果该节点只有左子树,或者只有右子树,那么就用他的左节点或者右节点来代替这个节点,所以在以下描述中的当前节点是表示一个被替换的新节点,并非被删除的那个节点,切记,切记。
情况1:当前节点是父节点的左孩子,兄弟节点为红色
解决方案:父节点染红,兄弟节点为黑,然后以父母节点为轴进行左旋转,然后重新获取兄弟节点

情况2:当前节点是父节点的左孩子,兄弟节点的左、右孩子为黑色
解决方案:兄弟节点染红,当前节点指向父节点,继续修复

情况3:当前节点是父节点的左孩子,兄弟节点的右孩子是黑色
解决方案:把兄弟节点的左孩子染为黑色,兄弟节点染为红色,以兄弟节点为轴进行右旋转,重新获取兄弟节点,然后跳转到情况4

情况4:当前节点是父节点的左孩子,兄弟节点的左、右孩子其中一个为红
解决方案:兄弟节点颜色为父节点颜色,父节点为黑色,兄弟节点右孩子为黑色,然后以父节点为轴进行左旋转

情况5:当前节点是父节点的右孩子,兄弟节点为红色
解决方案:兄弟节点染黑,父节点染红,以父节点为轴右旋转

情况6:同情况2

情况7:当前节点是父节点的右孩子,兄弟节点的左孩子为黑色
解决方案:兄弟节点的右孩子染黑,兄弟节点染红,以兄弟节点为轴进行左旋转,重新获取兄弟节点然后跳至情况8

情况8:当前节点是父节点的右孩子,兄弟节点的左、右孩子其中一个为红
解决方案:兄弟节点颜色染成父节点颜色,父节点为黑色,兄弟节点左孩子为黑色,以父节点为轴进行右旋转

上面的13种情况是红黑树在添加和删除节点时产生的,也配有相应的解决方案。细心的同学会发现,在上面的解决方案中多处用到了左旋转和右旋转,他们是如何实现的呢?其实这个在源代码中也有写,这里我们这样简单的来理解,左旋转即是将中心节点移到他右节点的左节点上,然后用他的右节点替换他的位置(如果中心节点的右节点的左节点已存在,那么将中心节点的右节点的左节点移到中心节点的右节点上);右旋转即是将中心节点移到他左节点的右节点上,然后用他的左节点替换他的位置(如果中心节点的左节点的右节点已存在,那么将中心节点的左节点的右节点移到中心节点的左节点上)。对于上面两个定义,有可能些同学会感觉有点绕口,但是如果根据这个定义你自己去画张图,你会理解的更好。

下面我将把源代码分享出来,里面的代码非本人所写,不过我在里面添加了大量的注释以方便大家学习,就红黑树的学习笔记就分享到这里,如果有问题,请即时与我联系。

点击下载源代码RBTree.tar(代码出自:http://blog.csdn.net/v_JULY_v/archive/2011/01/03/6114226.aspx)

Read More

Memcache源代码分析之数据存储

其它先不说,直接上张图,为什么呢,看图说话,哈哈。。入主题,上篇我们讲到网络处理,这次讲讲数据存储,从上面图上基本可以看到处理流程,不过忽略了一些细节,但里面有两个地方要值得说一下,在图上也做了标注,就是图中的1和2,分别是内存分配和hashtable管理,

1、内存分配,当一条命令的参数解析完之后,就要为之分配其内存空间,要分配的空间大小=key长度+1+后缀长度+内容长度,这里的后缀长度其实是压缩参数和内容长度的ascii所占的字符个数(具体请查看items.c的第81行),得知需要的内存长度之后就得从内存块中去查找空闲的内存,这就是所谓的内存池(什么是内存池技术,本blog里面也有写到),如何查找空闲的内存呢,首先从内存块的中找一块能放下该数据的块(内存块是一个数组,每个元素存放了该块的大小和由多块内存组成的链表),然后从内存块的块链表末尾开始向前循环查找,直到找到一个空闲的块(在这个查询过程中有一个前提就是,如果循环了50次还没找到数据,则视为没有空闲块),如果没有找到空闲的内存块,就重新分配一大块内存,然后挂到这个块链表上。

2、hashtable管理,当数据数量超过表大小的1.5倍,那么memcache会新申请一块原始表2倍的内存作为新表,然后启动一个线程将原来的数据重新hash(key)分配到新的表上;

以上就是简单的数据存储过程,太多细节的东西没有写进去,大家空了也可以去研究研究memcache的源代码。有错误的地方记得提出来。

Read More

Memcache源代码分析之网络处理

memcache是什么说略过,大家都懂。这篇文章主要分析memcache的网络处理,memcache的网络处理是基于libevent的库写的,下面依据源代码来一步一步讲解。

1、初始化event、链接池、内存块等

/* initialize main thread libevent instance */
main_base = event_init(); //memcache.c line:4583

/* initialize other stuff */
stats_init();
assoc_init();
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate);

2、 初始化线程,调用函数thread_init(settings.num_threads, main_base),内部实现如下:

a)创建num_threads个线程处理对像(包括读写管道、客户端队列等,thread.c line:608


for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {//创建读写管道
perror("Can't create notify pipe");
exit(1);
}

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

//为读取管道绑定事件(当有可读数据时,以事件方式通知thread_libevent_process函数处理),并且初始化客户端队列
setup_thread(&threads[i]);
}

b)创建线程,thread.c line:622

for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);//对创建成功的线程进行计数,同时开始进入事件循环
}

3、创建套接字,memcache.c line:4643

if (settings.port && server_socket(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}

4、在server_socket调用conn_new创建一个链接对像,内部实现如下:

a)更新状态为conn_listening,memcache.c line:400

c->state = init_state;

b)绑定事件,当套接字有可读数据时以事件方式通知event_handler处理(memcache.c line:420)

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);

c)进入主事件循环,memcache.c line:4675

event_base_loop(main_base, 0);

5、当有用户请求时即进行event_handler的conn_listening分支,接受请求后调用dispatch_conn_new函数(memcache.c line:3450),内部实现过程如下:

a)从链接队列中取出一个空闲的对象,(thread.c line:496)

CQ_ITEM *item = cqi_new();

b)以轮询的方式访问线程一个线程,(thread.c line:297)

iint tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;

c)将状态置为conn_new_cmd,(thread.c line:304)

item->init_state = init_state

d)然后将这个对象加入到线程的客户端链表中(thread.c line:309)

cq_push(thread->new_conn_queue, item);

e)然后通知线程处理事件绑定等工作(原理是向线程的写入管道中发送一个字节,告诉线程有客户端来请求了),(thread.c line:312)

if (write(thread->notify_send_fd, "", 1) != 1) {
perror("Writing to thread notify pipe");
}

6、当某一线程的管道有可读数据时,调用thread_libevent_process进行处理,内部实现过程如下:

a)读取一个字节,即取出一个客户端列表,(thread.c line:259)

if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");

item = cq_pop(me->new_conn_queue);//从客户端队列中取出一个对象

b)调用conn_new创建一个链接对像,并将初始状态设为conn_new_cmd,同时为套接定绑定事件(thread.c line:266)

conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);

c)在主线程循环中进入conn_new_cmd分支,(memcache.c line:3494);

case conn_new_cmd:
/* Only process nreqs at a time to avoid starving other
connections */

--nreqs;
if (nreqs >= 0) {
reset_cmd_handler(c);

d)判断该套接字是否还有未处理数据,有未处理数据就更新状态为conn_parse_cmd,没有就更新为conn_waiting,(memcache.c line:2075)

if (c->rbytes > 0) {
conn_set_state(c, conn_parse_cmd);
} else {
conn_set_state(c, conn_waiting);
}

e)当有未处理数据时进入conn_parse_cmd分支,尝试去处理(具体请看第7大步),如果内容不满足一条完整的数据就更新为conn_waiting(memcache.c line:3486)

case conn_parse_cmd :
if (try_read_command(c) == 0) {
/* wee need more data! */
conn_set_state(c, conn_waiting);
}

f)当套接字有数据时,更新状态为conn_read,(memcache.c line:3461)

case conn_waiting:
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}

conn_set_state(c, conn_read);

g)进入conn_read分支以udp或者tcp的方式开始读取数据,当读取数据成功后
更新状态为conn_parse_cmd(即下一次循环跳到第e步开始执行)(memcache.c line:3467)

case conn_read:
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;

h)在try_read_network内部,首先是将没有处理掉的数据前置(memcache.c line:3247),然后循环读取客户端发来的所有数据(memcache.c line:3253),这里要详细讲一下客户端数据的存储,对于每个请求的客户端,都会分配数据缓冲区(rbuf)、当前数据处理偏移项(rcurr)、缓冲区长度(rsize)、已占用字节数(rbytes),当有消息时,先看看缓存区能不能存放下,如果不能,那么将数据缓冲区将以2倍的长度开始增长,然后将接受的数据存下;

7、处理消息
a)当有消息时会调用try_read_command来解析命令行,一般情况下是使用ascii传输数据,那么在函数内部会直接跳到memcache.c的3145行,然后判断数据包的完整性,在数据包完整的情况下,会直接调用process_command函数(memcache.c line:3181)

process_command(c, c->rcurr);

b)在process_command内部先是调用tokenize_command函数来计算参数总数量,参数值长度,参数值(memcache.c line:2967)

ntokens = tokenize_command(command, tokens, MAX_TOKENS);

c)根据参数总数量,参数值长度和参数值来解析命令,然后将数据更新到HashTable,包括:get,set,update等(memcache.c line:2968)

if (ntokens >= 3 &&
((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
(strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

process_get_command(c, tokens, ntokens, false);

整个关于tcp处理这块分析完毕,比较简单,有错误之处请回复告之。

Read More