rabbitmq

PHP实现消息队列MQ

1、MQ的应用场景

  优点:

    a)主要解决异步消息

    b)应用解耦

    c)流量消峰等问题

    d)日志处理(kafka)

  缺点:

    a)系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低

    b)系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

1、为什么会造成重复消费?

因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

2、解决重复消费的方案:

(1)比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

(2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

(3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

2、消息模型

a)P2P(Point to Point)点对点模式(也就是一个任务只能被一个消费者消费)

1、包含三个角色:消息队列(Queue),发送者(Sender),接受者(Receiver)

PHP实现:

安装rabbitMQ扩展:

在你的项目中添加一个 composer.json文件:

{

     “require”: {

      “php-amqplib/php-amqplib”: “2.6.1”

     }

}

2、简单模式(一对一)

0

php

@File  : sample-send.php

@Author: Liugp

@Date  : 2019/7/22

@Desc  : 生产者

require_once DIR . ‘/vendor/autoload.php’;

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

// 建立AMQP连接

$connection = new AMQPStreamConnection(’localhost’, 5672, ‘guest’, ‘guest’);

$channel    = $connection->channel();

// 定义队列名称

$channel->queue_declare(‘hello’, false, false, false, false);

// 定义要发送的信息

READ MORE

rabbitmq接入PHP

1.安装php-amqplib

php-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。我们使用composer来安装。

composer require php-amqplib/php-amqplib

示例说明

生产者(Producer)和消费者(Consumer)是消息队列的基本概念,生产者是指生产消息的一方,也是消息发送方,消费者就是消费消息的一方,也是消息接收方,队列就是存储消息的一个缓存区。

本实例将由生产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:皮鞋生产打包打包车间,不断有成品鞋进入传送带(消息队列)等待操作工人(消费者)将皮鞋打包。

因为等待打包的鞋子特别多,我们需要安排多个打包工人在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保工人最后打包好的皮鞋数量一双不少,不能因为打包工人操作慢或者个人原因暂时离开生产线,导致最终打包数不一致。

消息发送

生产者将消息发送给队列,至于谁来消费(处理)这些消息,生产者不管。

消息队列(MQ),用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

消息到达队列中后,如果没有一个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在生产者和消费者中都要将queue_declare第3个参数设置为true,表示让消息队列持久化。

$channel->queue_declare($queue, false, true, false, false);

此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在生产者端设置:。

‘delivery_mode’ => AMQPMessage::DELIVERY_MODE_PERSISTENT

生产者文件sender.php

/**

  • @sender.php

  • @消息生产者-分发任务

*/

require_once DIR . ‘/vendor/autoload.php’;

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

$queue = ‘worker’;

//$connection = new AMQPStreamConnection(’localhost’, 5672, ‘guest’, ‘guest’);

$connection = new AMQPStreamConnection(

    ‘192.168.0.100’,

    56720,

    ‘helloweba’,  //user

    ‘helloweba’,  //password

    ’test’  //vhost

);

$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表示让消息队列持久化

READ MORE

rdbtools

需要python

pip install rdbtools

pip install python-lzf

转成csv文件

rdb -c memory dump_6379.rdb  > memory.csv 

匹配指定key前缀

rdb -c memory –key “app:.*” dump.rdb  > memory.csv

第4列排序

sort -t ‘,’  -k4 memory.csv | head

READ MORE

Redis - 事务(ACID)分析

前言 ACID,是指数据库管理系统(DBMS)在写入或更新资料的过程中,为保证事务(transaction)是正确可靠的,所必须具备的四个特性:原子性(atomicity,或称不可分割性)、一致性(consistency)、隔离性(isolation,又称独立性)、持久性(durability)。

常见支持事务的数据库或其部分引擎有:Oracle、DB2、MySQL…

可以看到上面的举例都是关系型数据,那对于 Redis 这种非关系性数据库来说。是否也支持 ACID 特性尼? 接着往下看。

ACID分析 Redis 提供了一组命令来实现事务的开启、提交、回滚等。事务的实现是在开启事务开始操作的命令被暂存到了命令队列,还没有实际执行。当执行了EXEC后才会被真正的执行。

命令 含义 备注 MULTI 开启事务 EXEC 提交事务 假如某个(或某些) key 正处于 WATCH 命令的监视之下,且事务块中有和这个(或这些) key 相关的命令,那么 EXEC 命令只在这个(或这些) key 没有被其他命令所改动的情况下执行并生效,否则该事务被打断(abort)。 DISCARD 类似回滚事务,只能用来主动放弃事务执行,把暂存的命令队列清空,起不到回滚的效果。 如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命令 UNWATCH WATCH 监视一个(或多个) key 如果在事务提交之前这个(或这些) key 被其他命令所改动,那么事务将被打断。 UNWATCH 取消监视 如果在执行 WATCH 命令之后, EXEC 命令或 DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。 在具体分析前,先模拟一个事务的例子。比如商品 goods_a 目前库存还有10件,同样商品 goods_b 也是还有10件库存。下面用这个例子来进行分析。

原子性 一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被恢复(Rollback)到事务开始前的状态。这是原子型的含义。

现在需要分析 Redis 是否支持原子性,我们需要举例当出现一些异常情况发生时候观察 Redis 的行为即可。

1、 在事务中进行语法错误的操作,例如 set 写成了 sset 。如下:

READ MORE

redis-shake

迁移单机到单机 ./redis-shake sync.toml

迁移集群到集群 python3 cluster_helper.py ../redis-shake ../sync.toml

pip3 install redis
pip3 install toml
[source]

version = 3.2 # redis version, such as 2.8, 4.0, 5.0, 6.0, 6.2, 7.0, ...
address = "10.2.1.200:6381"
username = "" # keep empty if not using ACL
password = "aaaaaa" # keep empty if no authentication is required
tls = false
elasticache_psync = "" # using when source is ElastiCache. ref: https://github.com/alibaba/RedisShake/issues/373

  

[target]

type = "standalone" # "standalone" or "cluster"
version = 7.0 # redis version, such as 2.8, 4.0, 5.0, 6.0, 6.2, 7.0, ...
# When the target is a cluster, write the address of one of the nodes.
# redis-shake will obtain other nodes through the `cluster nodes` command.
address = "10.2.1.186:6379"
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false

READ MORE