1: <?php
2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
19: namespace Fluent\Logger;
20:
21: 22: 23: 24: 25:
/**
 * Logger that packs entities and writes them to a Fluentd endpoint over a
 * stream socket (tcp:// or unix://).
 *
 * The socket is opened lazily on the first post and is requested as a
 * persistent stream (see connect()).
 */
class FluentLogger extends BaseLogger
{
    /** Default connect timeout in seconds, passed to stream_socket_client(). */
    const CONNECTION_TIMEOUT = 3;

    /** Default read/write timeout in seconds, passed to stream_set_timeout(). */
    const SOCKET_TIMEOUT = 3;

    /** Threshold on consecutive zero-byte writes before a post is abandoned. */
    const MAX_WRITE_RETRY = 10;

    /** Default pause between zero-byte write retries, in microseconds (usleep()). */
    const USLEEP_WAIT = 1000;

    /** Port used when the caller does not supply one. */
    const DEFAULT_LISTEN_PORT = 24224;

    /** Host used when the caller does not supply one. */
    const DEFAULT_ADDRESS = "127.0.0.1";

    /** @var string host exactly as given to the constructor (may carry a "scheme://" prefix) */
    protected $host;

    /** @var int port exactly as given to the constructor */
    protected $port;

    /** @var string transport URI built by getTransportUri(), e.g. "tcp://127.0.0.1:24224" */
    protected $transport;

    /** @var resource|null stream socket, or null while disconnected */
    protected $socket;

    /** @var PackerInterface serializer applied to every entity before it is written */
    protected $packer;

    /** @var array effective options, keyed by the names in $acceptable_options */
    protected $options = array(
        "socket_timeout"     => self::SOCKET_TIMEOUT,
        "connection_timeout" => self::CONNECTION_TIMEOUT,
        "usleep_wait"        => self::USLEEP_WAIT,
    );

    /** @var string[] URI schemes accepted by getTransportUri() */
    protected static $supported_transports = array(
        "tcp", "unix",
    );

    /** @var string[] option names accepted by mergeOptions() */
    protected static $acceptable_options = array(
        "socket_timeout", "connection_timeout", "usleep_wait",
    );

    /** @var FluentLogger[] instances created by open(), keyed by host/port/options */
    protected static $instances = array();

    /**
     * Create a logger. No connection is made until the first post.
     *
     * @param string $host    host name, IP literal, or "tcp://host" / "unix://path"
     * @param int    $port    TCP port (ignored for unix:// transports)
     * @param array  $options overrides for the entries of $acceptable_options
     *
     * @throws \Exception on an unsupported transport or an unknown option name
     */
    public function __construct($host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT, array $options = array())
    {
        $this->host = $host;
        $this->port = $port;

        $this->transport = self::getTransportUri($host, $port);

        $this->packer = new JsonPacker();

        $this->mergeOptions($options);
    }

    /**
     * Build the stream URI for a host/port pair.
     *
     * A host without a "scheme://" prefix is treated as tcp. For unix
     * transports the port is ignored and the remainder of the host is used as
     * the socket path. IPv6 literals are wrapped in brackets so the trailing
     * ":port" stays unambiguous; this covers both the compressed form
     * (contains "::") and any full-form literal accepted by filter_var().
     *
     * @param string $host
     * @param int    $port
     *
     * @return string
     *
     * @throws \Exception when the scheme is not listed in $supported_transports
     */
    public static function getTransportUri($host, $port)
    {
        $transport = "tcp";

        $pos = strpos($host, "://");
        if ($pos !== false) {
            $transport = substr($host, 0, $pos);
            $host      = substr($host, $pos + 3);

            if (!in_array($transport, self::$supported_transports, true)) {
                throw new \Exception("transport `{$transport}` does not support");
            }
        }

        if ($transport === "unix") {
            return "unix://" . $host;
        }

        $bare = trim($host, "[]");
        $looksLikeIpv6 = strpos($host, "::") !== false
            || filter_var($bare, \FILTER_VALIDATE_IP, \FILTER_FLAG_IPV6) !== false;

        if ($looksLikeIpv6) {
            // Normalise to exactly one pair of brackets, whether or not the caller supplied them.
            $host = sprintf("[%s]", $bare);
        }

        return sprintf("%s://%s:%d", $transport, $host, $port);
    }

    /**
     * Replace the packer used to serialize entities.
     *
     * @param PackerInterface $packer
     *
     * @return PackerInterface the packer that was just set
     */
    public function setPacker(PackerInterface $packer)
    {
        return $this->packer = $packer;
    }

    /**
     * @return PackerInterface the packer currently in use
     */
    public function getPacker()
    {
        return $this->packer;
    }

    /**
     * Overlay the given options on top of the current ones.
     *
     * All names are validated before any value is stored, so a rejected call
     * leaves the current options untouched.
     *
     * @param array $options option name => value
     *
     * @throws \Exception when a name is not listed in $acceptable_options
     */
    public function mergeOptions(array $options)
    {
        foreach ($options as $key => $value) {
            // Strict comparison: on PHP < 8 a loose in_array() lets integer key 0 match any name.
            if (!in_array($key, self::$acceptable_options, true)) {
                throw new \Exception("option {$key} does not support");
            }
        }

        foreach ($options as $key => $value) {
            $this->options[$key] = $value;
        }
    }

    /**
     * Replace all options with the given set. Options not mentioned are
     * dropped, so getOption() falls back to its $default for them.
     *
     * If validation fails the previous options are restored before the
     * exception is rethrown.
     *
     * @param array $options option name => value
     *
     * @throws \Exception when a name is not listed in $acceptable_options
     */
    public function setOptions(array $options)
    {
        $previous = $this->options;
        $this->options = array();

        try {
            $this->mergeOptions($options);
        } catch (\Exception $e) {
            $this->options = $previous;
            throw $e;
        }
    }

    /**
     * Return a shared instance for the given host, port and options, creating
     * it on first use.
     *
     * The cache key covers option names as well as values (order-insensitive),
     * so two different option sets never resolve to the same instance.
     *
     * @param string $host
     * @param int    $port
     * @param array  $options
     *
     * @return FluentLogger
     *
     * @throws \Exception propagated from the constructor
     */
    public static function open($host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT, array $options = array())
    {
        $fingerprint = $options;
        ksort($fingerprint);

        $key = sprintf("%s:%s:%s", $host, $port, md5(serialize($fingerprint)));

        if (!isset(self::$instances[$key])) {
            self::$instances[$key] = new self($host, $port, $options);
        }

        return self::$instances[$key];
    }

    /**
     * Forget every instance created through open().
     *
     * The method name is misspelled; it is kept as-is for compatibility with
     * existing callers.
     */
    public static function clearIntances()
    {
        self::$instances = array();
    }

    /**
     * Open the stream socket to $transport and apply the socket timeout.
     *
     * @throws \Exception when the connection cannot be established
     */
    protected function connect()
    {
        $errno  = 0;
        $errstr = "";

        // Warnings are suppressed here; failure is reported through the exception below.
        $socket = @stream_socket_client(
            $this->transport,
            $errno,
            $errstr,
            $this->getOption("connection_timeout", self::CONNECTION_TIMEOUT),
            \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
        );

        if (!$socket) {
            // error_get_last() may be null, so fall back to the values
            // stream_socket_client() wrote into $errstr / $errno.
            $last = error_get_last();
            if (is_array($last) && isset($last['message']) && $last['message'] !== "") {
                $message = $last['message'];
            } else {
                $message = sprintf("could not connect to %s: %s (%d)", $this->transport, $errstr, $errno);
            }

            throw new \Exception($message);
        }

        stream_set_timeout($socket, $this->getOption("socket_timeout", self::SOCKET_TIMEOUT));
        $this->socket = $socket;
    }

    /**
     * Connect only if there is currently no usable socket resource.
     *
     * @throws \Exception propagated from connect()
     */
    protected function reconnect()
    {
        if (!is_resource($this->socket)) {
            $this->connect();
        }
    }

    /**
     * Close and forget the current socket so that the next reconnect() opens
     * a fresh one.
     */
    protected function discardSocket()
    {
        if (is_resource($this->socket)) {
            @fclose($this->socket);
        }

        $this->socket = null;
    }

    /**
     * Send one record.
     *
     * @param string $tag
     * @param array  $data
     *
     * @return bool true when the whole packed message was written, false otherwise
     */
    public function post($tag, array $data)
    {
        $entity = new Entity($tag, $data);

        return $this->postImpl($entity);
    }

    /**
     * Send an already constructed entity.
     *
     * @param Entity $entity
     *
     * @return bool true when the whole packed message was written, false otherwise
     */
    public function post2(Entity $entity)
    {
        return $this->postImpl($entity);
    }

    /**
     * Pack the entity and write it to the socket, retrying zero-byte writes.
     *
     * Failures are reported through processError() (not defined in this
     * class; presumably inherited from BaseLogger) and turned into a false
     * return value rather than an exception.
     *
     * @param Entity $entity
     *
     * @return bool
     */
    protected function postImpl(Entity $entity)
    {
        $buffer = $packed = $this->packer->pack($entity);
        $length = strlen($packed);
        $retry  = $written = 0;

        try {
            $this->reconnect();
        } catch (\Exception $e) {
            $this->processError($entity, $e->getMessage());

            return false;
        }

        try {
            while ($written < $length) {
                $nwrite = $this->write($buffer);

                if ($nwrite === false) {
                    throw new \Exception("could not write message");
                } elseif ($nwrite === "") {
                    throw new \Exception("connection aborted");
                } elseif ($nwrite === 0) {
                    if ($retry > self::MAX_WRITE_RETRY) {
                        throw new \Exception("failed fwrite retry: max retry count");
                    }

                    $retry++;
                    usleep($this->getOption("usleep_wait", self::USLEEP_WAIT));
                    continue;
                }

                $written += $nwrite;
                $buffer = substr($packed, $written);
            }
        } catch (\Exception $e) {
            // The stream is broken or holds a partial message; drop it so the
            // next post reconnects instead of reusing it.
            $this->discardSocket();
            $this->processError($entity, $e->getMessage());

            return false;
        }

        return true;
    }

    /**
     * Write raw bytes to the socket.
     *
     * @param string $buffer
     *
     * @return int|false result of fwrite()
     */
    protected function write($buffer)
    {
        return fwrite($this->socket, $buffer);
    }

    /**
     * No cleanup is performed on destruction; the socket is not closed here.
     */
    public function __destruct()
    {
    }

    /**
     * Look up an option value.
     *
     * @param string $key
     * @param mixed  $default returned when the option is unset or null
     *
     * @return mixed
     */
    public function getOption($key, $default = null)
    {
        return isset($this->options[$key]) ? $this->options[$key] : $default;
    }
}
361: