1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
//! # client
//!
//! Implements the redis client capabilities.
//!
#[cfg(test)]
#[path = "./client_test.rs"]
mod client_test;
use crate::connection;
use crate::subscriber;
use crate::types::{
Interrupts, Message, RedisBoolResult, RedisEmptyResult, RedisError, RedisResult,
RedisStringResult,
};
use std::str::FromStr;
/// The redis client which enables to invoke redis operations.
pub struct Client {
/// Internal redis client
client: redis::Client,
/// Holds the current redis connection
connection: connection::Connection,
/// Internal subscriber
subscriber: subscriber::Subscriber,
}
fn run_command_on_connection<T: redis::FromRedisValue>(
connection: &mut redis::Connection,
command: &str,
args: Vec<&str>,
) -> RedisResult<T> {
let mut cmd = redis::cmd(command);
for arg in args {
cmd.arg(arg);
}
let result: redis::RedisResult<T> = cmd.query(connection);
match result {
Err(error) => Err(RedisError::RedisError(error)),
Ok(output) => Ok(output),
}
}
impl Client {
/// Returns true if the currently stored connection is valid, otherwise false.<br>
/// There is no need to call this function as any redis operation invocation will
/// ensure a valid connection is created.
pub fn is_connection_open(self: &mut Client) -> bool {
self.connection.is_connection_open()
}
/// Closes the internal connection to redis.<br>
/// The client can still be reused and any invocation of other operations after this call,
/// will reopen the connection.<br>
/// See redis [QUIT](https://redis.io/commands/quit) command.
///
/// # Example
///
/// ```
/// # let mut client = simple_redis::create("redis://127.0.0.1:6379/").unwrap();
/// match client.quit() {
/// Err(error) => println!("Error: {}", error),
/// _ => println!("Connection Closed.")
/// }
/// ```
///
pub fn quit(self: &mut Client) -> RedisEmptyResult {
let mut result = if self.is_connection_open() {
self.run_command_empty_response("QUIT", vec![])
} else {
Ok(())
};
if result.is_ok() {
result = self.unsubscribe_all();
}
result
}
/// Invokes the requested command with the provided arguments (all provided via args) and returns the operation
/// response.<br>
/// This function ensures that we have a valid connection and it is used internally by all other exposed
/// commands.<br>
/// This function is also public to enable invoking operations that are not directly exposed by the client.
///
/// # Arguments
///
/// * `command` - The Redis command, for example: `GET`
/// * `args` - Vector of arguments for the given command
///
/// # Example
///
/// ```
/// # let mut client = simple_redis::create("redis://127.0.0.1:6379/").unwrap();
/// match client.run_command::<String>("ECHO", vec!["testing"]) {
/// Ok(value) => assert_eq!(value, "testing"),
/// _ => panic!("test error"),
/// }
/// ```
pub fn run_command<T: redis::FromRedisValue>(
self: &mut Client,
command: &str,
args: Vec<&str>,
) -> RedisResult<T> {
match self.connection.get_redis_connection(&self.client) {
Ok(connection) => run_command_on_connection::<T>(connection, command, args),
Err(error) => Err(error),
}
}
/// invokes the run_command and returns typed result
pub fn run_command_from_string_response<T: FromStr>(
self: &mut Client,
command: &str,
args: Vec<&str>,
) -> RedisResult<T> {
match self.run_command::<String>(command, args) {
Ok(value) => match T::from_str(&value) {
Ok(typed_value) => Ok(typed_value),
_ => Err(RedisError::Description("Unable to parse output value.")),
},
Err(error) => Err(error),
}
}
/// invokes the run_command but returns empty result
pub fn run_command_empty_response(
self: &mut Client,
command: &str,
args: Vec<&str>,
) -> RedisEmptyResult {
self.run_command(command, args)
}
/// invokes the run_command but returns string result
pub fn run_command_string_response(
self: &mut Client,
command: &str,
args: Vec<&str>,
) -> RedisStringResult {
self.run_command(command, args)
}
/// invokes the run_command but returns bool result
pub fn run_command_bool_response(
self: &mut Client,
command: &str,
args: Vec<&str>,
) -> RedisBoolResult {
self.run_command(command, args)
}
/// Subscribes to the provided channel.<br>
/// Actual subscription only occurs at the first call to get_message.
///
/// # Arguments
///
/// * `channel` - The channel name, for example: `level_info`
///
/// # Example
///
/// ```
/// # let mut client = simple_redis::create("redis://127.0.0.1:6379/").unwrap();
/// client.subscribe("important_notifications");
/// ```
pub fn subscribe(self: &mut Client, channel: &str) -> RedisEmptyResult {
self.subscriber.subscribe(channel)
}
/// Subscribes to the provided channel pattern.<br>
/// Actual subscription only occurs at the first call to get_message.
///
/// # Arguments
///
/// * `channel` - The channel pattern, for example: `level_*`
///
/// # Example
///
/// ```
/// # let mut client = simple_redis::create("redis://127.0.0.1:6379/").unwrap();
/// client.psubscribe("important_notifications*");
/// ```
pub fn psubscribe(self: &mut Client, channel: &str) -> RedisEmptyResult {
self.subscriber.psubscribe(channel)
}
/// Returns true if subscribed to the provided channel.
pub fn is_subscribed(self: &mut Client, channel: &str) -> bool {
self.subscriber.is_subscribed(channel)
}
/// Returns true if subscribed to the provided channel pattern.
pub fn is_psubscribed(self: &mut Client, channel: &str) -> bool {
self.subscriber.is_psubscribed(channel)
}
/// Unsubscribes from the provided channel.
pub fn unsubscribe(self: &mut Client, channel: &str) -> RedisEmptyResult {
self.subscriber.unsubscribe(channel)
}
/// Unsubscribes from the provided channel pattern.
pub fn punsubscribe(self: &mut Client, channel: &str) -> RedisEmptyResult {
self.subscriber.punsubscribe(channel)
}
/// Unsubscribes from all channels.
pub fn unsubscribe_all(self: &mut Client) -> RedisEmptyResult {
self.subscriber.unsubscribe_all()
}
/// Fetches the messages from any of the subscribed channels and invokes the provided
/// on_message handler.<br>
/// This function will return an error in case no subscriptions are defined.<br>
/// This function will block and continue to listen to all messages, until either the
/// on_message returns true.
///
/// # Arguments
///
/// * `on_message` - Invoked on each read message. If returns true, the fetching will stop.
/// * `poll_interrupts` - Returns the interrupts struct, enabling to modify the fetching.
///
/// # Example
///
/// ```rust,no_run
/// # use simple_redis::Interrupts;
/// # let mut client = simple_redis::create("redis://127.0.0.1:6379/").unwrap();
/// client.subscribe("important_notifications");
///
/// // fetch messages from all subscriptions
/// client.fetch_messages(
/// &mut |message: simple_redis::Message| -> bool {
/// let payload : String = message.get_payload().unwrap();
/// println!("Got message: {}", payload);
///
/// // continue fetching
/// false
/// },
/// &mut || -> Interrupts { Interrupts::new() },
/// ).unwrap();
/// ```
pub fn fetch_messages(
self: &mut Client,
on_message: &mut dyn FnMut(Message) -> bool,
poll_interrupts: &mut dyn FnMut() -> Interrupts,
) -> RedisEmptyResult {
self.subscriber
.fetch_messages(&self.client, on_message, poll_interrupts)
}
}
/// Constructs a new redis client.<br>
/// The redis connection string must be in the following format: `redis://[:<passwd>@]<hostname>[:port][/<db>]`
///
/// # Arguments
///
/// * `connection_string` - The connection string in the format of: `redis://[:<passwd>@]<hostname>[:port][/<db>]`
///
/// # Example
///
/// ```
/// extern crate simple_redis;
/// fn main() {
/// match simple_redis::create("redis://127.0.0.1:6379/") {
/// Ok(client) => println!("Created Redis Client"),
/// Err(error) => println!("Unable to create Redis client: {}", error)
/// }
/// }
/// ```
pub fn create(connection_string: &str) -> Result<Client, RedisError> {
match redis::Client::open(connection_string) {
Ok(redis_client) => {
let redis_connection = connection::create();
let redis_pubsub = subscriber::create();
let client = Client {
client: redis_client,
connection: redis_connection,
subscriber: redis_pubsub,
};
Ok(client)
}
Err(error) => Err(RedisError::RedisError(error)),
}
}