Sphinx in Practice (3): Distributed Indexes, XML Data Sources, Gearman, UpdateAttributes

1. Overview

This article covers Sphinx distributed index configuration, XML data sources, MySQL triggers, the Gearman task distribution system, and updating Sphinx attributes. The data builds on the earlier posts Sphinx in Practice (1) and Sphinx in Practice (2); see http://www.ttlsa.com/html/category/os/store-system/sphinx-web-application for details. For how to use Gearman, see http://www.ttlsa.com/html/category/os/distributed-software-systems/distributed-processing-systems/gearman-distributed-processing-systems

If you have any questions, please leave a comment or join QQ group 39514058.

Note: MVA attribute updates are not flushed to disk.

2. Distributed Indexes

For large data sets a distributed index is very useful. With a single index, performance degrades sharply: query time goes up and queries per second go down.

In addition, if the index data is larger than the server's memory, searchd may fail to start.

To enable a distributed index, set the type option of the index block to distributed.

3. Sphinx Configuration

# vi main_ttlsa.conf // MySQL data source

source ttlsa

{

type = mysql

sql_host = 192.168.1.25

sql_user = root

sql_pass =

sql_db = ttlsa_com

sql_query = SELECT id, title, content, UNIX_TIMESTAMP(publish_date) AS publish_date, author_id ,tags FROM posts

sql_attr_uint = author_id

sql_attr_multi = uint tags from field

sql_attr_timestamp = publish_date

sql_query_info = SELECT id, title, content, publish_date, tags, author_id FROM posts WHERE ID=$id

}

index ttlsa_main // main index

{

source = ttlsa

path = /data/sphinx/ttlsa/main

docinfo = extern

mlock = 0

morphology = none

html_strip = 0

min_word_len = 1

min_prefix_len = 0

min_infix_len = 1

ngram_len = 0

charset_type = utf-8

charset_table = 0..9, A..Z->a..z, _, a..z, \

U+410..U+42F->U+430..U+44F, U+430..U+44F

ngram_chars = U+3000..U+2FA1F

}

index master // distributed index

{

type = distributed

local = ttlsa_main

agent = 127.0.0.1:3313:ttlsa_xml

agent_connect_timeout = 1000

agent_query_timeout = 3000

}

indexer

{

mem_limit = 512M

}

searchd

{

listen = 3312

listen = 0.0.0.0:4306:mysql41 // MySQL protocol (SphinxQL) listen address and port

log = /data/sphinx/ttlsa/log/searchd_3312.log

query_log = /data/sphinx/ttlsa/log/query_3312.log

read_timeout = 5

max_children = 200

max_matches = 1000

seamless_rotate = 1

preopen_indexes = 1

unlink_old = 1

max_filter_values = 10000

pid_file = /data/sphinx/ttlsa/log/searchd_3312.pid

mva_updates_pool = 16M

compat_sphinxql_magics = 0

}

# vi ttlsa.pl // generates the XML data

[sourcecode language="perl"]
use strict;
use XML::Writer;
use Sphinx::Search;

my $sphinx_server = "127.0.0.1";
my $sphinx_port   = "3312";

# Query the main index for the current largest document id.
my $sph = Sphinx::Search->new();
$sph->SetServer($sphinx_server, $sphinx_port);
$sph->SetConnectTimeout(1);
$sph->SetConnectRetries(3);
$sph->SetSelect("id");
$sph->SetSortMode(SPH_SORT_EXTENDED, '@id desc');
$sph->SetLimits(0, 1);
my $results = $sph->Query("", 'ttlsa_main');
my $max_id  = $results->{'matches'}->[0]->{'id'};
$sph->Close();

# Write an xmlpipe2 document stream to STDOUT: schema first, then the documents.
my $writer = XML::Writer->new(DATA_MODE => 1, DATA_INDENT => 2);
$writer->xmlDecl('utf-8');
$writer->startTag('sphinx:docset');

$writer->startTag('sphinx:schema');
$writer->emptyTag('sphinx:field', 'name' => 'title');
$writer->emptyTag('sphinx:field', 'name' => 'content');
$writer->emptyTag('sphinx:attr', 'name' => 'publish_date', 'type' => 'timestamp');
$writer->emptyTag('sphinx:attr', 'name' => 'author_id', 'type' => 'int', 'bits' => '32');
$writer->endTag('sphinx:schema');

# A single placeholder document with id 1000 and zeroed fields/attributes.
$writer->startTag('sphinx:document', 'id' => 1000);
$writer->startTag('title');        $writer->characters('0'); $writer->endTag('title');
$writer->startTag('content');      $writer->characters('0'); $writer->endTag('content');
$writer->startTag('publish_date'); $writer->characters('0'); $writer->endTag('publish_date');
$writer->startTag('author_id');    $writer->characters('0'); $writer->endTag('author_id');
$writer->endTag('sphinx:document');

$writer->endTag('sphinx:docset');
$writer->end();
[/sourcecode]
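For reference, running the script by hand (perl ttlsa.pl) should print an xmlpipe2 stream roughly like the following (exact whitespace may differ):

[sourcecode language="xml"]
<?xml version="1.0" encoding="utf-8"?>
<sphinx:docset>
  <sphinx:schema>
    <sphinx:field name="title" />
    <sphinx:field name="content" />
    <sphinx:attr name="publish_date" type="timestamp" />
    <sphinx:attr name="author_id" type="int" bits="32" />
  </sphinx:schema>
  <sphinx:document id="1000">
    <title>0</title>
    <content>0</content>
    <publish_date>0</publish_date>
    <author_id>0</author_id>
  </sphinx:document>
</sphinx:docset>
[/sourcecode]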

# vi xml_ttlsa.conf // XML data source (the second searchd instance, on port 3313, that the agent line above points at)

source ttlsa_xml

{

type = xmlpipe2

xmlpipe_command = perl /usr/local/coreseek4/etc/ttlsa.pl // XML data stream

}

index ttlsa_xml

{

type = plain

source = ttlsa_xml

path = /data/sphinx/ttlsa/xml

docinfo = extern

mlock = 0

morphology = none

html_strip = 0

min_word_len = 1

min_prefix_len = 0

min_infix_len = 1

ngram_len = 0

charset_type = utf-8

charset_table = 0..9, A..Z->a..z, _, a..z, \

U+410..U+42F->U+430..U+44F, U+430..U+44F

ngram_chars = U+3000..U+2FA1F

}

indexer

{

mem_limit = 512M

}

searchd

{

listen = 3313

listen = 0.0.0.0:9307:mysql41

log = /data/sphinx/ttlsa/log/searchd_3313.log

query_log = /data/sphinx/ttlsa/log/query_3313.log

read_timeout = 5

max_children = 200

max_matches = 1000

seamless_rotate = 1

preopen_indexes = 1

unlink_old = 1

max_filter_values = 10000

pid_file = /data/sphinx/ttlsa/log/searchd_3313.pid

mva_updates_pool = 16M

compat_sphinxql_magics = 0

}

Modify the table structure from the earlier articles:

mysql> alter table posts add tags text NOT NULL;

mysql> update posts set tags='0,0';

Build the indexes:

# /usr/local/coreseek4/bin/indexer --config /usr/local/coreseek4/etc/main_ttlsa.conf --all

# /usr/local/coreseek4/bin/indexer --config /usr/local/coreseek4/etc/xml_ttlsa.conf --all

Start the services:

# /usr/local/coreseek4/bin/searchd --config /usr/local/coreseek4/etc/main_ttlsa.conf

# /usr/local/coreseek4/bin/searchd --config /usr/local/coreseek4/etc/xml_ttlsa.conf
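Once searchd is running, later rebuilds should add the --rotate switch so the running daemon picks up the new index files seamlessly (seamless_rotate is already enabled in the configuration above), for example:

# /usr/local/coreseek4/bin/indexer --config /usr/local/coreseek4/etc/main_ttlsa.conf --all --rotate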

4. Gearman Configuration

# vi WORKER_UPDATEATTRIBUTES.pl // executes the gearman tasks

[sourcecode language="perl"]
###################################
### author: www.ttlsa.com      ###
### QQ group: 39514058         ###
### E-mail: service@ttlsa.com  ###
###################################
use strict;
use XML::Writer;
use Sphinx::Search;
use DBI;
use Class::Date qw(date);
use Gearman::Worker;
use Time::HiRes qw(gettimeofday);

my $sphinx_server = "127.0.0.1";
my $sphinx_port   = "3312";
my $driver        = "DBI:mysql";
my $host          = "192.168.1.25:3306";
my $dbname        = "ttlsa_com";
my $user          = "root";
my $passwd        = "";

# Register the worker function with gearmand and loop forever waiting for jobs.
my $worker = new Gearman::Worker;
$worker->job_servers('192.168.1.60:4731');
$worker->register_function(UPDATE_ATTRIBUTES => \&UPDATE_ATTRIBUTES);
$worker->work while 1;

sub UPDATE_ATTRIBUTES {
    # Find the smallest document id whose attributes are still zero.
    my $sph = Sphinx::Search->new();
    $sph->SetServer($sphinx_server, $sphinx_port);
    $sph->SetConnectTimeout(1);
    $sph->SetConnectRetries(3);
    $sph->SetFilter('publish_date', [0]);
    $sph->SetFilter('author_id', [0]);
    $sph->SetSelect('id');
    $sph->SetSortMode(SPH_SORT_EXTENDED, '@id asc');
    $sph->SetLimits(0, 1);

    my $start_time = gettimeofday();
    my $rt = $sph->Query("", 'master');
    my $min_id = $rt->{'matches'}->[0]->{'id'};
    my $ct = gettimeofday() - $start_time;
    print "Time to look up the current minimum document ID ($min_id): $ct\n";
    $sph->Close();

    if ($min_id) {
        # Pull the real attribute values from MySQL and push them into the index.
        my $dbh = DBI->connect("$driver:$dbname:$host", "$user", "$passwd") or die DBI->errstr;
        my $sql = "select id,author_id,publish_date,tags from posts where id >= $min_id";
        my $sth = $dbh->prepare($sql);
        my $rv  = $sth->execute;

        my $attrs  = ['author_id', 'publish_date'];
        my $values = {};
        while (my $hash_ref = $sth->fetchrow_hashref) {
            $values->{$hash_ref->{'id'}} = [$hash_ref->{'author_id'}, date($hash_ref->{'publish_date'})->epoch];
        }

        my $start_time = gettimeofday();
        my $num = $sph->UpdateAttributes('master', $attrs, $values);
        my $ct  = gettimeofday() - $start_time;
        print "Attribute update took: $ct\n";

        if (defined($num)) {
            print "Documents updated: $num\n\n\n";
        } else {
            print "!!! Attribute update failed !!!\n\n\n";
        }
    }
}
[/sourcecode]
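The worker must be running before any jobs are submitted, otherwise tasks simply pile up on gearmand. Start it in the background (or under a process supervisor), for example:

# perl WORKER_UPDATEATTRIBUTES.pl &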

MySQL UDFs are used to submit jobs to the Gearman distributed task distribution system; see http://www.ttlsa.com/html/1269.html for details.

If a large amount of data is inserted into the database, just as many tasks will be queued in an instant (the trigger below fires once per inserted row).

mysql> SELECT gman_servers_set("192.168.1.60:4731") AS gman_server;

+-------------------+
| gman_server       |
+-------------------+
| 192.168.1.60:4731 |
+-------------------+
1 row in set (0.11 sec)

mysql> CREATE TRIGGER insert_posts AFTER INSERT ON posts FOR EACH ROW SET @RETURN=gman_do_background('UPDATE_ATTRIBUTES','undef');

Query OK, 0 rows affected (0.08 sec)

// Create the trigger: whenever a row is inserted into the posts table, a task is queued.
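Before relying on the trigger, it is worth submitting a job by hand to confirm the worker picks it up. A minimal sketch using Gearman::Client, assuming the same gearmand address and function name as above:

[sourcecode language="perl"]
use strict;
use Gearman::Client;

# Submit one background UPDATE_ATTRIBUTES job by hand.
my $client = Gearman::Client->new;
$client->job_servers('192.168.1.60:4731');
$client->dispatch_background('UPDATE_ATTRIBUTES', 'undef');
print "job submitted\n";
[/sourcecode]

The worker's terminal should then print the timing lines shown in the test section below.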

5. Testing

mysql> insert into posts values ('','xxx','xxxxxxx',1, CURRENT_TIMESTAMP(),'2,2');

The gearman task output looks like this:

Time to look up the current minimum document ID (6): 0.00577688217163086
Attribute update took: 0.203788042068481
Documents updated: 1

# mysql -h127.0.0.1 -P4306

mysql> show tables;

+------------+-------------+
| Index      | Type        |
+------------+-------------+
| master     | distributed |
| ttlsa_main | local       |
+------------+-------------+


As mentioned above, because tasks are queued from a trigger, inserting a large amount of data will instantly queue as many tasks as there are inserted rows, which will then block the tasks queued after them. The script below inserts a large batch of data; feel free to test it yourself.

The number of queued tasks can be checked with the following command:

# telnet 192.168.1.60 4731

status
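The same check can be scripted. A minimal sketch that talks to gearmand's admin interface (the status report lists function name, total, running and available workers, and ends with a line containing a single dot):

[sourcecode language="perl"]
use strict;
use IO::Socket::INET;

# Connect to gearmand's admin port and print the "status" report.
my $sock = IO::Socket::INET->new(
    PeerAddr => '192.168.1.60',
    PeerPort => 4731,
    Proto    => 'tcp',
) or die "connect failed: $!";

print $sock "status\n";
while (my $line = <$sock>) {
    last if $line =~ /^\.\s*$/;   # end of the report
    print $line;
}
close $sock;
[/sourcecode]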

# vi insert_data.php

[sourcecode language="php"]
<?php
/*
###################################
### author: www.ttlsa.com      ###
### QQ group: 39514058         ###
### E-mail: service@ttlsa.com  ###
###################################
*/
$host   = "192.168.1.25";
$user   = "root";
$passwd = "";
$dbname = "ttlsa_com";

$conn = mysql_connect($host, $user, $passwd) or die(mysql_error());
mysql_select_db($dbname) or die(mysql_error());

// Build a single multi-row INSERT with $count rows of generated data.
$i = 1;
$count = 1000;
$val = array();
while ($i <= $count) {
    $publish_date = date("Y-m-d G:i:s");
    $author_id = $i;
    $tags = $i . "," . $i;
    $title = $i;
    $content = str_pad($i, 10, 0, STR_PAD_LEFT);
    $val[] = "('{$publish_date}','{$author_id}','{$tags}','{$title}','{$content}')";
    $i++;
}

$st = microtime(true);
$sql = "insert into posts (publish_date,author_id,tags,title,content) values " . implode(",", $val);
// Execute the bulk insert.
mysql_query($sql, $conn) or die(mysql_error());
$num = mysql_affected_rows($conn);
$ct = microtime(true) - $st;

print "$sql\n";
print "Execution time: $ct\n";
print "Rows inserted: $num\n";
print "Memory used: " . memory_get_usage(true) . " bytes\n";
print "-----------------------------------\n\n\n";
[/sourcecode]
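Run the script and then re-check the status output above; the UPDATE_ATTRIBUTES queue should grow by roughly the number of inserted rows:

# php insert_data.php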

Two questions are left for you to think about: 1. What about DELETE operations? How should the index be handled then? 2. What about UPDATE operations? How should the index be handled then?