Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Commit

Permalink
Fixed the subscribed topic names
Browse files Browse the repository at this point in the history
  • Loading branch information
rennokki committed Aug 24, 2020
1 parent 3f8bb62 commit c79bac0
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions src/PubSub/Drivers/RedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public function publish($appId, string $channel, stdClass $payload): bool

$payload = json_encode($payload);

$this->publishClient->__call('publish', ["{$appId}:{$channel}", $payload]);
$this->publishClient->__call('publish', [$this->getTopicName($appId, $channel), $payload]);

DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
'channel' => $channel,
'serverId' => $this->getServerId(),
'payload' => $payload,
'pubsub' => "{$appId}:{$channel}",
'pubsub' => $this->getTopicName($appId, $channel),
]);

return true;
Expand All @@ -127,7 +127,7 @@ public function subscribe($appId, string $channel): bool
{
if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', ["{$appId}:{$channel}"]);
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId, $channel)]);
$this->subscribedChannels["{$appId}:{$channel}"] = 1;
} else {
// Increment the subscribe count if we've already subscribed
Expand All @@ -137,7 +137,7 @@ public function subscribe($appId, string $channel): bool
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
'channel' => $channel,
'serverId' => $this->getServerId(),
'pubsub' => "{$appId}:{$channel}",
'pubsub' => $this->getTopicName($appId, $channel),
]);

return true;
Expand Down Expand Up @@ -169,7 +169,7 @@ public function unsubscribe($appId, string $channel): bool
DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
'channel' => $channel,
'serverId' => $this->getServerId(),
'pubsub' => "{$appId}:{$channel}",
'pubsub' => $this->getTopicName($appId, $channel),
]);

return true;
Expand All @@ -194,7 +194,7 @@ public function joinChannel($appId, string $channel, string $socketId, string $d
'serverId' => $this->getServerId(),
'socketId' => $socketId,
'data' => $data,
'pubsub' => "{$appId}:{$channel}",
'pubsub' => $this->getTopicName($appId, $channel),
]);
}

Expand All @@ -209,13 +209,13 @@ public function joinChannel($appId, string $channel, string $socketId, string $d
*/
public function leaveChannel($appId, string $channel, string $socketId)
{
$this->publishClient->__call('hdel', ["{$appId}:{$channel}", $socketId]);
$this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]);

DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
'channel' => $channel,
'serverId' => $this->getServerId(),
'socketId' => $socketId,
'pubsub' => "{$appId}:{$channel}",
'pubsub' => $this->getTopicName($appId, $channel),
]);
}

Expand All @@ -228,7 +228,7 @@ public function leaveChannel($appId, string $channel, string $socketId)
*/
public function channelMembers($appId, string $channel): PromiseInterface
{
return $this->publishClient->__call('hgetall', ["{$appId}:{$channel}"])
return $this->publishClient->__call('hgetall', [$this->getTopicName($appId, $channel)])
->then(function ($members) {
// The data is expected as objects, so we need to JSON decode
return array_map(function ($user) {
Expand All @@ -249,7 +249,7 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
$this->publishClient->__call('multi', []);

foreach ($channelNames as $channel) {
$this->publishClient->__call('hlen', ["{$appId}:{$channel}"]);
$this->publishClient->__call('hlen', [$this->getTopicName($appId, $channel)]);
}

return $this->publishClient->__call('exec', [])
Expand Down Expand Up @@ -371,4 +371,19 @@ public function getServerId()
{
return $this->serverId;
}

/**
* Get the Pub/Sub Topic name to subscribe based on the
* app ID and channel name.
*
* @param mixed $appId
* @param string $channel
* @return string
*/
protected function getTopicName($appId, string $channel): string
{
$prefix = config('database.redis.options.prefix', null);

return "{$prefix}{$appId}:{$channel}";
}
}

0 comments on commit c79bac0

Please sign in to comment.