张量上的异或使用矢量化在 Tensorflow 中具有浮点值

问题描述

我有两个形状相同的张量 t1 和 t2(在我的例子中是 [64,64,3])。我需要计算这两个张量的异或。但想不出办法做到这一点。

import bitstring
from bitstring import *

@tf.function
def xor(x1,x2) :
  a = BitArray(float=x1,length = 64)
  b = BitArray(float=x2,length = 64)
  a ^= b
  return a.float

这个 xor 函数在 python 中计算两个浮点值的异或。

样本输入张量是,

t1 = tf.constant([[1.1,2.2,3.3],[4.4,5.5,6.6]],dtype=tf.float64)
t2 = tf.constant([[7.7,8.8,9.9],[10.1,11.11,12.12]],dtype=tf.float64)

我似乎找不到计算两个张量的 xor方法

  1. 我如何编写 xor 函数调用矢量化版本,该函数调用将从 任何形状 的两个张量计算每对浮点数的异或(类似于tf.add、tf.matmul 等)?我试过 np.vectorized 等。
  2. 如何高效地编写xor函数?为了在 tensorflow 中使用 GPU,我需要使用 tf.something 编写每个语句,例如tf.add,tf. matmul 等。但是由于 tensorflow 没有对 Bitstring 的原生支持,有没有办法在 tensorflow 中(在 xor 函数中)将浮点数转换为位串,以便我可以执行 tf.bitwise_xor以后再说?

解决方法

在实际需要浮点数的上下文中尝试在两个浮点数之间使用异或结果时要注意这一点。

import struct

x = 1.0
y = 3.5
x1 = list(struct.pack('d',x ))
y1 = list(struct.pack('d',y ))
print('x1',x1)
print('y1',y1)

z1 = [a^b for a,b in zip(x1,y1)]
print('z1',z1)

z1 = bytes(z1)
z = struct.unpack('d',z1)[0]
print('z',z)

输出:

C:\tmp>python x.py
x1 [0,240,63]
y1 [0,12,64]
z1 [0,252,127]
z nan

C:\tmp>
,

您可能需要自定义 C++ 操作来执行此操作。 Tensorflow docs 有一个关于如何构建一个很好的教程。这是一个让您入门的示例。

xor_op.cc

#include "tensorflow/core/framework/common_shape_fns.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/shape_inference.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"

namespace tensorflow {
using shape_inference::InferenceContext;

REGISTER_OP("Xor")
    .Input("input_tensor_a: float")
    .Input("input_tensor_b: float")
    .Output("output_tensor: float")
    .SetShapeFn([](InferenceContext* c) {
      return shape_inference::UnchangedShapeWithRankAtLeast(c,1);
    });

class XorOp : public OpKernel {
 public:
  explicit XorOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  float XorFloats(const float* a,const float* b,float* c) {
    *(int*)c = *(int*)a ^ *(int*)b;
    return *c;
  }

  void Compute(OpKernelContext* ctx) override {
    // get input tensors
    const Tensor& input_fst = ctx->input(0);
    const Tensor& input_snd = ctx->input(1);

    TTypes<float,1>::ConstFlat c_in_fst = input_fst.flat<float>();
    TTypes<float,1>::ConstFlat c_in_snd = input_snd.flat<float>();

    // allocate output tensor
    Tensor* output_tensor = nullptr;
    OP_REQUIRES_OK(ctx,ctx->allocate_output(0,input_fst.shape(),&output_tensor));

    auto output_flat = output_tensor->flat<float>();
    const int N = c_in_fst.size();

    for (int i = 0; i < N; ++i) {
      XorFloats(&c_in_fst(i),&c_in_snd(i),&output_flat(i));
    }
  }
};

REGISTER_KERNEL_BUILDER(Name("Xor").Device(DEVICE_CPU),XorOp);

}  // namespace tensorflow

让我们构建操作并进行测试

$ TF_LFLAGS=($(python -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_link_flags()))'))
$ TF_CFLAGS=($(python -c 'import tensorflow as tf; print(" ".join(tf.sysconfig.get_compile_flags()))'))
$ 
$ g++ -std=c++14 -shared xor_op.cc -o xor_op.so -fPIC ${TF_CFLAGS[@]} ${TF_LFLAGS[@]} -O2

让我们运行 op 看看它是否有效。

main.py

import tensorflow as tf


def main():
    xor_module = tf.load_op_library("./xor_op.so")
    xor_op = xor_module.xor

    # make some data
    a = tf.constant(
        [[1.1,2.2,3.3],[4.4,5.5,6.6]],dtype=tf.float32)

    b = tf.constant(
        [[7.7,8.8,9.9],[10.1,11.11,12.12]],dtype=tf.float32)
    
    c = xor_op(a,b)

    print(f"a: {a}")
    print(f"b: {b}")
    print(f"c: {c}")


if __name__ == "__main__":
    main()

# a: [[1.1 2.2 3.3]
#     [4.4 5.5 6.6]]
# b: [[ 7.7   8.8   9.9 ]
#     [10.1  11.11 12.12]]
# c: [[3.3319316e+38 2.3509887e-38 3.7713776e-38]
#     [6.3672620e-38 4.7666294e-38 5.3942895e-38]]

酷。让我们更严格地测试一下。

test.py

import tensorflow as tf
from tensorflow.python.platform import test as test_lib


class XorOpTest(test_lib.TestCase):
    def setUp(self):
        # import the custom op
        xor_module = tf.load_op_library("./xor_op.so")
        self._xor_op = xor_module.xor

        # make some data
        self.a = tf.constant(
            [[1.1,dtype=tf.float32)

        self.b = tf.constant(
            [[7.7,dtype=tf.float32)

    def test_xor_op(self):
        c = self._xor_op(self.a,self.b)
        self.assertAllEqual(self._xor_op(c,self.b),self.a)


if __name__ == "__main__":
    test_lib.main()

# [ RUN      ] XorOpTest.test_xor_op
# [       OK ] XorOpTest.test_xor_op
# ----------------------------------------------------------------------
# Ran 1 test in 0.005s
# 
# OK

我会把它留给你来扩展它以在 GPU 上工作。 如果您好奇,XorFloats 方法来自 inverse square root problem 中使用的位级操作。